Posted to commits@doris.apache.org by ji...@apache.org on 2023/06/05 03:08:21 UTC

[doris-flink-connector] branch master updated: [Feature]add mysql database sync (#141)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a9905b  [Feature]add mysql database sync (#141)
9a9905b is described below

commit 9a9905bbaad25f982f9b08c370ba45118299aa57
Author: wudi <67...@qq.com>
AuthorDate: Mon Jun 5 11:08:15 2023 +0800

    [Feature]add mysql database sync (#141)
---
 flink-doris-connector/build.sh                     |   9 +-
 flink-doris-connector/pom.xml                      |  32 ++-
 .../apache/doris/flink/catalog/DorisCatalog.java   |   3 +
 .../doris/flink/catalog/doris/DataModel.java       |  23 +++
 .../doris/flink/catalog/doris/DorisSystem.java     | 230 +++++++++++++++++++++
 .../doris/flink/catalog/doris/DorisType.java       |  42 ++++
 .../doris/flink/catalog/doris/FieldSchema.java     |  56 +++++
 .../doris/flink/catalog/doris/TableSchema.java     |  97 +++++++++
 .../doris/flink/cfg/DorisExecutionOptions.java     |   2 +-
 .../flink/exception/CreateTableException.java      |  45 ++++
 .../doris/flink/sink/writer/DorisWriter.java       |  11 +-
 .../doris/flink/table/DorisConfigOptions.java      |  14 ++
 .../org/apache/doris/flink/tools/cdc/CdcTools.java | 101 +++++++++
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 217 +++++++++++++++++++
 .../flink/tools/cdc}/DateToStringConverter.java    |  10 +-
 .../apache/doris/flink/tools/cdc/SourceSchema.java | 104 ++++++++++
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   | 192 +++++++++++++++++
 .../doris/flink/tools/cdc/mysql/MysqlType.java     | 168 +++++++++++++++
 .../tools/cdc/mysql/ParsingProcessFunction.java    |  65 ++++++
 .../apache/doris/flink/CDCSchemaChangeExample.java |   5 +-
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |  73 +++++++
 .../doris/flink/utils/DateToStringConverter.java   |   2 +-
 22 files changed, 1466 insertions(+), 35 deletions(-)

diff --git a/flink-doris-connector/build.sh b/flink-doris-connector/build.sh
index 9dbe69d..5816e86 100755
--- a/flink-doris-connector/build.sh
+++ b/flink-doris-connector/build.sh
@@ -147,10 +147,15 @@ elif [ ${flinkVer} -eq 3 ]; then
     FLINK_VERSION="1.17.0"
 fi
 
-echo_g " flink version: ${FLINK_VERSION}"
+# extract the minor version from the full Flink version
+# e.g. 1.17.0 -> 1.17
+FLINK_MINOR_VERSION=0
+[ "${FLINK_VERSION}" != 0 ] && FLINK_MINOR_VERSION=${FLINK_VERSION%.*}
+
+echo_g " flink version: ${FLINK_VERSION}, minor version: ${FLINK_MINOR_VERSION}"
 echo_g " build starting..."
 
-${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} "$@"
+${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.minor.version=${FLINK_MINOR_VERSION} "$@"
 
 EXIT_CODE=$?
 if [ $EXIT_CODE -eq 0 ]; then
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 0d0e6c4..025bcde 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -26,7 +26,7 @@ under the License.
         <version>23</version>
     </parent>
     <groupId>org.apache.doris</groupId>
-    <artifactId>flink-doris-connector</artifactId>
+    <artifactId>flink-doris-connector-${flink.minor.version}</artifactId>
     <version>1.4.0-SNAPSHOT</version>
     <name>Flink Doris Connector</name>
     <url>https://doris.apache.org/</url>
@@ -68,6 +68,7 @@ under the License.
 
     <properties>
         <flink.version>1.15.0</flink.version>
+        <flink.minor.version>1.15</flink.minor.version>
         <libthrift.version>0.16.0</libthrift.version>
         <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
@@ -237,14 +238,20 @@ under the License.
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
             <version>1.7.25</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
-
+        <!-- use the CDC bundled jar for Kafka Connect classes -->
         <dependency>
             <groupId>com.ververica</groupId>
-            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
             <version>2.3.0</version>
-            <scope>test</scope>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 
@@ -393,19 +400,6 @@ under the License.
     </build>
 
     <profiles>
-
-        <profile>
-            <id>thirdparty</id>
-            <activation>
-                <property>
-                    <name>env.DORIS_THIRDPARTY</name>
-                </property>
-            </activation>
-            <properties>
-                <doris.thirdparty>${env.DORIS_THIRDPARTY}</doris.thirdparty>
-            </properties>
-        </profile>
-
         <!-- for custom internal repository -->
         <profile>
             <id>custom-env</id>
@@ -478,4 +472,4 @@ under the License.
         </profile>
     </profiles>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 3b5834f..ef735e5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -81,6 +81,9 @@ import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
+/**
+ * Doris catalog implementation for Flink.
+ */
 public class DorisCatalog extends AbstractCatalog {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java
new file mode 100644
index 0000000..03b87a0
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DataModel.java
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+public enum DataModel {
+    DUPLICATE,
+    UNIQUE,
+    AGGREGATE
+}
\ No newline at end of file
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
new file mode 100644
index 0000000..36aba1c
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.apache.doris.flink.connection.JdbcConnectionProvider;
+import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
+import org.apache.doris.flink.exception.CreateTableException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Doris system operations (database and table DDL over JDBC).
+ */
+@Public
+public class DorisSystem {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class);
+    private JdbcConnectionProvider jdbcConnectionProvider;
+    private static final List<String> builtinDatabases = Arrays.asList("information_schema");
+
+    public DorisSystem(DorisConnectionOptions options) {
+        this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
+    }
+
+    public List<String> listDatabases() throws Exception {
+        return extractColumnValuesBySQL(
+                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
+                1,
+                dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    public boolean databaseExists(String database) throws Exception {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
+        return listDatabases().contains(database);
+    }
+
+    public boolean createDatabase(String database) throws Exception {
+        execute(String.format("CREATE DATABASE %s", database));
+        return true;
+    }
+
+    public boolean tableExists(String database, String table){
+        try {
+            return databaseExists(database)
+                    && listTables(database).contains(table);
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    public List<String> listTables(String databaseName) throws Exception {
+        if (!databaseExists(databaseName)) {
+            throw new DorisRuntimeException("database " + databaseName + " does not exist");
+        }
+        return extractColumnValuesBySQL(
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    public void createTable(TableSchema schema) throws Exception {
+        String ddl = buildCreateTableDDL(schema);
+        LOG.info("Create table with ddl:{}", ddl);
+        execute(ddl);
+    }
+
+    public void execute(String sql) throws Exception {
+        Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
+        try (Statement statement = conn.createStatement()) {
+            statement.execute(sql);
+        }
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String sql,
+            int columnIndex,
+            Predicate<String> filterFunc,
+            Object... params) throws Exception {
+
+        Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
+        List<String> columnValues = Lists.newArrayList();
+
+        try (PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "The following SQL query could not be executed: %s", sql),
+                    e);
+        }
+    }
+
+    public String buildCreateTableDDL(TableSchema schema) {
+        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
+        sb.append(identifier(schema.getDatabase()))
+                .append(".")
+                .append(identifier(schema.getTable()))
+                .append("(");
+
+        Map<String, FieldSchema> fields = schema.getFields();
+        List<String> keys = schema.getKeys();
+        //append keys
+        for(String key : keys){
+            if(!fields.containsKey(key)){
+                throw new CreateTableException("key " + key + " not found in column list");
+            }
+            FieldSchema field = fields.get(key);
+            buildColumn(sb, field);
+        }
+
+        //append values
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            if(keys.contains(entry.getKey())){
+                continue;
+            }
+            FieldSchema field = entry.getValue();
+            buildColumn(sb, field);
+
+        }
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append(" ) ");
+        //append model
+        sb.append(schema.getModel().name())
+                .append(" KEY(")
+                .append(String.join(",", identifier(schema.getKeys())))
+                .append(")");
+
+        //append table comment
+        if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){
+            sb.append(" COMMENT '")
+                    .append(schema.getTableComment())
+                    .append("' ");
+        }
+
+        //append distribute key
+        sb.append(" DISTRIBUTED BY HASH(")
+                .append(String.join(",", identifier(schema.getDistributeKeys())))
+                .append(") BUCKETS AUTO ");
+
+        //append properties
+        int index = 0;
+        for (Map.Entry<String, String> entry : schema.getProperties().entrySet()) {
+            if (index == 0) {
+                sb.append(" PROPERTIES (");
+            }
+            if (index > 0) {
+                sb.append(",");
+            }
+            sb.append(quoteProperties(entry.getKey()))
+                    .append("=")
+                    .append(quoteProperties(entry.getValue()));
+            index++;
+
+            if (index == schema.getProperties().size()) {
+                sb.append(")");
+            }
+        }
+        return sb.toString();
+    }
+
+    private void buildColumn(StringBuilder sql, FieldSchema field){
+        sql.append(identifier(field.getName()))
+                .append(" ")
+                .append(field.getTypeString())
+                .append(" COMMENT '")
+                .append(field.getComment())
+                .append("',");
+    }
+
+    private List<String> identifier(List<String> name) {
+        // wrap each name in backticks
+        return name.stream().map(this::identifier).collect(Collectors.toList());
+    }
+
+    private String identifier(String name) {
+        return "`" + name + "`";
+    }
+
+    private String quoteProperties(String name) {
+        return "'" + name + "'";
+    }
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
new file mode 100644
index 0000000..1e43ac1
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+public class DorisType {
+    public static final String BOOLEAN = "BOOLEAN";
+    public static final String TINYINT = "TINYINT";
+    public static final String SMALLINT = "SMALLINT";
+    public static final String INT = "INT";
+    public static final String BIGINT = "BIGINT";
+    public static final String LARGEINT = "LARGEINT";
+    public static final String FLOAT = "FLOAT";
+    public static final String DOUBLE = "DOUBLE";
+    public static final String DECIMAL = "DECIMAL";
+    public static final String DECIMAL_V3 = "DECIMALV3";
+    public static final String DATE = "DATE";
+    public static final String DATE_V2 = "DATEV2";
+    public static final String DATETIME = "DATETIME";
+    public static final String DATETIME_V2 = "DATETIMEV2";
+    public static final String CHAR = "CHAR";
+    public static final String VARCHAR = "VARCHAR";
+    public static final String STRING = "STRING";
+    public static final String HLL = "HLL";
+    public static final String BITMAP = "BITMAP";
+    public static final String ARRAY = "ARRAY";
+    public static final String JSONB = "JSONB";
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
new file mode 100644
index 0000000..8255bd3
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+public class FieldSchema {
+    private String name;
+    private String typeString;
+    private String comment;
+
+    public FieldSchema() {
+    }
+
+    public FieldSchema(String name, String typeString, String comment) {
+        this.name = name;
+        this.typeString = typeString;
+        this.comment = comment;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getTypeString() {
+        return typeString;
+    }
+
+    public void setTypeString(String typeString) {
+        this.typeString = typeString;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public void setComment(String comment) {
+        this.comment = comment;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
new file mode 100644
index 0000000..8f04705
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog.doris;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TableSchema {
+    private String database;
+    private String table;
+    private String tableComment;
+    private Map<String, FieldSchema> fields;
+    private List<String> keys = new ArrayList<>();
+    private DataModel model = DataModel.DUPLICATE;
+    private List<String> distributeKeys = new ArrayList<>();
+    private Map<String, String> properties = new HashMap<>();
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public String getTableComment() {
+        return tableComment;
+    }
+
+    public Map<String, FieldSchema> getFields() {
+        return fields;
+    }
+
+    public List<String> getKeys() {
+        return keys;
+    }
+
+    public DataModel getModel() {
+        return model;
+    }
+
+    public List<String> getDistributeKeys() {
+        return distributeKeys;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public void setTableComment(String tableComment) {
+        this.tableComment = tableComment;
+    }
+
+    public void setFields(Map<String, FieldSchema> fields) {
+        this.fields = fields;
+    }
+
+    public void setKeys(List<String> keys) {
+        this.keys = keys;
+    }
+
+    public void setModel(DataModel model) {
+        this.model = model;
+    }
+
+    public void setDistributeKeys(List<String> distributeKeys) {
+        this.distributeKeys = distributeKeys;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+}
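
For a sense of what the new catalog classes produce together, here is a minimal sketch (class name, field names and connection values are illustrative, not part of this commit) that builds a TableSchema by hand and prints the DDL that DorisSystem#buildCreateTableDDL generates for it:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.doris.flink.catalog.doris.DataModel;
    import org.apache.doris.flink.catalog.doris.DorisSystem;
    import org.apache.doris.flink.catalog.doris.FieldSchema;
    import org.apache.doris.flink.catalog.doris.TableSchema;
    import org.apache.doris.flink.cfg.DorisConnectionOptions;

    public class BuildDdlSketch {
        public static void main(String[] args) {
            Map<String, FieldSchema> fields = new LinkedHashMap<>();
            fields.put("id", new FieldSchema("id", "BIGINT", "primary key"));
            fields.put("name", new FieldSchema("name", "VARCHAR(128)", "user name"));

            TableSchema schema = new TableSchema();
            schema.setDatabase("doris_db");
            schema.setTable("user");
            schema.setModel(DataModel.UNIQUE);
            schema.setFields(fields);
            schema.setKeys(Collections.singletonList("id"));
            schema.setDistributeKeys(Collections.singletonList("id"));
            schema.setProperties(Collections.singletonMap("light_schema_change", "true"));

            // No JDBC connection is opened for DDL generation; the options only
            // configure the connection provider.
            DorisSystem dorisSystem = new DorisSystem(
                    new DorisConnectionOptions.DorisConnectionOptionsBuilder()
                            .withFenodes("127.0.0.1:8030")
                            .withUsername("root")
                            .withPassword("")
                            .withJdbcUrl("jdbc:mysql://127.0.0.1:9030")
                            .build());

            // Prints roughly:
            // CREATE TABLE IF NOT EXISTS `doris_db`.`user`(`id` BIGINT COMMENT 'primary key',
            // `name` VARCHAR(128) COMMENT 'user name' ) UNIQUE KEY(`id`)
            // DISTRIBUTED BY HASH(`id`) BUCKETS AUTO PROPERTIES ('light_schema_change'='true')
            System.out.println(dorisSystem.buildCreateTableDDL(schema));
        }
    }
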
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 36c577a..722f6ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -122,7 +122,7 @@ public class DorisExecutionOptions implements Serializable {
         private int bufferCount = DEFAULT_BUFFER_COUNT;
         private String labelPrefix = "";
         private Properties streamLoadProp = new Properties();
-        private boolean enableDelete = false;
+        private boolean enableDelete = true;
 
         private boolean enable2PC = true;
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java
new file mode 100644
index 0000000..929346c
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CreateTableException.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.exception;
+
+/**
+ * Create Table exception.
+ */
+public class CreateTableException extends RuntimeException {
+    public CreateTableException() {
+        super();
+    }
+
+    public CreateTableException(String message) {
+        super(message);
+    }
+
+    public CreateTableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public CreateTableException(Throwable cause) {
+        super(cause);
+    }
+
+    protected CreateTableException(String message, Throwable cause,
+                                   boolean enableSuppression,
+                                   boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 711f765..642e1d3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -130,15 +130,16 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
     @Override
     public void write(IN in, Context context) throws IOException {
         checkLoadException();
+        byte[] serialize = serializer.serialize(in);
+        if(Objects.isNull(serialize)){
+            // DDL records serialize to null; skip without starting a stream load
+            return;
+        }
         if(!loading) {
             //Start streamload only when there has data
             dorisStreamLoad.startLoad(currentLabel);
             loading = true;
         }
-        byte[] serialize = serializer.serialize(in);
-        if(Objects.isNull(serialize)){
-            return;
-        }
         dorisStreamLoad.writeRecord(serialize);
     }
 
@@ -254,7 +255,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
             backend = "http://" + backend;
             URL url = new URL(backend);
             HttpURLConnection co =  (HttpURLConnection) url.openConnection();
-            co.setConnectTimeout(1000);
+            co.setConnectTimeout(60000);
             co.connect();
             co.disconnect();
             return true;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index a637967..3e129aa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -22,6 +22,8 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
@@ -196,4 +198,16 @@ public class DorisConfigOptions {
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
 
+    public static Properties getStreamLoadProp(Map<String, String> tableOptions) {
+        final Properties streamLoadProp = new Properties();
+
+        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
+            if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) {
+                String subKey = entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length());
+                streamLoadProp.put(subKey, entry.getValue());
+            }
+        }
+        return streamLoadProp;
+    }
+
 }
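
The new getStreamLoadProp helper simply strips the sink.properties. prefix from matching table options; a quick sketch of the mapping (keys and values illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.doris.flink.table.DorisConfigOptions;

    public class StreamLoadPropSketch {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("sink.properties.format", "json");            // prefixed: kept, prefix stripped
            tableOptions.put("sink.properties.read_json_by_line", "true"); // prefixed: kept
            tableOptions.put("sink.enable-2pc", "false");                  // not prefixed: ignored

            Properties props = DorisConfigOptions.getStreamLoadProp(tableOptions);
            // contains the two stripped keys: format=json, read_json_by_line=true
            System.out.println(props);
        }
    }
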
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
new file mode 100644
index 0000000..809c4ea
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Entry point for the CDC database sync tools.
+ */
+public class CdcTools {
+    private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
+    private static final List<String> EMPTY_KEYS = Arrays.asList("password");
+
+    public static void main(String[] args) throws Exception {
+        String operation = args[0].toLowerCase();
+        String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
+        System.out.println();
+        switch (operation) {
+            case MYSQL_SYNC_DATABASE:
+                createMySQLSyncDatabase(opArgs);
+                break;
+            default:
+                System.out.println("Unknown operation " + operation);
+                System.exit(1);
+        }
+    }
+
+    private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        String jobName = params.get("job-name");
+        String database = params.get("database");
+        String tablePrefix = params.get("table-prefix");
+        String tableSuffix = params.get("table-suffix");
+        String includingTables = params.get("including-tables");
+        String excludingTables = params.get("excluding-tables");
+
+        Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
+        Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
+        Map<String, String> tableMap = getConfigMap(params, "table-conf");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
+        Configuration sinkConfig = Configuration.fromMap(sinkMap);
+
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync.create(env, database, mysqlConfig, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConfig, tableMap);
+        databaseSync.build();
+
+        if(StringUtils.isNullOrWhitespaceOnly(jobName)){
+            jobName = String.format("MySQL-Doris Sync Database: %s", mysqlMap.get("database-name"));
+        }
+        env.execute(jobName);
+    }
+
+    private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
+        if (!params.has(key)) {
+            return null;
+        }
+
+        Map<String, String> map = new HashMap<>();
+        for (String param : params.getMultiParameter(key)) {
+            String[] kv = param.split("=");
+            if (kv.length == 2) {
+                map.put(kv[0], kv[1]);
+                continue;
+            }else if(kv.length == 1 && EMPTY_KEYS.contains(kv[0])){
+                map.put(kv[0], "");
+                continue;
+            }
+
+            System.err.println(
+                    "Invalid " + key + " " + param + ".\n");
+            return null;
+        }
+        return map;
+    }
+}
\ No newline at end of file
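
CdcTools is the command-line entry point; in practice the connector jar is submitted with flink run and the same arguments. A sketch of an equivalent direct invocation, with every host, credential and table name invented for illustration (option keys follow MySqlSourceOptions and DorisConfigOptions):

    public class CdcToolsSketch {
        public static void main(String[] args) throws Exception {
            // Operation first, then repeatable --*-conf key=value pairs,
            // exactly the layout the parser above expects. A bare "password"
            // value (see EMPTY_KEYS) is treated as an empty password.
            org.apache.doris.flink.tools.cdc.CdcTools.main(new String[]{
                    "mysql-sync-database",
                    "--database", "doris_db",
                    "--job-name", "mysql_to_doris",
                    "--table-prefix", "ods_",
                    "--including-tables", "user|orders.*",
                    "--mysql-conf", "hostname=127.0.0.1",
                    "--mysql-conf", "port=3306",
                    "--mysql-conf", "username=root",
                    "--mysql-conf", "password=123456",
                    "--mysql-conf", "database-name=app_db",
                    "--sink-conf", "fenodes=127.0.0.1:8030",
                    "--sink-conf", "username=root",
                    "--sink-conf", "password=",
                    "--sink-conf", "jdbc-url=jdbc:mysql://127.0.0.1:9030",
                    "--sink-conf", "sink.label-prefix=cdc",
                    "--table-conf", "replication_num=1"
            });
        }
    }
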
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
new file mode 100644
index 0000000..84d5b57
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.table.DorisConfigOptions;
+import org.apache.doris.flink.tools.cdc.mysql.ParsingProcessFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+public abstract class DatabaseSync {
+    private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
+    private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+    protected Configuration config;
+    protected String database;
+    protected TableNameConverter converter;
+    protected Pattern includingPattern;
+    protected Pattern excludingPattern;
+    protected Map<String, String> tableConfig;
+    protected Configuration sinkConfig;
+    public StreamExecutionEnvironment env;
+
+    public abstract Connection getConnection() throws SQLException;
+
+    public abstract List<SourceSchema> getSchemaList() throws Exception;
+
+    public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);
+
+
+    public void create(StreamExecutionEnvironment env, String database, Configuration config,
+                       String tablePrefix, String tableSuffix, String includingTables,
+                       String excludingTables, Configuration sinkConfig, Map<String, String> tableConfig) {
+        this.env = env;
+        this.config = config;
+        this.database = database;
+        this.converter = new TableNameConverter(tablePrefix, tableSuffix);
+        this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
+        this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
+        this.sinkConfig = sinkConfig;
+        this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
+        // enable light schema change by default
+        if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){
+            this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
+        }
+    }
+
+    public void build() throws Exception {
+        DorisConnectionOptions options = getDorisConnectionOptions();
+        DorisSystem dorisSystem = new DorisSystem(options);
+
+        List<SourceSchema> schemaList = getSchemaList();
+        if (!dorisSystem.databaseExists(database)) {
+            LOG.info("database {} does not exist, creating it", database);
+            dorisSystem.createDatabase(database);
+        }
+
+        List<String> syncTables = new ArrayList<>();
+        List<String> dorisTables = new ArrayList<>();
+        for (SourceSchema schema : schemaList) {
+            syncTables.add(schema.getTableName());
+            String dorisTable = converter.convert(schema.getTableName());
+            if (!dorisSystem.tableExists(database, dorisTable)) {
+                TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
+                //set doris target database
+                dorisSchema.setDatabase(database);
+                dorisSchema.setTable(dorisTable);
+                dorisSystem.createTable(dorisSchema);
+            }
+            dorisTables.add(dorisTable);
+        }
+        Preconditions.checkState(!syncTables.isEmpty(), "No tables to be synchronized.");
+        config.set(MySqlSourceOptions.TABLE_NAME, "(" + String.join("|", syncTables) + ")");
+
+        DataStreamSource<String> streamSource = buildCdcSource(env);
+        SingleOutputStreamOperator<Void> parsedStream = streamSource.process(new ParsingProcessFunction(converter));
+        for (String table : dorisTables) {
+            OutputTag<String> recordOutputTag = ParsingProcessFunction.createRecordOutputTag(table);
+            DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
+
+            int sinkParallel = sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+            sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table);
+        }
+    }
+
+    private DorisConnectionOptions getDorisConnectionOptions() {
+        String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+        String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
+        String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
+        String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);
+        Preconditions.checkNotNull(fenodes, "fenodes is empty in sink-conf");
+        Preconditions.checkNotNull(user, "username is empty in sink-conf");
+        Preconditions.checkNotNull(jdbcUrl, "jdbc-url is empty in sink-conf");
+        DorisConnectionOptions.DorisConnectionOptionsBuilder builder = new DorisConnectionOptions.DorisConnectionOptionsBuilder()
+                .withFenodes(fenodes)
+                .withUsername(user)
+                .withPassword(passwd)
+                .withJdbcUrl(jdbcUrl);
+        return builder.build();
+    }
+
+    /**
+     * Create the Doris sink for a single table.
+     */
+    public DorisSink<String> buildDorisSink(String table) {
+        String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+        String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
+        String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
+        String labelPrefix = sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);
+
+        DorisSink.Builder<String> builder = DorisSink.builder();
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes(fenodes)
+                .setTableIdentifier(database + "." + table)
+                .setUsername(user)
+                .setPassword(passwd);
+
+        Properties pro = new Properties();
+        //default json data format
+        pro.setProperty("format", "json");
+        pro.setProperty("read_json_by_line", "true");
+        //custom stream load properties
+        Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap());
+        pro.putAll(streamLoadProp);
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder()
+                .setLabelPrefix(String.join("-", labelPrefix, database, table))
+                .setStreamLoadProp(pro);
+
+        sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE).ifPresent(executionBuilder::setDeletable);
+        sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT).ifPresent(executionBuilder::setBufferCount);
+        sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE).ifPresent(executionBuilder::setBufferSize);
+        sinkConfig.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL).ifPresent(executionBuilder::setCheckInterval);
+        sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries);
+
+        boolean enable2pc = sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC);
+        if(!enable2pc){
+            executionBuilder.disable2PC();
+        }
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
+                .setDorisOptions(dorisBuilder.build());
+        return builder.build();
+    }
+
+    /**
+     * Filter tables that need to be synchronized.
+     */
+    protected boolean isSyncNeeded(String tableName) {
+        boolean sync = true;
+        if (includingPattern != null) {
+            sync = includingPattern.matcher(tableName).matches();
+        }
+        if (excludingPattern != null) {
+            sync = sync && !excludingPattern.matcher(tableName).matches();
+        }
+        LOG.debug("table {} is synchronized? {}", tableName, sync);
+        return sync;
+    }
+
+    public static class TableNameConverter implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final String prefix;
+        private final String suffix;
+
+        TableNameConverter(){
+            this("","");
+        }
+
+        TableNameConverter(String prefix, String suffix) {
+            this.prefix = prefix == null ? "" : prefix;
+            this.suffix = suffix == null ? "" : suffix;
+        }
+
+        public String convert(String tableName) {
+            return prefix + tableName + suffix;
+        }
+    }
+}
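
Two helpers above carry the naming and filtering logic; a sketch of their behavior on invented table names:

    import java.util.regex.Pattern;

    public class FilterSketch {
        public static void main(String[] args) {
            // TableNameConverter just wraps the source name:
            //   new TableNameConverter("ods_", "_doris").convert("orders") -> "ods_orders_doris"
            // (its constructors are package-private, so it is shown as a comment here).

            // isSyncNeeded applies full-match regexes; the same check inlined:
            Pattern including = Pattern.compile("user|orders.*");
            Pattern excluding = Pattern.compile("orders_tmp");
            String table = "orders_2023";
            boolean sync = including.matcher(table).matches()
                    && !excluding.matcher(table).matches();
            System.out.println(sync); // true: matches including, not excluded
        }
    }
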
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java
similarity index 96%
copy from flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java
index 9d73f53..ed5b2b6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DateToStringConverter.java
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.utils;
+package org.apache.doris.flink.tools.cdc;
 
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
 import io.debezium.spi.converter.CustomConverter;
 import io.debezium.spi.converter.RelationalColumn;
-import org.apache.kafka.connect.data.SchemaBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,11 +47,11 @@ public class DateToStringConverter implements CustomConverter<SchemaBuilder, Rel
 
     static {
         DEFAULT_PROPS.setProperty("converters", "date");
-        DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.utils.DateToStringConverter");
+        DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.tools.cdc.DateToStringConverter");
         DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
         DEFAULT_PROPS.setProperty("date.format.datetime", "yyyy-MM-dd HH:mm:ss");
-        DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss");
-        DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC");
+        DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS");
+        DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC+8");
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
new file mode 100644
index 0000000..03aa90e
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SourceSchema {
+    private final String databaseName;
+    private final String tableName;
+    private final String tableComment;
+    private final LinkedHashMap<String, FieldSchema> fields;
+    public final List<String> primaryKeys;
+
+    public SourceSchema(
+            DatabaseMetaData metaData, String databaseName, String tableName, String tableComment)
+            throws Exception {
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.tableComment = tableComment;
+
+        fields = new LinkedHashMap<>();
+        try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) {
+            while (rs.next()) {
+                String fieldName = rs.getString("COLUMN_NAME");
+                String comment = rs.getString("REMARKS");
+                String fieldType = rs.getString("TYPE_NAME");
+                Integer precision = rs.getInt("COLUMN_SIZE");
+
+                if (rs.wasNull()) {
+                    precision = null;
+                }
+                Integer scale = rs.getInt("DECIMAL_DIGITS");
+                if (rs.wasNull()) {
+                    scale = null;
+                }
+                String dorisTypeStr = MysqlType.toDorisType(fieldType, precision, scale);
+                fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
+            }
+        }
+
+        primaryKeys = new ArrayList<>();
+        try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
+            while (rs.next()) {
+                String fieldName = rs.getString("COLUMN_NAME");
+                primaryKeys.add(fieldName);
+            }
+        }
+    }
+
+    public TableSchema convertTableSchema(Map<String, String> tableProps) {
+        TableSchema tableSchema = new TableSchema();
+        tableSchema.setModel(DataModel.UNIQUE);
+        tableSchema.setFields(this.fields);
+        tableSchema.setKeys(this.primaryKeys);
+        tableSchema.setTableComment(this.tableComment);
+        tableSchema.setDistributeKeys(this.primaryKeys);
+        tableSchema.setProperties(tableProps);
+        return tableSchema;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public LinkedHashMap<String, FieldSchema> getFields() {
+        return fields;
+    }
+
+    public List<String> getPrimaryKeys() {
+        return primaryKeys;
+    }
+
+    public String getTableComment() {
+        return tableComment;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
new file mode 100644
index 0000000..caa975d
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.mysql;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.DateToStringConverter;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class MysqlDatabaseSync extends DatabaseSync {
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class);
+
+    public MysqlDatabaseSync() {
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        return DriverManager.getConnection(
+                String.format(
+                        "jdbc:mysql://%s:%d?useInformationSchema=true",
+                        config.get(MySqlSourceOptions.HOSTNAME),
+                        config.get(MySqlSourceOptions.PORT)),
+                config.get(MySqlSourceOptions.USERNAME),
+                config.get(MySqlSourceOptions.PASSWORD));
+    }
+
+    @Override
+    public List<SourceSchema> getSchemaList() throws Exception {
+        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+        List<SourceSchema> schemaList = new ArrayList<>();
+        try (Connection conn = getConnection()) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            try (ResultSet tables =
+                         metaData.getTables(databaseName, null, "%", new String[]{"TABLE"})) {
+                while (tables.next()) {
+                    String tableName = tables.getString("TABLE_NAME");
+                    String tableComment = tables.getString("REMARKS");
+                    if (!isSyncNeeded(tableName)) {
+                        continue;
+                    }
+                    SourceSchema sourceSchema =
+                            new SourceSchema(metaData, databaseName, tableName, tableComment);
+                    if (sourceSchema.primaryKeys.size() > 0) {
+                        //Only sync tables with primary keys
+                        schemaList.add(sourceSchema);
+                    } else {
+                        LOG.warn("table {} has no primary key, skip", tableName);
+                        System.out.println("table " + tableName + " has no primary key, skip.");
+                    }
+                }
+            }
+        }
+        return schemaList;
+    }
+
+    @Override
+    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
+
+        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+        Preconditions.checkNotNull(databaseName, "database-name in mysql-conf is required");
+        String tableName = config.get(MySqlSourceOptions.TABLE_NAME);
+        sourceBuilder
+                .hostname(config.get(MySqlSourceOptions.HOSTNAME))
+                .port(config.get(MySqlSourceOptions.PORT))
+                .username(config.get(MySqlSourceOptions.USERNAME))
+                .password(config.get(MySqlSourceOptions.PASSWORD))
+                .databaseList(databaseName)
+                .tableList(databaseName + "." + tableName);
+
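+        // forward optional connector settings only when they are present in the config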
+        config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
+        config
+                .getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
+                .ifPresent(sourceBuilder::serverTimeZone);
+        config
+                .getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
+                .ifPresent(sourceBuilder::fetchSize);
+        config
+                .getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
+                .ifPresent(sourceBuilder::connectTimeout);
+        config
+                .getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
+                .ifPresent(sourceBuilder::connectMaxRetries);
+        config
+                .getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
+                .ifPresent(sourceBuilder::connectionPoolSize);
+        config
+                .getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
+                .ifPresent(sourceBuilder::heartbeatInterval);
+        config
+                .getOptional(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED)
+                .ifPresent(sourceBuilder::scanNewlyAddedTableEnabled);
+
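+        // translate scan.startup.mode into the corresponding Flink CDC StartupOptions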
+        String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
+        if ("initial".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(StartupOptions.initial());
+        } else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(StartupOptions.earliest());
+        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(StartupOptions.latest());
+        } else if ("specific-offset".equalsIgnoreCase(startupMode)) {
+            BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
+            String file = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
+            Long pos = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
+            if (file != null && pos != null) {
+                offsetBuilder.setBinlogFilePosition(file, pos);
+            }
+            config
+                    .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
+                    .ifPresent(offsetBuilder::setGtidSet);
+            config
+                    .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
+                    .ifPresent(offsetBuilder::setSkipEvents);
+            config
+                    .getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
+                    .ifPresent(offsetBuilder::setSkipRows);
+            sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
+        } else if ("timestamp".equalsIgnoreCase(startupMode)) {
+            sourceBuilder.startupOptions(
+                    StartupOptions.timestamp(
+                            config.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+        }
+
+        Properties jdbcProperties = new Properties();
+        Properties debeziumProperties = new Properties();
+        // convert Debezium date/datetime values to strings (see DateToStringConverter)
+        debeziumProperties.putAll(DateToStringConverter.DEFAULT_PROPS);
+
+        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
+                jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()), value);
+            } else if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+                debeziumProperties.put(
+                        key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+            }
+        }
+        sourceBuilder.jdbcProperties(jdbcProperties);
+        sourceBuilder.debeziumProperties(debeziumProperties);
+
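+        // render Debezium decimal fields as numeric JSON values rather than base64-encoded bytes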
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+        MySqlSource<String> mySqlSource =
+                sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
+
+        return env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
+    }
+}
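
For orientation, here is a minimal sketch of a source Configuration that would
drive buildCdcSource() through the specific-offset branch and the property-prefix
loop above. The option keys follow the standard Flink CDC MySqlSourceOptions
names; the binlog file/position and the forwarded properties are hypothetical
values, not defaults:

    Map<String, String> mysqlConfig = new HashMap<>();
    mysqlConfig.put("hostname", "127.0.0.1");
    mysqlConfig.put("port", "3306");
    mysqlConfig.put("username", "root");
    mysqlConfig.put("password", "");
    mysqlConfig.put("database-name", "db1");
    mysqlConfig.put("table-name", "tbl.*");
    // start reading from a specific binlog offset (hypothetical file/pos)
    mysqlConfig.put("scan.startup.mode", "specific-offset");
    mysqlConfig.put("scan.startup.specific-offset.file", "mysql-bin.000003");
    mysqlConfig.put("scan.startup.specific-offset.pos", "4");
    // prefixed keys are stripped and forwarded to the JDBC connection and to Debezium
    mysqlConfig.put("jdbc.properties.useSSL", "false");
    mysqlConfig.put("debezium.max.queue.size", "8192");
    Configuration config = Configuration.fromMap(mysqlConfig);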
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
new file mode 100644
index 0000000..92325ac
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.mysql;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.flink.util.Preconditions;
+
+public class MysqlType {
+    private static final String BIT = "BIT";
+    private static final String BOOLEAN = "BOOLEAN";
+    private static final String BOOL = "BOOL";
+    private static final String TINYINT = "TINYINT";
+    private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
+    private static final String SMALLINT = "SMALLINT";
+    private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL";
+    private static final String MEDIUMINT = "MEDIUMINT";
+    private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL";
+    private static final String INT = "INT";
+    private static final String INT_UNSIGNED = "INT UNSIGNED";
+    private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
+    private static final String BIGINT = "BIGINT";
+    private static final String SERIAL = "SERIAL";
+    private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL";
+    private static final String REAL = "REAL";
+    private static final String REAL_UNSIGNED = "REAL UNSIGNED";
+    private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL";
+    private static final String FLOAT = "FLOAT";
+    private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL";
+    private static final String DOUBLE = "DOUBLE";
+    private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL";
+    private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED";
+    private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
+            "DOUBLE PRECISION UNSIGNED ZEROFILL";
+    private static final String NUMERIC = "NUMERIC";
+    private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
+    private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL";
+    private static final String FIXED = "FIXED";
+    private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
+    private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL";
+    private static final String DECIMAL = "DECIMAL";
+    private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL";
+    private static final String CHAR = "CHAR";
+    private static final String VARCHAR = "VARCHAR";
+    private static final String TINYTEXT = "TINYTEXT";
+    private static final String MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String TEXT = "TEXT";
+    private static final String LONGTEXT = "LONGTEXT";
+    private static final String DATE = "DATE";
+    private static final String TIME = "TIME";
+    private static final String DATETIME = "DATETIME";
+    private static final String TIMESTAMP = "TIMESTAMP";
+    private static final String YEAR = "YEAR";
+    private static final String BINARY = "BINARY";
+    private static final String VARBINARY = "VARBINARY";
+    private static final String TINYBLOB = "TINYBLOB";
+    private static final String MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String BLOB = "BLOB";
+    private static final String LONGBLOB = "LONGBLOB";
+    private static final String JSON = "JSON";
+    private static final String ENUM = "ENUM";
+
+    public static String toDorisType(String type, Integer length, Integer scale) {
+        switch (type.toUpperCase()) {
+            case BIT:
+            case BOOLEAN:
+            case BOOL:
+                return DorisType.BOOLEAN;
+            case TINYINT:
+                return DorisType.TINYINT;
+            case TINYINT_UNSIGNED:
+            case TINYINT_UNSIGNED_ZEROFILL:
+            case SMALLINT:
+                return DorisType.SMALLINT;
+            case SMALLINT_UNSIGNED:
+            case SMALLINT_UNSIGNED_ZEROFILL:
+            case INT:
+            case MEDIUMINT:
+            case YEAR:
+                return DorisType.INT;
+            case INT_UNSIGNED:
+            case INT_UNSIGNED_ZEROFILL:
+            case MEDIUMINT_UNSIGNED:
+            case MEDIUMINT_UNSIGNED_ZEROFILL:
+            case BIGINT:
+                return DorisType.BIGINT;
+            case BIGINT_UNSIGNED:
+            case BIGINT_UNSIGNED_ZEROFILL:
+                return DorisType.LARGEINT;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+            case FLOAT_UNSIGNED_ZEROFILL:
+                return DorisType.FLOAT;
+            case REAL:
+            case REAL_UNSIGNED:
+            case REAL_UNSIGNED_ZEROFILL:
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+            case DOUBLE_UNSIGNED_ZEROFILL:
+            case DOUBLE_PRECISION:
+            case DOUBLE_PRECISION_UNSIGNED:
+            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
+                return DorisType.DOUBLE;
+            case NUMERIC:
+            case NUMERIC_UNSIGNED:
+            case NUMERIC_UNSIGNED_ZEROFILL:
+            case FIXED:
+            case FIXED_UNSIGNED:
+            case FIXED_UNSIGNED_ZEROFILL:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+            case DECIMAL_UNSIGNED_ZEROFILL:
+                return length != null && length <= 38
+                        ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, length,
+                                scale != null && scale >= 0 ? scale : 0)
+                        : DorisType.STRING;
+            case DATE:
+                return DorisType.DATE_V2;
+            case DATETIME:
+            case TIMESTAMP:
+                return String.format(
+                        "%s(%s)", DorisType.DATETIME_V2, Math.min(length == null ? 0 : length, 6));
+            case CHAR:
+                Preconditions.checkNotNull(length);
+                return String.format("%s(%s)", DorisType.CHAR, length);
+            case VARCHAR:
+                Preconditions.checkNotNull(length);
+                return length * 3 > 65533
+                        ? DorisType.STRING
+                        : String.format("%s(%s)", DorisType.VARCHAR, length * 3);
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case ENUM:
+            case TIME:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case BINARY:
+            case VARBINARY:
+                return DorisType.STRING;
+            case JSON:
+                return DorisType.JSONB;
+            default:
+                throw new UnsupportedOperationException("Unsupported MySQL Type: " + type);
+        }
+    }
+}
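
A few illustrative calls against the mapping above (a sketch; the exact output
strings depend on the DorisType constants, which conventionally resolve to
DECIMALV3, VARCHAR, LARGEINT, and so on):

    // DECIMAL(10,2) fits in 38 digits, so it keeps precision and scale
    String dec = MysqlType.toDorisType("DECIMAL", 10, 2);
    // VARCHAR lengths are tripled for UTF-8; 255 * 3 = 765 stays under 65533
    String vc = MysqlType.toDorisType("varchar", 255, null);
    // BIGINT UNSIGNED does not fit in BIGINT, so it widens to LARGEINT
    String big = MysqlType.toDorisType("BIGINT UNSIGNED", 20, 0);
    // anything outside the switch fails fast with UnsupportedOperationException
    // MysqlType.toDorisType("GEOMETRY", null, null);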
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
new file mode 100644
index 0000000..563c848
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/ParsingProcessFunction.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.mysql;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParsingProcessFunction extends ProcessFunction<String, Void> {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private transient Map<String, OutputTag<String>> recordOutputTags;
+    private final DatabaseSync.TableNameConverter converter;
+
+    public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) {
+        this.converter = converter;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        recordOutputTags = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(
+            String record, ProcessFunction<String, Void>.Context context, Collector<Void> collector)
+            throws Exception {
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+        String tableName = extractJsonNode(recordRoot.get("source"), "table");
+        String dorisName = converter.convert(tableName);
+        context.output(getRecordOutputTag(dorisName), record);
+    }
+
+    private String extractJsonNode(JsonNode record, String key) {
+        return record != null && record.get(key) != null ? record.get(key).asText() : null;
+    }
+
+    private OutputTag<String> getRecordOutputTag(String tableName) {
+        return recordOutputTags.computeIfAbsent(
+                tableName, ParsingProcessFunction::createRecordOutputTag);
+    }
+
+    public static OutputTag<String> createRecordOutputTag(String tableName) {
+        return new OutputTag<String>("record-" + tableName) {
+        };
+    }
+}
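
For context, a sketch of how these per-table side outputs might be consumed.
The cdcStream source and the converter instance are assumed to come from the
surrounding DatabaseSync wiring and are hypothetical names here; DataStream and
SingleOutputStreamOperator come from org.apache.flink.streaming.api.datastream:

    SingleOutputStreamOperator<Void> parsed =
            cdcStream.process(new ParsingProcessFunction(converter))
                    .name("Parse MySQL records");
    // each Doris table pulls its own records from a dedicated side output
    DataStream<String> tbl1Records =
            parsed.getSideOutput(ParsingProcessFunction.createRecordOutputTag("tbl1"));
    // tbl1Records can then feed a per-table DorisSink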
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
index bdc0584..3bad9be 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -18,16 +18,17 @@
 package org.apache.doris.flink;
 
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.utils.DateToStringConverter;
 import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.utils.DateToStringConverter;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.kafka.connect.json.JsonConverterConfig;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
new file mode 100644
index 0000000..cbd3b38
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcMysqlSyncDatabaseCase {
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+//        env.setParallelism(1);
+
+        Map<String, String> flinkMap = new HashMap<>();
+        flinkMap.put("execution.checkpointing.interval", "10s");
+        flinkMap.put("pipeline.operator-chaining", "false");
+        flinkMap.put("parallelism.default", "1");
+
+        Configuration configuration = Configuration.fromMap(flinkMap);
+        env.configure(configuration);
+
+        String database = "db1";
+        String tablePrefix = "";
+        String tableSuffix = "";
+        Map<String, String> mysqlConfig = new HashMap<>();
+        mysqlConfig.put("database-name", "db1");
+        mysqlConfig.put("hostname", "127.0.0.1");
+        mysqlConfig.put("port", "3306");
+        mysqlConfig.put("username", "root");
+        mysqlConfig.put("password", "");
+        Configuration config = Configuration.fromMap(mysqlConfig);
+
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", "127.0.0.1:8030");
+        sinkConfig.put("username", "root");
+        sinkConfig.put("password", "");
+        sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+
+        String includingTables = "tbl1|tbl2|tbl3";
+        String excludingTables = "";
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync.create(env, database, config, tablePrefix, tableSuffix,
+                includingTables, excludingTables, sinkConf, tableConfig);
+        databaseSync.build();
+        env.execute(String.format("MySQL-Doris Database Sync: %s", database));
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
index 9d73f53..1f1fe61 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
@@ -17,9 +17,9 @@
 
 package org.apache.doris.flink.utils;
 
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
 import io.debezium.spi.converter.CustomConverter;
 import io.debezium.spi.converter.RelationalColumn;
-import org.apache.kafka.connect.data.SchemaBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

