You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/08 13:39:49 UTC

[incubator-doris] branch master updated: [New Feature] Support synchronizing MySQL binlog in real time [stage 1] (#6289)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c8c571a  [New Feature] Support synchronizing MySQL binlog in real time [stage 1] (#6289)
c8c571a is described below

commit c8c571af37193ee10b5437fd9b47b30c4b917d60
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Sun Aug 8 21:39:34 2021 +0800

    [New Feature] Support synchronizing MySQL binlog in real time [stage 1] (#6289)
    
    This commit is the first stage of #6287
    
    In this commit, we support:
    1、Sync Job
    1)、 Creating sync job and data channel in Fe.
    2)、Pause sync job.
    3)、Resume sync job.
    4)、Stop sync job.
    5)、Show sync jobs.
    
    2、Canal
    1)、Subscribing and getting the binlog data of canal with creating syncjob.
---
 fe/fe-core/pom.xml                                 |  35 ++
 fe/fe-core/src/main/cup/sql_parser.cup             |  87 +++-
 .../java/org/apache/doris/analysis/BinlogDesc.java |  67 +++
 .../apache/doris/analysis/ChannelDescription.java  | 148 +++++++
 .../doris/analysis/CreateDataSyncJobStmt.java      | 145 +++++++
 .../java/org/apache/doris/analysis/JobName.java    |  59 +++
 .../apache/doris/analysis/PauseSyncJobStmt.java    |  56 +++
 .../apache/doris/analysis/ResumeSyncJobStmt.java   |  56 +++
 .../org/apache/doris/analysis/ShowSyncJobStmt.java |  97 +++++
 .../org/apache/doris/analysis/StopSyncJobStmt.java |  56 +++
 .../java/org/apache/doris/catalog/Catalog.java     |  27 +-
 .../main/java/org/apache/doris/common/Config.java  |  17 +
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../java/org/apache/doris/common/MetaReader.java   |   1 +
 .../java/org/apache/doris/common/MetaWriter.java   |   2 +
 .../org/apache/doris/journal/JournalEntity.java    |  16 +-
 .../apache/doris/load/sync/DataSyncJobType.java    |  32 ++
 .../org/apache/doris/load/sync/SyncChannel.java    | 122 ++++++
 .../doris/load/sync/SyncChannelCallback.java       |  28 ++
 .../apache/doris/load/sync/SyncChannelHandle.java  | 111 +++++
 .../org/apache/doris/load/sync/SyncChecker.java    |  95 +++++
 .../apache/doris/load/sync/SyncDataConsumer.java   |  50 +++
 .../apache/doris/load/sync/SyncDataReceiver.java   |  36 ++
 .../org/apache/doris/load/sync/SyncFailMsg.java    |  93 +++++
 .../java/org/apache/doris/load/sync/SyncJob.java   | 347 +++++++++++++++
 .../org/apache/doris/load/sync/SyncJobManager.java | 325 ++++++++++++++
 .../org/apache/doris/load/sync/SyncLifeCycle.java  |  72 ++++
 .../apache/doris/load/sync/SyncPendingTask.java    |  53 +++
 .../org/apache/doris/load/sync/SyncTxnParam.java   |  65 +++
 .../apache/doris/load/sync/canal/CanalConfigs.java |  30 ++
 .../doris/load/sync/canal/CanalSyncChannel.java    | 356 ++++++++++++++++
 .../load/sync/canal/CanalSyncDataConsumer.java     | 458 ++++++++++++++++++++
 .../load/sync/canal/CanalSyncDataReceiver.java     | 116 +++++
 .../apache/doris/load/sync/canal/CanalSyncJob.java | 310 ++++++++++++++
 .../apache/doris/load/sync/canal/CanalUtils.java   | 237 +++++++++++
 .../doris/load/sync/canal/SyncCanalClient.java     | 147 +++++++
 .../org/apache/doris/load/sync/model/Data.java     |  48 +++
 .../org/apache/doris/load/sync/model/Events.java   |  73 ++++
 .../doris/load/sync/position/EntryPosition.java    | 176 ++++++++
 .../doris/load/sync/position/PositionMeta.java     |  97 +++++
 .../doris/load/sync/position/PositionRange.java    |  88 ++++
 .../java/org/apache/doris/persist/EditLog.java     |  23 +-
 .../org/apache/doris/persist/OperationType.java    |   4 +
 .../org/apache/doris/persist/gson/GsonUtils.java   |  31 +-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |  24 +-
 .../java/org/apache/doris/qe/ShowExecutor.java     |  30 +-
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   2 +
 .../doris/analysis/CreateDataSyncJobStmtTest.java  | 183 ++++++++
 .../apache/doris/load/sync/SyncJobManagerTest.java | 369 ++++++++++++++++
 .../org/apache/doris/load/sync/SyncJobTest.java    |  91 ++++
 .../doris/load/sync/canal/CanalSyncDataTest.java   | 465 +++++++++++++++++++++
 .../doris/load/sync/canal/CanalSyncJobTest.java    | 416 ++++++++++++++++++
 .../doris/load/sync/canal/CanalTestUtil.java       |  94 +++++
 53 files changed, 6144 insertions(+), 26 deletions(-)

diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 329c73a..f84abbc 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -266,6 +266,7 @@ under the License.
             <artifactId>log4j-slf4j-impl</artifactId>
         </dependency>
 
+        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
@@ -470,6 +471,40 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.client</artifactId>
+            <version>1.1.4</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol -->
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.protocol</artifactId>
+            <version>1.1.4</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.hibernate</groupId>
             <artifactId>hibernate-validator</artifactId>
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 0b21b71..d6c4333 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -234,7 +234,7 @@ parser code {:
 
 // Total keywords of doris
 terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_APPEND, KW_AS, KW_ASC, KW_AUTHORS, KW_ARRAY,
-    KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BITMAP, KW_BITMAP_UNION, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN,
+    KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BINLOG, KW_BITMAP, KW_BITMAP_UNION, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN,
     KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS,
     KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLON, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED,
     KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER,
@@ -247,7 +247,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
     KW_HASH, KW_HAVING, KW_HDFS, KW_HELP,KW_HLL, KW_HLL_UNION, KW_HOUR, KW_HUB,
     KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE, KW_INSTALL,
     KW_INNER, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERSECT, KW_INTERVAL, KW_INTO, KW_IS, KW_ISNULL, KW_ISOLATION,
-    KW_JOIN,
+    KW_JOB, KW_JOIN,
     KW_KEY, KW_KEYS, KW_KILL,
     KW_LABEL, KW_LARGEINT, KW_LAST, KW_LEFT, KW_LESS, KW_LEVEL, KW_LIKE, KW_LIMIT, KW_LINK, KW_LIST, KW_LOAD,
     KW_LOCAL, KW_LOCATION,
@@ -485,6 +485,15 @@ nonterminal String keyword, ident, ident_or_text, variable_name, text_or_passwor
         collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format, time_unit,
         literal_or_ident;
 
+// sync job
+nonterminal List<ChannelDescription> channel_desc_list;
+nonterminal ChannelDescription channel_desc;
+nonterminal BinlogDesc binlog_desc;
+nonterminal ResumeSyncJobStmt resume_sync_job_stmt;
+nonterminal PauseSyncJobStmt pause_sync_job_stmt;
+nonterminal StopSyncJobStmt stop_sync_job_stmt;
+nonterminal JobName job_name;
+
 nonterminal String opt_db, procedure_or_function, opt_comment, opt_engine;
 nonterminal ColumnDef.DefaultValue opt_default_value;
 nonterminal Boolean opt_if_exists, opt_if_not_exists;
@@ -672,6 +681,12 @@ stmt ::=
     {: RESULT = stmt; :}
     | resume_routine_load_stmt : stmt
     {: RESULT = stmt; :}
+    | pause_sync_job_stmt : stmt
+    {: RESULT = stmt; :}
+    | resume_sync_job_stmt : stmt
+    {: RESULT = stmt; :}
+    | stop_sync_job_stmt : stmt
+    {: RESULT = stmt; :}
     | stop_routine_load_stmt : stmt
     {: RESULT = stmt; :}
     | show_routine_load_stmt : stmt
@@ -1226,6 +1241,69 @@ create_stmt ::=
     {:
         RESULT = new CreateEncryptKeyStmt(keyName, keyString);
     :}
+    /* sync job */
+    | KW_CREATE KW_SYNC ident:db DOT ident_or_text:jobName LPAREN channel_desc_list:channelDescList RPAREN binlog_desc:binlog opt_properties:properties
+    {:
+        RESULT = new CreateDataSyncJobStmt(jobName, db, channelDescList, binlog, properties);
+    :}
+    ;
+
+channel_desc_list ::=
+    channel_desc:desc
+    {:
+        RESULT = Lists.newArrayList(desc);
+    :}
+    | channel_desc_list:list COMMA channel_desc:desc
+    {:
+        list.add(desc);
+        RESULT = list;
+    :}
+    ;
+
+channel_desc ::=
+    KW_FROM ident:srcDatabase DOT ident:srcTableName KW_INTO ident:desTableName opt_partition_names:partitionNames opt_col_list:colList
+    {:
+        RESULT = new ChannelDescription(srcDatabase, srcTableName, desTableName, partitionNames, colList);
+    :}
+    ;
+
+binlog_desc ::=
+    KW_FROM KW_BINLOG LPAREN key_value_map:properties RPAREN
+    {:
+        RESULT = new BinlogDesc(properties);
+    :}
+;
+
+resume_sync_job_stmt ::=
+    KW_RESUME KW_SYNC KW_JOB job_name:jobName
+    {:
+        RESULT = new ResumeSyncJobStmt(jobName);
+    :}
+    ;
+
+pause_sync_job_stmt ::=
+    KW_PAUSE KW_SYNC KW_JOB job_name:jobName
+    {:
+        RESULT = new PauseSyncJobStmt(jobName);
+    :}
+    ;
+
+stop_sync_job_stmt ::=
+    KW_STOP KW_SYNC KW_JOB job_name:jobName
+    {:
+        RESULT = new StopSyncJobStmt(jobName);
+    :}
+    ;
+
+job_name ::=
+    ident:jobName
+    {:
+        RESULT = new JobName("", jobName);
+    :}
+    | ident:db DOT ident:jobName
+    {:
+        RESULT = new JobName(db, jobName);
+    :}
     ;
 
 opt_aggregate ::=
@@ -2640,6 +2718,11 @@ show_param ::=
     {:
         RESULT = new ShowEncryptKeysStmt(dbName, parser.wild);
     :}
+    /* Show Sync Job */
+    | KW_SYNC KW_JOB opt_db:dbName
+    {:
+        RESULT = new ShowSyncJobStmt(dbName);
+    :}
     ;
 
 opt_tmp ::=
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BinlogDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinlogDesc.java
new file mode 100644
index 0000000..495092d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BinlogDesc.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.load.sync.DataSyncJobType;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+// Binlog descriptor
+//
+// Example:
+// FROM BINLOG
+// (
+//   "type" = "canal",
+//   "canal.server.ip" = "127.0.0.1",
+//   "canal.server.port" = "11111",
+//   "canal.destination" = "example",
+//   "canal.username" = "canal",
+//   "canal.password" = "canal"
+// )
+
+public class BinlogDesc {
+    private static final String TYPE = "type";
+    private Map<String, String> properties;
+    private DataSyncJobType dataSyncJobType;
+
+    public BinlogDesc(Map<String, String> properties) {
+        this.properties = properties;
+        if (this.properties == null) {
+            this.properties = Maps.newHashMap();
+        }
+        this.dataSyncJobType = DataSyncJobType.UNKNOWN;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public DataSyncJobType getDataSyncJobType() {
+        return dataSyncJobType;
+    }
+
+    public void analyze() throws AnalysisException {
+        if (!properties.containsKey(TYPE)) {
+            throw new AnalysisException("Binlog properties must contain property `type`");
+        }
+        dataSyncJobType = DataSyncJobType.fromString(properties.get(TYPE));
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java
new file mode 100644
index 0000000..51eae33
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+import com.google.gson.annotations.SerializedName;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+// used to describe channel info in data sync job
+//      channel_desc:
+//          FROM mysql_db.src_tbl INTO doris_db.des_tbl
+//          [PARTITION (p1, p2)]
+//          [(col1, ...)]
+//          [KEEP ORDER]
+public class ChannelDescription implements Writable {
+    private static final Logger LOG = LogManager.getLogger(ChannelDescription.class);
+
+    @SerializedName(value = "srcDatabase")
+    private final String srcDatabase;
+    @SerializedName(value = "srcTableName")
+    private final String srcTableName;
+    @SerializedName(value = "targetTable")
+    private final String targetTable;
+    @SerializedName(value = "partitionNames")
+    private final PartitionNames partitionNames;
+    // column names of source table
+    @SerializedName(value = "colNames")
+    private final List<String> colNames;
+
+    public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List<String> colNames) {
+        this.srcDatabase = srcDatabase;
+        this.srcTableName = srcTableName;
+        this.targetTable = targetTable;
+        this.partitionNames = partitionNames;
+        this.colNames = colNames;
+    }
+
+    public List<String> getColNames() {
+        if (colNames == null || colNames.isEmpty()) {
+            return null;
+        }
+        return colNames;
+    }
+
+    public void analyze(String fullDbName) throws AnalysisException {
+        if (Strings.isNullOrEmpty(srcDatabase)) {
+            throw new AnalysisException("No source database in channel description.");
+        }
+
+        if (Strings.isNullOrEmpty(srcTableName)) {
+            throw new AnalysisException("No source table in channel description.");
+        }
+
+        checkAuth(fullDbName);
+
+        if (partitionNames != null) {
+            partitionNames.analyze(null);
+        }
+
+        analyzeColumns();
+    }
+
+    private void checkAuth(String fullDbName) throws AnalysisException {
+        if (Strings.isNullOrEmpty(targetTable)) {
+            throw new AnalysisException("No target table is assigned in channel description.");
+        }
+
+        // check target table auth
+        if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), fullDbName, targetTable,
+                PrivPredicate.LOAD)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(), targetTable);
+        }
+    }
+
+    private void analyzeColumns() throws AnalysisException {
+        Set<String> columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+        if ((colNames != null && !colNames.isEmpty())) {
+            for (String columnName : colNames) {
+                if (!columnNames.add(columnName)) {
+                    throw new AnalysisException("Duplicate column: " + columnName);
+                }
+            }
+        }
+    }
+
+    public String getTargetTable() {
+        return targetTable;
+    }
+
+    public String getSrcDatabase() {
+        return srcDatabase;
+    }
+
+    public String getSrcTableName() {
+        return srcTableName;
+    }
+
+    public PartitionNames getPartitionNames() {
+        return partitionNames;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static ChannelDescription read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, ChannelDescription.class);
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateDataSyncJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateDataSyncJobStmt.java
new file mode 100644
index 0000000..a2435fe
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateDataSyncJobStmt.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.sync.DataSyncJobType;
+
+import com.google.common.base.Strings;
+
+import java.util.List;
+import java.util.Map;
+
+// create sync job statement, sync mysql data into tables.
+//
+// syntax:
+//      CREATE SYNC doris_db.job_name
+//          (channel_desc, ...)
+//          binlog_desc
+//      [PROPERTIES (key1=value1, )]
+//
+//      channel_desc:
+//          FROM mysql_db.src_tbl INTO des_tbl
+//          [PARTITION (p1, p2)]
+//          [(col1, ...)]
+//          [KEEP ORDER]
+//
+//      binlog_desc:
+//          FROM BINLOG
+//          (key1=value1, ...)
+public class CreateDataSyncJobStmt extends DdlStmt {
+    private String jobName;
+    private String dbName;
+    private DataSyncJobType dataSyncJobType;
+    private final List<ChannelDescription> channelDescriptions;
+    private final BinlogDesc binlogDesc;
+    private final Map<String, String> properties;
+
+    public CreateDataSyncJobStmt(String jobName, String dbName, List<ChannelDescription> channelDescriptions,
+                                 BinlogDesc binlogDesc, Map<String, String> properties) {
+        this.jobName = jobName;
+        this.dbName = dbName;
+        this.channelDescriptions = channelDescriptions;
+        this.binlogDesc = binlogDesc;
+        this.properties = properties;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        if (Strings.isNullOrEmpty(dbName)) {
+            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+            dbName = analyzer.getDefaultDb();
+        }
+        dbName = ClusterNamespace.getFullName(analyzer.getClusterName(), dbName);
+
+        if (!Config.enable_create_sync_job) {
+            throw new AnalysisException("Mysql sync job is coming soon.");
+        }
+
+        if (binlogDesc != null) {
+            binlogDesc.analyze();
+            dataSyncJobType = binlogDesc.getDataSyncJobType();
+            if (dataSyncJobType != DataSyncJobType.CANAL) {
+                throw new AnalysisException("Data sync job now only support CANAL type");
+            }
+        }
+
+        if (channelDescriptions == null || channelDescriptions.isEmpty()) {
+            throw new AnalysisException("No channel is assign in data sync job statement.");
+        }
+
+        for (ChannelDescription channelDescription : channelDescriptions) {
+            channelDescription.analyze(dbName);
+            Database db = Catalog.getCurrentCatalog().getDb(dbName);
+            if (db == null) {
+                throw new AnalysisException("Database: " + dbName + " not found.");
+            }
+            String tableName = channelDescription.getTargetTable();
+            Table table = db.getTable(tableName);
+            if (table == null) {
+                throw new AnalysisException("Table: " + tableName + " doesn't exist");
+            }
+            if (!(table instanceof OlapTable)) {
+                throw new AnalysisException("Table: " + tableName + " is not an olap table");
+            }
+            if (((OlapTable) table).getKeysType() != KeysType.UNIQUE_KEYS) {
+                throw new AnalysisException("Table: " + tableName + " is not a unique table, key type: " + ((OlapTable) table).getKeysType());
+            }
+            if (!((OlapTable) table).hasDeleteSign()) {
+                throw new AnalysisException("Table: " + tableName + " don't support batch delete. Please upgrade it to support, see `help alter table`.");
+            }
+        }
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public List<ChannelDescription> getChannelDescriptions() {
+        return channelDescriptions;
+    }
+
+    public BinlogDesc getBinlogDesc() {
+        return binlogDesc;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public DataSyncJobType getDataSyncJobType() {
+        return dataSyncJobType;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/JobName.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/JobName.java
new file mode 100644
index 0000000..706d126
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JobName.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+
+import com.google.common.base.Strings;
+
+public class JobName {
+    private String jobName;
+    private String dbName;
+
+    public JobName(String dbName, String jobName) {
+        this.dbName = dbName;
+        this.jobName = jobName;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getName() {
+        return jobName;
+    }
+
+    public void analyze(Analyzer analyzer) throws AnalysisException {
+        if (Strings.isNullOrEmpty(dbName)) {
+            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+            dbName = analyzer.getDefaultDb();
+        }
+        dbName = ClusterNamespace.getFullName(analyzer.getClusterName(), dbName);
+    }
+
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("`").append(dbName).append("`.`").append(jobName).append("`");
+        return sb.toString();
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseSyncJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseSyncJobStmt.java
new file mode 100644
index 0000000..fdf4943
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseSyncJobStmt.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+// PAUSE SYNC JOB statement used to pause sync job.
+//
+// syntax:
+//      PAUSE SYNC JOB [db.]jobName
+public class PauseSyncJobStmt extends DdlStmt {
+
+    private JobName jobName;
+
+    public PauseSyncJobStmt(JobName jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getJobName() {
+        return jobName.getName();
+    }
+
+    public String getDbFullName() {
+        return jobName.getDbName();
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+        super.analyze(analyzer);
+        jobName.analyze(analyzer);
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("PAUSE SYNC JOB ");
+        stringBuilder.append(jobName.toSql());
+        return stringBuilder.toString();
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeSyncJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeSyncJobStmt.java
new file mode 100644
index 0000000..b24c383
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeSyncJobStmt.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+// RESUME SYNC JOB statement used to resume sync job.
+//
+// syntax:
+//      RESUME SYNC JOB [db.]jobName
+public class ResumeSyncJobStmt extends DdlStmt {
+
+    private JobName jobName;
+
+    public ResumeSyncJobStmt(JobName jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getJobName() {
+        return jobName.getName();
+    }
+
+    public String getDbFullName() {
+        return jobName.getDbName();
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+        super.analyze(analyzer);
+        jobName.analyze(analyzer);
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("RESUME SYNC JOB ");
+        stringBuilder.append(jobName.toSql());
+        return stringBuilder.toString();
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSyncJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSyncJobStmt.java
new file mode 100644
index 0000000..b17a646
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowSyncJobStmt.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+// SHOW SYNC JOB STATUS statement used to get status of sync job.
+//
+// syntax:
+//      SHOW SYNC JOB [FROM db]
+public class ShowSyncJobStmt extends ShowStmt {
+    private static final Logger LOG = LogManager.getLogger(ShowSyncJobStmt.class);
+
+    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("JobId").add("JobName").add("Type").add("State").add("Channel").add("Status")
+            .add("JobConfig").add("CreateTime").add("LastStartTime").add("LastStopTime").add("FinishTime").add("Msg")
+            .build();
+
+    private String dbName;
+
+    public ShowSyncJobStmt(String dbName) {
+        this.dbName = dbName;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        if (Strings.isNullOrEmpty(dbName)) {
+            dbName = analyzer.getDefaultDb();
+            if (Strings.isNullOrEmpty(dbName)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+        } else {
+            dbName = ClusterNamespace.getFullName(getClusterName(), dbName);
+        }
+    }
+
+    @Override
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
+        for (String title : TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("SHOW SYNC JOB");
+        if (dbName != null) {
+            builder.append(" FROM `").append(dbName).append("` ");
+        }
+        return builder.toString();
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_NO_SYNC;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StopSyncJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StopSyncJobStmt.java
new file mode 100644
index 0000000..70b9928
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StopSyncJobStmt.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+// CANCEL SYNC statement used to cancel sync job.
+//
+// syntax:
+//      STOP SYNC JOB [db.]jobName
+public class StopSyncJobStmt extends DdlStmt {
+
+    private JobName jobName;
+
+    public StopSyncJobStmt(JobName jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getJobName() {
+        return jobName.getName();
+    }
+
+    public String getDbFullName() {
+        return jobName.getDbName();
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+        super.analyze(analyzer);
+        jobName.analyze(analyzer);
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("STOP SYNC JOB ");
+        stringBuilder.append(jobName.toSql());
+        return stringBuilder.toString();
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 63ab000..86e2832 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -65,6 +65,7 @@ import org.apache.doris.analysis.InstallPluginStmt;
 import org.apache.doris.analysis.KeysDesc;
 import org.apache.doris.analysis.LinkDbStmt;
 import org.apache.doris.analysis.MigrateDbStmt;
+import org.apache.doris.analysis.ModifyDistributionClause;
 import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.PartitionRenameClause;
 import org.apache.doris.analysis.RecoverDbStmt;
@@ -82,7 +83,6 @@ import org.apache.doris.analysis.TruncateTableStmt;
 import org.apache.doris.analysis.UninstallPluginStmt;
 import org.apache.doris.analysis.UserDesc;
 import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.analysis.ModifyDistributionClause;
 import org.apache.doris.backup.BackupHandler;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Database.DbState;
@@ -162,6 +162,8 @@ import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.load.routineload.RoutineLoadScheduler;
 import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
 import org.apache.doris.load.update.UpdateManager;
+import org.apache.doris.load.sync.SyncChecker;
+import org.apache.doris.load.sync.SyncJobManager;
 import org.apache.doris.master.Checkpoint;
 import org.apache.doris.master.MetaHelper;
 import org.apache.doris.master.PartitionInMemoryInfoCollector;
@@ -223,7 +225,6 @@ import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.PublishVersionDaemon;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -303,6 +304,7 @@ public class Catalog {
     private StreamLoadRecordMgr streamLoadRecordMgr;
     private RoutineLoadManager routineLoadManager;
     private ExportMgr exportMgr;
+    private SyncJobManager syncJobManager;
     private Alter alter;
     private ConsistencyChecker consistencyChecker;
     private BackupHandler backupHandler;
@@ -489,6 +491,7 @@ public class Catalog {
         this.load = new Load();
         this.routineLoadManager = new RoutineLoadManager();
         this.exportMgr = new ExportMgr();
+        this.syncJobManager = new SyncJobManager();
         this.alter = new Alter();
         this.consistencyChecker = new ConsistencyChecker();
         this.lock = new QueryableReentrantLock(true);
@@ -1280,6 +1283,9 @@ public class Catalog {
         // Export checker
         ExportChecker.init(Config.export_checker_interval_second * 1000L);
         ExportChecker.startAll();
+        // Sync checker
+        SyncChecker.init(Config.sync_checker_interval_second);
+        SyncChecker.startAll();
         // Tablet checker and scheduler
         tabletChecker.start();
         tabletScheduler.start();
@@ -1686,6 +1692,14 @@ public class Catalog {
         return newChecksum;
     }
 
+    public long loadSyncJobs(DataInputStream dis, long checksum) throws IOException, DdlException {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_103) {
+            syncJobManager.readField(dis);
+        }
+        LOG.info("finished replay syncJobMgr from image");
+        return checksum;
+    }
+
     public long loadAlterJob(DataInputStream dis, long checksum) throws IOException {
         long newChecksum = checksum;
         for (JobType type : JobType.values()) {
@@ -2026,6 +2040,11 @@ public class Catalog {
         return checksum;
     }
 
+    public long saveSyncJobs(CountingDataOutputStream dos, long checksum) throws IOException {
+        syncJobManager.write(dos);
+        return checksum;
+    }
+
     public long saveAlterJob(CountingDataOutputStream dos, long checksum) throws IOException {
         for (JobType type : JobType.values()) {
             checksum = saveAlterJob(dos, checksum, type);
@@ -4848,6 +4867,10 @@ public class Catalog {
         return this.exportMgr;
     }
 
+    public SyncJobManager getSyncJobManager() {
+        return this.syncJobManager;
+    }
+
     public SmallFileMgr getSmallFileMgr() {
         return this.smallFileMgr;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index e1012dd..d1aae11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -624,6 +624,17 @@ public class Config extends ConfigBase {
     public static String yarn_config_dir = PaloFe.DORIS_HOME_DIR + "/lib/yarn-config";
 
     /**
+     * Maximal intervals between two syncJob's commits.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static long sync_commit_interval_second = 10;
+
+    /**
+     * Sync checker's running interval.
+     */
+    @ConfField public static int sync_checker_interval_second = 5;
+
+    /**
      * Default number of waiting jobs for routine load and version 2 of load
      * This is a desired number.
      * In some situation, such as switch the master, the current number is maybe more than desired_max_waiting_jobs
@@ -1239,6 +1250,12 @@ public class Config extends ConfigBase {
     public static boolean enable_materialized_view = true;
 
     /**
+     * enable create sync job
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_create_sync_job = false;
+
+    /**
      * it can't auto-resume routine load job as long as one of the backends is down
      */
     @ConfField(mutable = true, masterOnly = true)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 55d8713..149f3e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -216,6 +216,8 @@ public final class FeMetaVersion {
     public static final int VERSION_101 = 101;
     // add data encrypt
     public static final int VERSION_102 = 102;
+    // support sync job
+    public static final int VERSION_103 = 103;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_102;
+    public static final int VERSION_CURRENT = VERSION_103;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
index 79176a2..e0e30c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaReader.java
@@ -91,6 +91,7 @@ public class MetaReader {
             checksum = catalog.loadBrokers(dis, checksum);
             checksum = catalog.loadResources(dis, checksum);
             checksum = catalog.loadExportJob(dis, checksum);
+            checksum = catalog.loadSyncJobs(dis,checksum);
             checksum = catalog.loadBackupHandler(dis, checksum);
             checksum = catalog.loadPaloAuth(dis, checksum);
             // global transaction must be replayed before load jobs v2
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java b/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
index dff42e0..b591d08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/MetaWriter.java
@@ -102,6 +102,8 @@ public class MetaWriter {
             checksum = catalog.saveResources(dos, checksum);
             metaIndices.add(new MetaIndex("exportJob", dos.getCount()));
             checksum = catalog.saveExportJob(dos, checksum);
+            metaIndices.add(new MetaIndex("syncJob", dos.getCount()));
+            checksum = catalog.saveSyncJobs(dos, checksum);
             metaIndices.add(new MetaIndex("backupHandler", dos.getCount()));
             checksum = catalog.saveBackupHandler(dos, checksum);
             metaIndices.add(new MetaIndex("paloAuth", dos.getCount()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index f648358..10bb4e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -26,10 +26,10 @@ import org.apache.doris.backup.Repository;
 import org.apache.doris.backup.RestoreJob;
 import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Function;
-import org.apache.doris.catalog.FunctionSearchDesc;
 import org.apache.doris.catalog.EncryptKey;
 import org.apache.doris.catalog.EncryptKeySearchDesc;
+import org.apache.doris.catalog.Function;
+import org.apache.doris.catalog.FunctionSearchDesc;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.cluster.BaseParam;
 import org.apache.doris.cluster.Cluster;
@@ -46,6 +46,7 @@ import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.LoadJobFinalOperation;
 import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.sync.SyncJob;
 import org.apache.doris.master.Checkpoint;
 import org.apache.doris.mysql.privilege.UserPropertyInfo;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
@@ -90,7 +91,6 @@ import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.transaction.TransactionState;
-
 import com.google.common.base.Preconditions;
 
 import org.apache.logging.log4j.LogManager;
@@ -533,6 +533,16 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_CREATE_SYNC_JOB: {
+                data = SyncJob.read(in);
+                isRead = true;
+                break;
+            }
+            case OperationType.OP_UPDATE_SYNC_JOB_STATE: {
+                data = SyncJob.SyncJobUpdateStateInfo.read(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_FETCH_STREAM_LOAD_RECORD: {
                 data = FetchStreamLoadRecord.read(in);
                 isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/DataSyncJobType.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/DataSyncJobType.java
new file mode 100644
index 0000000..9e055a1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/DataSyncJobType.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+public enum DataSyncJobType {
+    CANAL,
+    UNKNOWN;
+
+    public static DataSyncJobType fromString(String dataSyncJobType) {
+        for (DataSyncJobType type : DataSyncJobType.values()) {
+            if (type.name().equalsIgnoreCase(dataSyncJobType)) {
+                return type;
+            }
+        }
+        return UNKNOWN;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java
new file mode 100644
index 0000000..85644b1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.UserException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+public class SyncChannel extends SyncLifeCycle {
+    private static final Logger LOG = LogManager.getLogger(SyncChannel.class);
+
+    protected long id;
+    protected long jobId;
+    protected Database db;
+    protected OlapTable tbl;
+    protected List<String> columns;
+    protected PartitionNames partitionNames;
+    protected String targetTable;
+    protected String srcDataBase;
+    protected String srcTable;
+    protected SyncChannelCallback callback;
+
+    public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
+        this.id = Catalog.getCurrentCatalog().getNextId();
+        this.jobId = syncJob.getId();
+        this.db = db;
+        this.tbl = table;
+        this.columns = columns;
+        this.targetTable = table.getName().toLowerCase();
+        this.srcDataBase = srcDataBase.toLowerCase();
+        this.srcTable = srcTable.toLowerCase();
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        LOG.info("channel {} has been started. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable);
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+        LOG.info("channel {} has been stopped. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable);
+    }
+
+    @Override
+    public void process() {
+    }
+
+    public void beginTxn(long batchId) throws UserException, TException, TimeoutException,
+            InterruptedException, ExecutionException {
+    }
+
+    public void abortTxn(String reason) throws TException, TimeoutException,
+            InterruptedException, ExecutionException {
+    }
+
+    public void commitTxn() throws TException, TimeoutException,
+            InterruptedException, ExecutionException {
+    }
+
+    public void initTxn(long timeoutSecond) {
+    }
+
+    public String getInfo() {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(srcDataBase).append(".").append(srcTable);
+        stringBuilder.append("->");
+        stringBuilder.append(targetTable);
+        return stringBuilder.toString();
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public String getSrcTable() {
+        return srcTable;
+    }
+
+    public String getSrcDataBase() {
+        return srcDataBase;
+    }
+
+    public String getTargetTable() {
+        return targetTable;
+    }
+
+    public void setCallback(SyncChannelCallback callback) {
+        this.callback = callback;
+    }
+
+    public void setPartitions(PartitionNames partitionNames) {
+        this.partitionNames = partitionNames;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java
new file mode 100644
index 0000000..8b2f239
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+public interface SyncChannelCallback {
+
+    public boolean state();
+
+    public void onFinished(long channelId);
+
+    public void onFailed(String errMsg);
+
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java
new file mode 100644
index 0000000..4e3a397
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SyncChannelHandle implements SyncChannelCallback {
+    private Logger LOG = LogManager.getLogger(SyncChannelHandle.class);
+
+    // channel id -> dummy value(-1)
+    private MarkedCountDownLatch<Long, Long> latch;
+    private Sync sync = new Sync();
+
+    public void reset(int size) {
+        this.latch = new MarkedCountDownLatch<>(size);
+    }
+
+    public void mark(SyncChannel channel) {
+        latch.addMark(channel.getId(), -1L);
+    }
+
+    public void set(Boolean mutex) {
+        if (mutex) {
+            this.sync.innerSetTrue();
+        } else {
+            this.sync.innerSetFalse();
+        }
+    }
+
+    @Override
+    public boolean state() {
+        return this.sync.innerState();
+    }
+
+    @Override
+    public void onFinished(long channelId) {
+        this.latch.markedCountDown(channelId, -1L);
+    }
+
+    @Override
+    public void onFailed(String errMsg) {
+        this.latch.countDownToZero(new Status(TStatusCode.CANCELLED, errMsg));
+    }
+
+    public void join() throws InterruptedException {
+        this.latch.await();
+    }
+
+    public Status getStatus() {
+        return latch.getStatus();
+    }
+
+    // This class describes the inner state.
+    private final class Sync {
+        private AtomicBoolean state;
+
+        boolean innerState() {
+            return this.state.get();
+        }
+
+        public boolean getState() {
+            return state.get();
+        }
+
+        void innerSetTrue() {
+            boolean s;
+            do {
+                s = getState();
+                if (s) {
+                    return;
+                }
+            } while(!state.compareAndSet(s, true));
+        }
+
+        void innerSetFalse() {
+            boolean s;
+            do {
+                s = getState();
+                if (!s) {
+                    return;
+                }
+            } while(!state.compareAndSet(s, false));
+        }
+
+        private Sync() {
+            state = new AtomicBoolean(false);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
new file mode 100644
index 0000000..4bee3f5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.load.sync.SyncJob.JobState;
+import org.apache.doris.task.MasterTask;
+import org.apache.doris.task.MasterTaskExecutor;
+
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public class SyncChecker extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(SyncChecker.class);
+
+    private JobState jobState;
+
+    // checkers for running sync jobs
+    private static Map<JobState, SyncChecker> checkers = Maps.newHashMap();
+
+    // executors for sync tasks
+    private static Map<JobState, MasterTaskExecutor> executors = Maps.newHashMap();
+
+    private SyncChecker(JobState jobState, long intervalMs) {
+        super("sync checker " + jobState.name().toLowerCase(), intervalMs);
+        this.jobState = jobState;
+    }
+
+    public static void init(long intervalMs) {
+        checkers.put(JobState.PENDING, new SyncChecker(JobState.PENDING, intervalMs));
+
+        int poolSize = 3;
+
+        MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("sync_pending_job", poolSize, true);
+        executors.put(JobState.PENDING, pendingTaskExecutor);
+    }
+
+    public static void startAll() {
+        for (SyncChecker syncChecker : checkers.values()) {
+            syncChecker.start();
+        }
+        for (MasterTaskExecutor masterTaskExecutor : executors.values()) {
+            masterTaskExecutor.start();
+        }
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.debug("start check export jobs. job state: {}", jobState.name());
+        switch (jobState) {
+            case PENDING:
+                runPendingJobs();
+                break;
+            default:
+                LOG.warn("wrong sync job state: {}", jobState.name());
+                break;
+        }
+    }
+
+    private void runPendingJobs() {
+        SyncJobManager syncJobMgr = Catalog.getCurrentCatalog().getSyncJobManager();
+        List<SyncJob> pendingJobs = syncJobMgr.getSyncJobs(JobState.PENDING);
+        for (SyncJob job : pendingJobs) {
+            try {
+                MasterTask task = new SyncPendingTask(job);
+                if (executors.get(JobState.PENDING).submit(task)) {
+                    LOG.info("run pending sync job. job: {}", job);
+                }
+            } catch (Exception e) {
+                LOG.warn("run pending sync job error", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncDataConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncDataConsumer.java
new file mode 100644
index 0000000..ca12d30
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncDataConsumer.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class SyncDataConsumer extends SyncLifeCycle {
+    private static Logger logger = LogManager.getLogger(SyncDataConsumer.class);
+
+    protected boolean debug;
+
+    public SyncDataConsumer(boolean debug) {
+        this.debug = debug;
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        logger.info("executor has been started. debug: {}", debug);
+    }
+
+    public void beginForTxn() {
+    }
+
+    public void abortForTxn(String reason) {
+    }
+
+    public void commitForTxn() {
+    }
+
+    @Override
+    public void process() {
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncDataReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncDataReceiver.java
new file mode 100644
index 0000000..fdd52bc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncDataReceiver.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class SyncDataReceiver extends SyncLifeCycle {
+    private static Logger logger = LogManager.getLogger(SyncDataReceiver.class);
+
+    protected int readBatchSize;
+
+    public SyncDataReceiver(int readBatchSize) {
+        this.readBatchSize = readBatchSize;
+    }
+
+    @Override
+    public void process() {
+    }
+
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncFailMsg.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncFailMsg.java
new file mode 100644
index 0000000..75ce9d3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncFailMsg.java
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SyncFailMsg implements Writable {
+    public enum MsgType {
+        USER_CANCEL,
+        SUBMIT_FAIL,
+        RUN_FAIL,
+        UNKNOWN
+    }
+
+    @SerializedName(value = "msgType")
+    private SyncFailMsg.MsgType msgType;
+    @SerializedName(value = "msg")
+    private String msg;
+
+    public SyncFailMsg(MsgType msgType, String msg) {
+        this.msgType = msgType;
+        this.msg = msg;
+    }
+
+    public MsgType getMsgType() {
+        return msgType;
+    }
+
+    public void setMsgType(MsgType msgType) {
+        this.msgType = msgType;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+
+    @Override
+    public String toString() {
+        return "SyncFailMsg [type=" + msgType + ", msg=" + msg + "]";
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this, SyncFailMsg.class));
+    }
+
+    public static SyncFailMsg read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, SyncFailMsg.class);
+    }
+
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof SyncFailMsg)) {
+            return false;
+        }
+
+        SyncFailMsg other = (SyncFailMsg) obj;
+
+        return msgType.equals(other.msgType)
+                && msg.equals(other.msg);
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
new file mode 100644
index 0000000..1077734
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.analysis.BinlogDesc;
+import org.apache.doris.analysis.ChannelDescription;
+import org.apache.doris.analysis.CreateDataSyncJobStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.canal.CanalSyncJob;
+import org.apache.doris.persist.gson.GsonUtils;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+public abstract class SyncJob implements Writable {
+    private static final Logger LOG = LogManager.getLogger(SyncJob.class);
+
+    @SerializedName(value = "id")
+    protected long id;
+    @SerializedName(value = "dbId")
+    protected long dbId;
+    @SerializedName(value = "jobName")
+    protected String jobName;
+    @SerializedName(value = "channelDescriptions")
+    protected List<ChannelDescription> channelDescriptions;
+    protected BinlogDesc binlogDesc;
+    @SerializedName(value = "createTimeMs")
+    protected long createTimeMs;
+    @SerializedName(value = "lastStartTimeMs")
+    protected long lastStartTimeMs;
+    @SerializedName(value = "lastStopTimeMs")
+    protected long lastStopTimeMs;
+    @SerializedName(value = "finishTimeMs")
+    protected long finishTimeMs;
+    @SerializedName(value = "jobState")
+    protected JobState jobState;
+    @SerializedName(value = "failMsg")
+    protected SyncFailMsg failMsg;
+    @SerializedName(value = "dataSyncJobType")
+    protected DataSyncJobType dataSyncJobType;
+    protected List<SyncChannel> channels;
+
+    public SyncJob(long id, String jobName, long dbId) {
+        this.id = id;
+        this.dbId = dbId;
+        this.jobName = jobName;
+        this.jobState = JobState.PENDING;
+        this.createTimeMs = System.currentTimeMillis();
+        this.lastStartTimeMs = -1L;
+        this.lastStopTimeMs = -1L;
+        this.finishTimeMs = -1L;
+    }
+
+    public enum JobState {
+        PENDING,
+        RUNNING,
+        PAUSED,
+        CANCELLED
+    }
+
+    public static SyncJob fromStmt(long jobId, CreateDataSyncJobStmt stmt) throws DdlException {
+        String dbName = stmt.getDbName();
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new DdlException("Database " + dbName + " does not exist");
+        }
+        SyncJob syncJob = null;
+        try {
+            switch (stmt.getDataSyncJobType()) {
+                case CANAL:
+                    syncJob = new CanalSyncJob(jobId, stmt.getJobName(), db.getId());
+                    break;
+                default:
+                    throw new DdlException("Unknown load job type.");
+            }
+            syncJob.setChannelDescriptions(stmt.getChannelDescriptions());
+            syncJob.checkAndSetBinlogInfo(stmt.getBinlogDesc());
+            return syncJob;
+        } catch (Exception e) {
+            throw new DdlException(e.getMessage());
+        }
+    }
+
+    // return true if job is done (CANCELLED)
+    public boolean isCompleted() {
+        return jobState == JobState.CANCELLED;
+    }
+
+    public boolean isPaused() {
+        return jobState == JobState.PAUSED;
+    }
+
+    public boolean isRunning() {
+        return jobState == JobState.RUNNING;
+    }
+
+    public boolean isCancelled() {
+        return jobState == JobState.CANCELLED;
+    }
+
+    public synchronized void updateState(JobState newState, boolean isReplay) {
+        this.jobState = newState;
+        switch (newState) {
+            case PENDING:
+                break;
+            case RUNNING:
+                this.lastStartTimeMs = System.currentTimeMillis();
+                break;
+            case PAUSED:
+                this.lastStopTimeMs = System.currentTimeMillis();
+                break;
+            case CANCELLED:
+                this.lastStopTimeMs = System.currentTimeMillis();
+                this.finishTimeMs = System.currentTimeMillis();
+                break;
+            default:
+                Preconditions.checkState(false, "wrong job state: " + newState.name());
+                break;
+        }
+        if (!isReplay) {
+            SyncJobUpdateStateInfo info = new SyncJobUpdateStateInfo(id, jobState, lastStartTimeMs, lastStopTimeMs,
+                    finishTimeMs, failMsg);
+            Catalog.getCurrentCatalog().getEditLog().logUpdateSyncJobState(info);
+        }
+    }
+
+    public void checkAndSetBinlogInfo(BinlogDesc binlogDesc) throws DdlException {
+        this.binlogDesc = binlogDesc;
+    }
+
+    public abstract void execute() throws UserException;
+
+    public void cancel(MsgType msgType, String errMsg) {
+    }
+
+    public void pause() throws DdlException {
+        throw new DdlException("not implemented");
+    }
+
+    public void resume() throws DdlException {
+        throw new DdlException("not implemented");
+    }
+
+    public String getStatus() {
+        return "\\N";
+    }
+
+    public String getJobConfig() {
+        return "\\N";
+    }
+
+    // only use for persist when job state changed
+    public static class SyncJobUpdateStateInfo implements Writable {
+        @SerializedName(value = "id")
+        private long id;
+        @SerializedName(value = "lastStartTimeMs")
+        protected long lastStartTimeMs;
+        @SerializedName(value = "lastStopTimeMs")
+        protected long lastStopTimeMs;
+        @SerializedName(value = "finishTimeMs")
+        protected long finishTimeMs;
+        @SerializedName(value = "jobState")
+        protected JobState jobState;
+        @SerializedName(value = "failMsg")
+        protected SyncFailMsg failMsg;
+
+        public SyncJobUpdateStateInfo(long id, JobState jobState, long lastStartTimeMs, long lastStopTimeMs, long finishTimeMs, SyncFailMsg failMsg) {
+            this.id = id;
+            this.jobState = jobState;
+            this.lastStartTimeMs = lastStartTimeMs;
+            this.lastStopTimeMs = lastStopTimeMs;
+            this.finishTimeMs = finishTimeMs;
+            this.failMsg = failMsg;
+        }
+
+        public long getId() {
+            return this.id;
+        }
+
+        public long getLastStartTimeMs() {
+            return this.lastStartTimeMs;
+        }
+
+        public long getLastStopTimeMs() {
+            return this.lastStopTimeMs;
+        }
+
+        public long getFinishTimeMs() {
+            return this.finishTimeMs;
+        }
+
+        public JobState getJobState() {
+            return this.jobState;
+        }
+
+        public SyncFailMsg getFailMsg() {
+            return this.failMsg;
+        }
+
+        @Override
+        public String toString() {
+            return GsonUtils.GSON.toJson(this);
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            String json = GsonUtils.GSON.toJson(this);
+            Text.writeString(out, json);
+        }
+
+        public static SyncJobUpdateStateInfo read(DataInput in) throws IOException {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, SyncJobUpdateStateInfo.class);
+        }
+    }
+
+    public List<Comparable> getShowInfo() {
+        List<Comparable> jobInfo = Lists.newArrayList();
+        // jobId
+        jobInfo.add(id);
+        // jobName
+        jobInfo.add(jobName);
+        // type
+        jobInfo.add(dataSyncJobType.name());
+        // state
+        jobInfo.add(jobState.name());
+        // channel
+        StringBuilder channelInfo = new StringBuilder();
+        if (channels != null) {
+            for (int i = 0; i < channels.size(); i++) {
+                channelInfo.append(channels.get(i).getInfo());
+                if (i < channels.size() - 1) {
+                    channelInfo.append(", ");
+                }
+            }
+            jobInfo.add(channelInfo.toString());
+        } else {
+            jobInfo.add(FeConstants.null_string);
+        }
+
+        // status
+        jobInfo.add(getStatus());
+        // jobConfig
+        jobInfo.add(getJobConfig());
+        // createTimeMs
+        jobInfo.add(TimeUtils.longToTimeString(createTimeMs));
+        // lastStartTimeMs
+        jobInfo.add(TimeUtils.longToTimeString(lastStartTimeMs));
+        // lastStopTimeMs
+        jobInfo.add(TimeUtils.longToTimeString(lastStopTimeMs));
+        // finishTimeMs
+        jobInfo.add(TimeUtils.longToTimeString(finishTimeMs));
+        // failMsg
+        if (failMsg == null) {
+            jobInfo.add(FeConstants.null_string);
+        } else {
+            jobInfo.add(failMsg.toString());
+        }
+        return jobInfo;
+    }
+
+    public void replayUpdateSyncJobState(SyncJobUpdateStateInfo info) {
+        lastStartTimeMs = info.getLastStartTimeMs();
+        lastStopTimeMs = info.getLastStopTimeMs();
+        finishTimeMs = info.getFinishTimeMs();
+        updateState(info.getJobState(), true);
+        LOG.info("replay update sync job state: {}", info);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this, SyncJob.class));
+    }
+
+    public static SyncJob read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, SyncJob.class);
+    }
+
+    public void setChannelDescriptions(List<ChannelDescription> channelDescriptions) {
+        this.channelDescriptions = channelDescriptions;
+    }
+
+    public long getId() {
+        return this.id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public String getJobName() {
+        return this.jobName;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public DataSyncJobType getJobType() {
+        return this.dataSyncJobType;
+    }
+
+    public SyncFailMsg getFailMsg() {
+        return failMsg;
+    }
+
+    public void setFailMsg(SyncFailMsg failMsg) {
+        this.failMsg = failMsg;
+    }
+
+    public List<ChannelDescription> getChannelDescriptions() {
+        return this.channelDescriptions;
+    }
+
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
new file mode 100644
index 0000000..60a6019
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.analysis.CreateDataSyncJobStmt;
+import org.apache.doris.analysis.PauseSyncJobStmt;
+import org.apache.doris.analysis.ResumeSyncJobStmt;
+import org.apache.doris.analysis.StopSyncJobStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.load.sync.SyncJob.JobState;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class SyncJobManager implements Writable {
+    private static final Logger LOG = LogManager.getLogger(SyncJobManager.class);
+
+    private Map<Long, SyncJob> idToSyncJob;
+
+    private Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs;
+
+    private ReentrantReadWriteLock lock;
+
+    public SyncJobManager() {
+        idToSyncJob = Maps.newConcurrentMap();
+        dbIdToJobNameToSyncJobs = Maps.newConcurrentMap();
+        lock = new ReentrantReadWriteLock(true);
+    }
+
+    public void addDataSyncJob(CreateDataSyncJobStmt stmt) throws DdlException {
+        long jobId = Catalog.getCurrentCatalog().getNextId();
+        SyncJob syncJob = SyncJob.fromStmt(jobId, stmt);
+        writeLock();
+        try {
+            unprotectedAddSyncJob(syncJob);
+            Catalog.getCurrentCatalog().getEditLog().logCreateSyncJob(syncJob);
+        } finally {
+            writeUnlock();
+        }
+        LOG.info("add sync job. {}", syncJob);
+    }
+
+    private void unprotectedAddSyncJob(SyncJob syncJob) {
+        idToSyncJob.put(syncJob.getId(), syncJob);
+        long dbId = syncJob.getDbId();
+        if (!dbIdToJobNameToSyncJobs.containsKey(dbId)) {
+            dbIdToJobNameToSyncJobs.put(syncJob.getDbId(), Maps.newConcurrentMap());
+        }
+        Map<String, List<SyncJob>> map = dbIdToJobNameToSyncJobs.get(dbId);
+        if (!map.containsKey(syncJob.getJobName())) {
+            map.put(syncJob.getJobName(), Lists.newArrayList());
+        }
+        map.get(syncJob.getJobName()).add(syncJob);
+    }
+
+    public void pauseSyncJob(PauseSyncJobStmt stmt) throws DdlException {
+        String dbName = stmt.getDbFullName();
+        String jobName = stmt.getJobName();
+
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new DdlException("Db does not exist. name: " + dbName);
+        }
+
+        List<SyncJob> syncJobs = Lists.newArrayList();
+        readLock();
+        try {
+            List<SyncJob> matchJobs = getSyncJobsByDbAndJobName(db.getId(), jobName);
+            if (matchJobs.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            List<SyncJob> runningSyncJob = matchJobs.stream().filter(entity -> entity.isRunning())
+                    .collect(Collectors.toList());
+            if (runningSyncJob.isEmpty()) {
+                throw new DdlException("There is no running job with jobName `"
+                        + stmt.getJobName() + "` to pause");
+            }
+
+            syncJobs.addAll(runningSyncJob);
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob syncJob : syncJobs) {
+            syncJob.pause();
+        }
+    }
+
+    public void resumeSyncJob(ResumeSyncJobStmt stmt) throws DdlException {
+        String dbName = stmt.getDbFullName();
+        String jobName = stmt.getJobName();
+
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new DdlException("Db does not exist. name: " + dbName);
+        }
+
+        List<SyncJob> syncJobs = Lists.newArrayList();
+        readLock();
+        try {
+            List<SyncJob> matchJobs = getSyncJobsByDbAndJobName(db.getId(), jobName);
+            if (matchJobs.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            List<SyncJob> pausedSyncJob = matchJobs.stream().filter(entity -> entity.isPaused())
+                    .collect(Collectors.toList());
+            if (pausedSyncJob.isEmpty()) {
+                throw new DdlException("There is no paused job with jobName `"
+                        + stmt.getJobName() + "` to resume");
+            }
+
+            syncJobs.addAll(pausedSyncJob);
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob syncJob : syncJobs) {
+            syncJob.resume();
+        }
+    }
+
+    public void stopSyncJob(StopSyncJobStmt stmt) throws DdlException {
+        String dbName = stmt.getDbFullName();
+        String jobName = stmt.getJobName();
+
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new DdlException("Db does not exist. name: " + dbName);
+        }
+
+        // List of sync jobs waiting to be cancelled
+        List<SyncJob> syncJobs = Lists.newArrayList();
+        readLock();
+        try {
+            List<SyncJob> matchJobs = getSyncJobsByDbAndJobName(db.getId(), jobName);
+            if (matchJobs.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            List<SyncJob> uncompletedSyncJob = matchJobs.stream().filter(entity -> !entity.isCompleted())
+                    .collect(Collectors.toList());
+            if (uncompletedSyncJob.isEmpty()) {
+                throw new DdlException("There is no uncompleted job with jobName `"
+                        + stmt.getJobName() + "`");
+            }
+
+            syncJobs.addAll(uncompletedSyncJob);
+        } finally {
+            readUnlock();
+        }
+
+        for (SyncJob syncJob : syncJobs) {
+            syncJob.cancel(SyncFailMsg.MsgType.USER_CANCEL, "user cancel");
+        }
+    }
+
+    // caller should hold the db lock
+    private List<SyncJob> getSyncJobsByDbAndJobName(long dbId, String jobName) {
+        List<SyncJob> syncJobs = Lists.newArrayList();
+        Map<String, List<SyncJob>> jobNameToSyncJobs = dbIdToJobNameToSyncJobs.get(dbId);
+        if (jobNameToSyncJobs != null) {
+            if (jobNameToSyncJobs.containsKey(jobName)) {
+                syncJobs.addAll(jobNameToSyncJobs.get(jobName));
+            }
+        }
+        return syncJobs;
+    }
+
+    public List<List<Comparable>> getSyncJobsInfoByDbId(long dbId) {
+        LinkedList<List<Comparable>> syncJobInfos = new LinkedList<List<Comparable>>();
+
+        readLock();
+        try {
+            if (!dbIdToJobNameToSyncJobs.containsKey(dbId)) {
+                return syncJobInfos;
+            }
+            Map<String, List<SyncJob>> jobNameToLoadJobs = dbIdToJobNameToSyncJobs.get(dbId);
+            List<SyncJob> syncJobs = Lists.newArrayList();
+            syncJobs.addAll(jobNameToLoadJobs.values()
+                    .stream().flatMap(Collection::stream).collect(Collectors.toList()));
+            for (SyncJob syncJob : syncJobs) {
+                syncJobInfos.add(syncJob.getShowInfo());
+            }
+            return syncJobInfos;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public List<SyncJob> getSyncJobs(SyncJob.JobState state) {
+        List<SyncJob> result = Lists.newArrayList();
+        readLock();
+        try {
+            for (SyncJob job : idToSyncJob.values()) {
+                if (job.getJobState() == state) {
+                    result.add(job);
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+
+        return result;
+    }
+
+    public boolean isJobNameExist(String dbName, String jobName) throws DdlException {
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new DdlException("Db does not exist. name: " + dbName);
+        }
+        boolean result = false;
+        readLock();
+        try {
+            Map<String, List<SyncJob>> jobNameToSyncJobs = dbIdToJobNameToSyncJobs.get(db.getId());
+            if (jobNameToSyncJobs != null && jobNameToSyncJobs.containsKey(jobName)) {
+                List<SyncJob> matchJobs = jobNameToSyncJobs.get(jobName);
+                for(SyncJob syncJob : matchJobs) {
+                    if (!syncJob.isCancelled()) {
+                        result = true;
+                    }
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+
+        return result;
+    }
+
+    public SyncJob getSyncJobById(long jobId) {
+        return idToSyncJob.get(jobId);
+    }
+
+    public void readLock() {
+        lock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Collection<SyncJob> syncJobs = idToSyncJob.values();
+        out.writeInt(syncJobs.size());
+        for (SyncJob syncJob : syncJobs) {
+            syncJob.write(out);
+        }
+    }
+
+    public void readField(DataInput in) throws IOException {
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            SyncJob syncJob = SyncJob.read(in);
+            if (!syncJob.isCompleted()) {
+                syncJob.updateState(JobState.PENDING, true);
+            }
+            unprotectedAddSyncJob(syncJob);
+        }
+    }
+
+    public void replayAddSyncJob(SyncJob syncJob) {
+        writeLock();
+        try {
+            unprotectedAddSyncJob(syncJob);
+        } finally {
+            writeUnlock();
+        }
+    }
+    public void replayUpdateSyncJobState(SyncJob.SyncJobUpdateStateInfo info) {
+        writeLock();
+        try {
+            long jobId = info.getId();
+            SyncJob job = idToSyncJob.get(jobId);
+            if (job == null) {
+                LOG.warn("replay update sync job state failed. Job was not found, id: {}", jobId);
+                return;
+            }
+            job.replayUpdateSyncJobState(info);
+        } finally {
+            writeUnlock();
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java
new file mode 100644
index 0000000..3c98137
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class SyncLifeCycle {
+    private Logger logger = LogManager.getLogger(SyncLifeCycle.class);
+
+    protected volatile boolean running = false;
+    public Thread thread;
+
+    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("process thread has an error", e);
+        }
+    };
+
+    public abstract void process();
+
+    public boolean isStart() {
+        return this.running;
+    }
+
+    public void start() {
+        if (isStart()) {
+            throw new RuntimeException(this.getClass().getName() + " has startup , don't repeat start");
+        }
+
+        thread = new Thread(new Runnable() {
+            public void run() {
+                process();
+            }
+        });
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+
+        this.running = true;
+    }
+
+    public void stop() {
+        if (!isStart()) {
+            throw new RuntimeException(this.getClass().getName() + " isn't start , please check");
+        }
+
+        this.running = false;
+
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java
new file mode 100644
index 0000000..e9b7695
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob.JobState;
+import org.apache.doris.task.MasterTask;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class SyncPendingTask extends MasterTask {
+    private static final Logger LOG = LogManager.getLogger(SyncPendingTask.class);
+
+    private SyncJob syncJob;
+
+    public SyncPendingTask(SyncJob syncJob) {
+        super();
+        this.syncJob = syncJob;
+        this.signature = syncJob.getId();
+    }
+
+    @Override
+    protected void exec() {
+        if (syncJob.getJobState() != JobState.PENDING) {
+            return;
+        }
+
+        try {
+            syncJob.execute();
+        } catch (UserException e) {
+            String failMsg = String.format("sync job execute pending task failed, jobName: %s, msg: %s", syncJob.getJobName(), e.getMessage());
+            syncJob.cancel(MsgType.UNKNOWN, failMsg);
+            LOG.warn(failMsg);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncTxnParam.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncTxnParam.java
new file mode 100644
index 0000000..701c6a9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncTxnParam.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
+
+public class SyncTxnParam {
+    private TUniqueId loadId;
+    private long txnId;
+    private TTxnParams txnConf;
+    private Backend backend;
+
+    public SyncTxnParam setTxnConf(TTxnParams txnConf) {
+        this.txnConf = txnConf;
+        return this;
+    }
+
+    public SyncTxnParam setLoadId(TUniqueId loadId) {
+        this.loadId = loadId;
+        return this;
+    }
+
+    public SyncTxnParam setTxnId(long txnId) {
+        this.txnId = txnId;
+        return this;
+    }
+
+    public SyncTxnParam setBackend(Backend backend) {
+        this.backend = backend;
+        return this;
+    }
+
+    public long getTxnId() {
+        return txnId;
+    }
+
+    public TUniqueId getLoadId() {
+        return loadId;
+    }
+
+    public TTxnParams getTxnConf() {
+        return txnConf;
+    }
+
+    public Backend getBackend() {
+        return backend;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalConfigs.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalConfigs.java
new file mode 100644
index 0000000..841acb6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalConfigs.java
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+public class CanalConfigs {
+
+    // Maximal waiting time for receiver to get one batch
+    public static long getWaitingTimeoutMs = 30L;
+
+    // Maximal waiting time for consumer to poll one batch
+    public static long pollWaitingTimeoutMs = 80L;
+
+    // Maximal waiting time for channel to poll one batch
+    public static long channelWaitingTimeoutMs = 1000L;
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
new file mode 100644
index 0000000..8b5dc54
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -0,0 +1,356 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.sync.SyncChannel;
+import org.apache.doris.load.sync.SyncChannelCallback;
+import org.apache.doris.load.sync.SyncJob;
+import org.apache.doris.load.sync.model.Data;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.qe.InsertStreamTxnExecutor;
+import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMergeType;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionEntry;
+import org.apache.doris.transaction.TransactionState;
+
+import com.alibaba.otter.canal.common.CanalException;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class CanalSyncChannel extends SyncChannel {
+    private static final Logger LOG = LogManager.getLogger(CanalSyncChannel.class);
+
+    private static final String DELETE_COLUMN = "_delete_sign_";
+    private static final String DELETE_CONDITION = DELETE_COLUMN + "=1";
+    private static final String NULL_VALUE_FOR_LOAD = "\\N";
+
+    private long timeoutSecond;
+    private long lastBatchId;
+    private LinkedBlockingQueue<Data<InternalService.PDataRow>> pendingQueue;
+    private Data<InternalService.PDataRow> batchBuffer;
+    private InsertStreamTxnExecutor txnExecutor;
+
+    public CanalSyncChannel(SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
+        super(syncJob, db, table, columns, srcDataBase, srcTable);
+        this.batchBuffer = new Data<>();
+        this.pendingQueue = Queues.newLinkedBlockingQueue(128);
+        this.lastBatchId = -1L;
+        this.timeoutSecond = -1L;
+    }
+
+    public void process() {
+        while (running) {
+            if (!isTxnInit()) {
+                continue;
+            }
+            // if txn has begun, send all data in queue
+            if (isTxnBegin()) {
+                while (!pendingQueue.isEmpty()) {
+                    try {
+                        Data<InternalService.PDataRow> rows = pendingQueue.poll(CanalConfigs.channelWaitingTimeoutMs, TimeUnit.MILLISECONDS);
+                        if (rows != null) {
+                            sendData(rows);
+                        }
+                    } catch (Exception e) {
+                        String errMsg = "encounter exception in channel, channel " + id + ", " +
+                                "msg: " + e.getMessage() + ", table: " + targetTable;
+                        LOG.error(errMsg);
+                        callback.onFailed(errMsg);
+                    }
+                }
+            }
+            if (callback.state()) {
+                callback.onFinished(id);
+            }
+        }
+    }
+
+    @Override
+    public void beginTxn(long batchId) throws UserException, TException, TimeoutException,
+            InterruptedException, ExecutionException {
+        if (!isTxnBegin()) {
+            long currentTime = System.currentTimeMillis();
+            String label = "label_job" + + jobId + "_channel" + id + "_db" + db.getId() + "_tbl" + tbl.getId()
+                    + "_batch" + batchId + "_" + currentTime;
+            String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
+            GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
+            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
+            TTxnParams txnConf = txnEntry.getTxnConf();
+            TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
+            TStreamLoadPutRequest request = null;
+            try {
+                long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
+                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
+                String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
+                        db.getId(), txnId).getAuthCode();
+                request = new TStreamLoadPutRequest()
+                        .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
+                        .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
+                        .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
+                        .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
+                        .setColumns(targetColumn);
+                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
+                txnEntry.setLabel(label);
+                txnExecutor.setTxnId(txnId);
+            } catch (DuplicatedRequestException e) {
+                LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
+                        id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
+                txnExecutor.setTxnId(e.getTxnId());
+            } catch (LabelAlreadyUsedException e) {
+                // this happens when channel re-consume same batch, we should just pass through it without begin a new txn
+                LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
+                return;
+            } catch (AnalysisException | BeginTransactionException e) {
+                LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
+                throw e;
+            } catch (UserException e) {
+                LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
+                throw e;
+            }
+            try {
+                // async exec begin transaction
+                long txnId = txnExecutor.getTxnId();
+                if (txnId != -1L) {
+                    this.txnExecutor.beginTransaction(request);
+                    LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+                }
+            } catch (TException e) {
+                LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+                throw e;
+            } catch (TimeoutException | InterruptedException | ExecutionException e) {
+                LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
+                        id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void abortTxn(String reason) throws TException, TimeoutException, InterruptedException, ExecutionException {
+        if (!isTxnBegin()) {
+            LOG.warn("No transaction to abort in channel {}, table: {}", id, targetTable);
+            return;
+        }
+        try {
+            this.txnExecutor.abortTransaction();
+            LOG.info("abort txn in channel {}, table: {}, txn id: {}, last batch: {}, reason: {}",
+                    id, targetTable, txnExecutor.getTxnId(), lastBatchId, reason);
+        } catch (TException e) {
+            LOG.warn("Failed to abort txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+            throw e;
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.warn("Error occur while waiting abort txn response in channel {}, table: {}, txn: {}, msg:{}",
+                    id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+            throw e;
+        }  finally {
+            this.batchBuffer = new Data<>();
+            this.pendingQueue.clear();
+            updateBatchId(-1L);
+        }
+    }
+    @Override
+    public void commitTxn() throws TException, TimeoutException, InterruptedException, ExecutionException {
+        if (!isTxnBegin()) {
+            LOG.warn("No transaction to commit in channel {}, table: {}", id, targetTable);
+            return;
+        }
+        try {
+            flushData();
+            this.txnExecutor.commitTransaction();
+            LOG.info("commit txn in channel {}, table: {}, txn id: {}, last batch: {}",
+                    id, targetTable, txnExecutor.getTxnId(), lastBatchId);
+        } catch (TException e) {
+            LOG.warn("Failed to commit txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+            throw e;
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.warn("Error occur while waiting commit txn return in channel {}, table: {}, txn: {}, msg:{}",
+                    id, targetTable, txnExecutor.getTxnId(), e.getMessage());
+            throw e;
+        } finally {
+            this.batchBuffer = new Data<>();
+            this.pendingQueue.clear();
+            updateBatchId(-1L);
+        }
+    }
+    @Override
+    public void initTxn(long timeoutSecond) {
+        if (!isTxnInit()) {
+            UUID uuid = UUID.randomUUID();
+            TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+            this.timeoutSecond = timeoutSecond;
+            TTxnParams txnConf = new TTxnParams().setNeedTxn(true).setThriftRpcTimeoutMs(5000)
+                    .setTxnId(-1).setDb(db.getFullName()).setTbl(tbl.getName()).setDbId(db.getId());
+            this.txnExecutor = new InsertStreamTxnExecutor(new TransactionEntry(txnConf, db, tbl));
+            txnExecutor.setTxnId(-1L);
+            txnExecutor.setLoadId(loadId);
+        }
+    }
+
+    public void clearTxn() {
+        this.txnExecutor = null;
+    }
+
+    public void submit(long batchId, CanalEntry.EventType eventType, CanalEntry.RowChange rowChange) {
+        String sql = rowChange.getSql();
+        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+            switch (eventType) {
+                case DELETE:
+                    execute(batchId, eventType, rowData.getBeforeColumnsList());
+                    break;
+                case INSERT:
+                    execute(batchId, eventType, rowData.getAfterColumnsList());
+                    break;
+                case UPDATE:
+                    execute(batchId, eventType, rowData.getAfterColumnsList());
+                    break;
+                default:
+                    LOG.warn("ignore event, channel: {}, schema: {}, table: {}, SQL: {}", id, srcDataBase, srcTable, sql);
+            }
+        }
+    }
+
+    private void execute(long batchId, CanalEntry.EventType eventType, List<CanalEntry.Column> columns) {
+        InternalService.PDataRow row = parseRow(eventType, columns);
+        try {
+            Preconditions.checkState(isTxnInit());
+            if (batchId > lastBatchId) {
+                if (!isTxnBegin()) {
+                    beginTxn(batchId);
+                } else {
+                    this.pendingQueue.put(this.batchBuffer);
+                    this.batchBuffer = new Data<>();
+                }
+                updateBatchId(batchId);
+            }
+        } catch (Exception e) {
+            String errMsg = "encounter exception when submit in channel " + id + ", table: "
+                    + targetTable + ", batch: " + batchId;
+            LOG.error(errMsg, e);
+            throw new CanalException(errMsg, e);
+        }
+        this.batchBuffer.addRow(row);
+    }
+
+    private InternalService.PDataRow parseRow(CanalEntry.EventType eventType, List<CanalEntry.Column> columns) {
+        InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
+        for (int i = 0; i < columns.size(); i++) {
+            if (columns.get(i).getIsNull()) {
+                row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
+            } else {
+                row.addColBuilder().setValue(columns.get(i).getValue());
+            }
+        }
+        // add batch delete condition to the tail
+        if (eventType == CanalEntry.EventType.DELETE) {
+            row.addColBuilder().setValue("1");
+        } else {
+            row.addColBuilder().setValue("0");
+        }
+        return row.build();
+    }
+
+    private void sendData(Data<InternalService.PDataRow> rows) throws TException, TimeoutException,
+            InterruptedException, ExecutionException {
+        Preconditions.checkState(isTxnBegin());
+        TransactionEntry txnEntry = txnExecutor.getTxnEntry();
+        txnEntry.setDataToSend(rows.getDatas());
+        this.txnExecutor.sendData();
+    }
+
+    public void flushData() throws TException, TimeoutException,
+            InterruptedException, ExecutionException {
+        if (batchBuffer.isNotEmpty()) {
+            sendData(batchBuffer);
+            batchBuffer = new Data<>();
+        }
+    }
+
+    public boolean isTxnBegin() {
+        return isTxnInit() && this.txnExecutor.getTxnId() != -1;
+    }
+
+    public boolean isTxnInit() {
+        return this.txnExecutor != null;
+    }
+
+    private void updateBatchId(long batchId) {
+        this.lastBatchId = batchId;
+    }
+
+    public String getInfo() {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(srcDataBase).append(".").append(srcTable);
+        stringBuilder.append("->");
+        stringBuilder.append(targetTable);
+        return stringBuilder.toString();
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public String getSrcTable() {
+        return srcTable;
+    }
+
+    public String getSrcDataBase() {
+        return srcDataBase;
+    }
+
+    public String getTargetTable() {
+        return targetTable;
+    }
+
+    public void setCallback(SyncChannelCallback callback) {
+        this.callback = callback;
+    }
+
+    public void setPartitions(PartitionNames partitionNames) {
+        this.partitionNames = partitionNames;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java
new file mode 100644
index 0000000..10c393b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.sync.position.EntryPosition;
+import org.apache.doris.load.sync.model.Events;
+import org.apache.doris.load.sync.position.PositionMeta;
+import org.apache.doris.load.sync.position.PositionRange;
+import org.apache.doris.load.sync.SyncChannelHandle;
+import org.apache.doris.load.sync.SyncDataConsumer;
+import org.apache.doris.load.sync.SyncFailMsg;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.common.CanalException;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class CanalSyncDataConsumer extends SyncDataConsumer {
+    private static Logger logger = LogManager.getLogger(CanalSyncDataConsumer.class);
+
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    private static final long COMMIT_MEM_SIZE = 64 * 1024 * 1024; // 64mb;
+
+    private CanalSyncJob syncJob;
+    private CanalConnector connector;
+    private Map<Long, CanalSyncChannel> idToChannels;
+    private Set<Long> ackBatches;
+    private PositionMeta<EntryPosition> positionMeta;
+    private LinkedBlockingQueue<Events<CanalEntry.Entry, EntryPosition>> dataBlockingQueue;
+    private SyncChannelHandle handle;
+    private ReentrantLock getLock;
+    private int sleepTimeMs;
+    private long commitIntervalSecond;
+
+    public void setChannels(Map<Long, CanalSyncChannel> idToChannels) {
+        for (CanalSyncChannel channel : idToChannels.values()) {
+            this.positionMeta.setCommitPosition(channel.getId(), EntryPosition.MIN_POS);
+            channel.setCallback(handle);
+        }
+        this.idToChannels = idToChannels;
+    }
+
+    public CanalSyncDataConsumer(CanalSyncJob syncJob, CanalConnector connector, ReentrantLock getLock, boolean debug) {
+        super(debug);
+        this.syncJob = syncJob;
+        this.connector = connector;
+        this.dataBlockingQueue = Queues.newLinkedBlockingQueue(1024);
+        this.ackBatches = Sets.newHashSet();
+        this.positionMeta = new PositionMeta<>();
+        this.getLock = getLock;
+        this.handle = new SyncChannelHandle();
+        this.commitIntervalSecond = Config.sync_commit_interval_second;
+        this.sleepTimeMs = 500;
+    }
+
+    public void stop(boolean needCleanUp) {
+        super.stop();
+        if (needCleanUp) {
+            cleanUp();
+        }
+    }
+
+    @Override
+    public void beginForTxn() {
+        handle.set(false);
+        handle.reset(idToChannels.size());
+        for (CanalSyncChannel channel : idToChannels.values()) {
+            channel.initTxn(Config.max_stream_load_timeout_second);
+            handle.mark(channel);
+        }
+    }
+
+    @Override
+    public void abortForTxn(String reason) {
+        abortForTxn(idToChannels.values(), reason);
+    }
+
+    public void abortForTxn(Collection<CanalSyncChannel> channels, String reason) {
+        logger.info("client is aborting transactions. JobId: {}, reason: {}", syncJob.getId(), reason);
+        for (CanalSyncChannel channel : channels) {
+            if (channel.isTxnBegin()) {
+                try {
+                    channel.abortTxn(reason);
+                } catch (Exception e) {
+                    logger.warn("Abort channel failed. jobId: {},channel: {}, target: {}, msg: {}",
+                            syncJob.getId(), channel.getId(), channel.getTargetTable(), e.getMessage());
+                }
+            }
+        }
+        rollback();
+    }
+
+    @Override
+    public void commitForTxn() {
+        logger.info("client is committing transactions. JobId: {}", syncJob.getId());
+        boolean success = true;
+        EntryPosition latestPosition = positionMeta.getLatestPosition();
+        for (CanalSyncChannel channel : idToChannels.values()) {
+            if (channel.isTxnBegin()) {
+                try {
+                    channel.commitTxn();
+                    this.positionMeta.setCommitPosition(channel.getId(), latestPosition);
+                } catch (Exception ce) {
+                    logger.warn("Commit channel failed. JobId: {}, channel: {}, target: {}, msg: {}",
+                            syncJob.getId(), channel.getId(), channel.getTargetTable(), ce.getMessage());
+                    try {
+                        channel.abortTxn(ce.getMessage());
+                    } catch (Exception ae) {
+                        logger.warn("Abort channel failed. JobId: {},channel: {}, target: {}, msg: {}",
+                                syncJob.getId(), channel.getId(), channel.getTargetTable(), ae.getMessage());
+                    }
+                    success = false;
+                }
+            }
+        }
+        if (success) {
+            ack();
+        } else {
+            rollback();
+        }
+    }
+
+    public Status waitForTxn() {
+        Status st = Status.CANCELLED;
+        handle.set(true);
+        try {
+            handle.join();
+            st = handle.getStatus();
+        } catch (InterruptedException e) {
+            logger.warn("InterruptedException: ", e);
+        } finally {
+            handle.set(false);
+        }
+        return st;
+    }
+
+    public void cleanForTxn() {
+        for (CanalSyncChannel channel : idToChannels.values()) {
+            if (channel.isTxnInit()) {
+                channel.clearTxn();
+            }
+        }
+    }
+
+    @Override
+    public void process() {
+        while (running) {
+            try {
+                int totalSize = 0;
+                long totalMemSize = 0L;
+                long startTime = System.currentTimeMillis();
+                beginForTxn();
+                while (true) {
+                    Events<CanalEntry.Entry, EntryPosition> dataEvents = null;
+                    try {
+                        dataEvents = dataBlockingQueue.poll(CanalConfigs.pollWaitingTimeoutMs, TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        // do nothing
+                    }
+                    if (dataEvents == null) {
+                        if (totalSize > 0 || totalMemSize > 0) {
+                            break;
+                        }
+                        try {
+                            Thread.sleep(sleepTimeMs);
+                        } catch (InterruptedException e) {
+                            // do nothing
+                        }
+                    } else {
+                        if (debug) {
+                            // print summary of the batch
+                            CanalUtils.printSummary(dataEvents);
+                        }
+                        List<CanalEntry.Entry> entries = dataEvents.getDatas();
+                        int size = entries.size();
+                        ackBatches.add(dataEvents.getId());
+                        positionMeta.addBatch(dataEvents.getId(), dataEvents.getPositionRange());
+                        executeOneBatch(dataEvents);
+                        totalSize += size;
+                        totalMemSize += dataEvents.getMemSize();
+                        if (totalMemSize >= COMMIT_MEM_SIZE) {
+                            break;
+                        }
+                    }
+
+                    if (System.currentTimeMillis() - startTime >= commitIntervalSecond * 1000) {
+                        break;
+                    }
+                }
+                Status st = waitForTxn();
+                if (st.ok()) {
+                    commitForTxn();
+                } else {
+                    abortForTxn(st.getErrorMsg());
+                    syncJob.cancel(SyncFailMsg.MsgType.RUN_FAIL, st.getErrorMsg());
+                }
+            } catch (Exception e) {
+                logger.error("Executor is error!", e);
+                abortForTxn(e.getMessage());
+                syncJob.cancel(SyncFailMsg.MsgType.SUBMIT_FAIL, e.getMessage());
+            } finally {
+                cleanForTxn();
+            }
+        }
+    }
+
+    public void put(Message message, int size) {
+        List<CanalEntry.Entry> entries;
+        if (message.isRaw()) {
+            entries = new ArrayList<>(message.getRawEntries().size());
+            for (ByteString rawEntry : message.getRawEntries()) {
+                try {
+                    entries.add(CanalEntry.Entry.parseFrom(rawEntry));
+                } catch (InvalidProtocolBufferException e) {
+                    throw new CanalException(e);
+                }
+            }
+        } else {
+            entries = message.getEntries();
+        }
+
+        int startIndex = 0;
+        // if last ack position is null, it is the first time to consume batch (startOffset = 0)
+        EntryPosition lastAckPosition = positionMeta.getAckPosition();
+        if (lastAckPosition != null) {
+            EntryPosition firstPosition = EntryPosition.createPosition(entries.get(0));
+            // only get data after the last ack position
+            if (EntryPosition.min(firstPosition, lastAckPosition).equals(firstPosition)) {
+                for (int i = 0; i <= entries.size() - 1; i++) {
+                    startIndex++;
+                    if (EntryPosition.checkPosition(entries.get(i), lastAckPosition)) {
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (startIndex <= size - 1) {
+            Events<CanalEntry.Entry, EntryPosition> dataEvents = new Events<>(message.getId());
+            PositionRange<EntryPosition> range = new PositionRange<>();
+            dataEvents.setPositionRange(range);
+            range.setStart(EntryPosition.createPosition(entries.get(startIndex)));
+            range.setEnd(EntryPosition.createPosition(entries.get(size - 1)));
+            dataEvents.setDatas(entries);
+            long memsize = 0L;
+            for (CanalEntry.Entry entry : entries) {
+                memsize += entry.getHeader().getEventLength();
+            }
+            dataEvents.setMemSize(memsize);
+            try {
+                dataBlockingQueue.put(dataEvents);
+            } catch (InterruptedException e) {
+                logger.error("put message to executor error:", e);
+                throw new CanalException(e);
+            }
+        }
+    }
+
+    private void executeOneBatch(Events<CanalEntry.Entry, EntryPosition> dataEvents) throws UserException {
+        final long batchId = dataEvents.getId();
+        Map<String, CanalSyncChannel> preferChannels = Maps.newHashMap();
+        Map<String, CanalSyncChannel> secondaryChannels = Maps.newHashMap();
+        EntryPosition startPosition = dataEvents.getPositionRange().getStart();
+        EntryPosition endPosition = dataEvents.getPositionRange().getEnd();
+        for (CanalSyncChannel channel : idToChannels.values()) {
+            EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId());
+            String key = channel.getSrcDataBase() + "." + channel.getSrcTable();
+            if (commitPosition.compareTo(startPosition) < 0) {
+                preferChannels.put(key, channel);
+            }
+            else if (commitPosition.compareTo(endPosition) < 0) {
+                secondaryChannels.put(key, channel);
+            }
+        }
+
+        // distribute data to channels
+        for (CanalEntry.Entry entry : dataEvents.getDatas()) {
+            CanalEntry.EntryType entryType = entry.getEntryType();
+            try {
+                if (entryType == CanalEntry.EntryType.ROWDATA) {
+                    CanalEntry.RowChange rowChange;
+                    try {
+                        rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                    } catch (InvalidProtocolBufferException e) {
+                        throw new CanalException("parse event has an error , data:" + entry.toString(), e);
+                    }
+                    final CanalEntry.Header header = entry.getHeader();
+                    final CanalEntry.EventType eventType = rowChange.getEventType();
+                    if (!CanalUtils.isDML(eventType) || rowChange.getIsDdl()) {
+                        String sql = rowChange.getSql();
+                        processDDL(header, eventType, sql);
+                        return;
+                    }
+                    String schemaTableName = CanalUtils.getFullName(header.getSchemaName(), header.getTableName());
+                    if (preferChannels.containsKey(schemaTableName)) {
+                        CanalSyncChannel channel = preferChannels.get(schemaTableName);
+                        channel.submit(batchId, eventType, rowChange);
+                    } else if (secondaryChannels.containsKey(schemaTableName)) {
+                        CanalSyncChannel channel = secondaryChannels.get(schemaTableName);
+                        EntryPosition position = EntryPosition.createPosition(entry);
+                        EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId());
+                        if (position.compareTo(commitPosition) > 0) {
+                            channel.submit(batchId, eventType, rowChange);
+                        }
+                    }
+                    // print row
+                    if (debug) {
+                        CanalUtils.printRow(rowChange, header);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("execute event has an error, data: {}, msg: {}", entry.toString(), e);
+                throw new UserException("execute batch failed, batchId: " + batchId + " ,msg: " + e.getMessage());
+            }
+        }
+    }
+
+    // currently not support ddl
+    private void processDDL(CanalEntry.Header header, CanalEntry.EventType eventType, String sql) {
+        String table = header.getSchemaName() + "." + header.getTableName();
+        switch (eventType) {
+            case CREATE:
+                logger.warn("parse create table event, table: {}, sql: {}", table, sql);
+                return;
+            case ALTER:
+                logger.warn("parse alter table event, table: {}, sql: {}", table, sql);
+                return;
+            case TRUNCATE:
+                logger.warn("parse truncate table event, table: {}, sql: {}", table, sql);
+                return;
+            case ERASE:
+            case QUERY:
+                logger.warn("parse event : {}, sql: {} . ignored!", eventType.name(), sql);
+                return;
+            case RENAME:
+                logger.warn("parse rename table event, table: {}, sql: {}", table, sql);
+                return;
+            case CINDEX:
+                logger.warn("parse create index event, table: {}, sql: {}", table, sql);
+                return;
+            case DINDEX:
+                logger.warn("parse delete index event, table: {}, sql: {}", table, sql);
+                return;
+            default:
+                logger.warn("parse unknown event: {}, table: {}, sql: {}", eventType.name(), table, sql);
+                break;
+        }
+    }
+
+    private void ack() {
+        if (!ackBatches.isEmpty()) {
+            logger.info("client ack batches: {}", ackBatches);
+            while (!ackBatches.isEmpty()) {
+                // ack the oldest batch
+                long minBatchId = Collections.min(ackBatches);
+                connector.ack(minBatchId);
+                ackBatches.remove(minBatchId);
+                PositionRange<EntryPosition> positionRange = positionMeta.removeBatch(minBatchId);
+                positionMeta.setAckPosition(positionRange.getEnd());
+                positionMeta.setAckTime(System.currentTimeMillis());
+            }
+        }
+    }
+
+    private void rollback() {
+        holdGetLock();
+        try {
+            connector.rollback();
+            // Wait for the receiver to put the last message into the queue before clearing queue
+            try {
+                Thread.sleep(1000L);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        } finally {
+            releaseGetLock();
+        }
+        dataBlockingQueue.clear();
+        ackBatches.clear();
+        positionMeta.clearAllBatch();
+    }
+
+    public String getPositionInfo() {
+        EntryPosition ackPosition = positionMeta.getAckPosition();
+        long ackTime = positionMeta.getAckTime();
+        StringBuilder sb = new StringBuilder();
+        if (ackPosition != null) {
+            SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
+            long executeTime = ackPosition.getExecuteTime();
+            long delayTime = ackTime - executeTime;
+            Date date = new Date(executeTime);
+            sb.append("position:").append(ackPosition)
+                    .append(", executeTime:[").append(format.format(date)).append("], ")
+                    .append("delay:").append(delayTime).append("ms");
+            if (StringUtils.isNotEmpty(ackPosition.getGtid())) {
+                sb.append(", gtid(").append(ackPosition.getGtid())
+                        .append(") ");
+            }
+        } else {
+            sb.append("position:").append("N/A");
+        }
+        return sb.toString();
+    }
+
+    private void cleanUp() {
+        dataBlockingQueue.clear();
+        ackBatches.clear();
+        positionMeta.cleanUp();
+    }
+
+    private void holdGetLock() {
+        getLock.lock();
+    }
+
+    private void releaseGetLock() {
+        getLock.unlock();
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataReceiver.java
new file mode 100644
index 0000000..fe78f66
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataReceiver.java
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.load.sync.SyncDataReceiver;
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class CanalSyncDataReceiver extends SyncDataReceiver {
+    private static Logger LOG = LogManager.getLogger(CanalSyncDataReceiver.class);
+
+    private CanalSyncJob syncJob;
+    private CanalConnector connector;
+    private ReentrantLock getLock;
+    private CanalSyncDataConsumer consumer;
+    private String destination;
+    private String filter;
+    private long sleepTimeMs;
+
+    public CanalSyncDataReceiver(CanalSyncJob syncJob, CanalConnector connector, String destination, String filter,
+                                 CanalSyncDataConsumer consumer, int readBatchSize, ReentrantLock getLock) {
+        super(readBatchSize);
+        this.syncJob = syncJob;
+        this.connector = connector;
+        this.consumer = consumer;
+        this.destination = destination;
+        this.filter = filter;
+        this.getLock = getLock;
+        this.sleepTimeMs = 20L;
+    }
+
+    public void setFilter(String filter) {
+        this.filter = filter;
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        LOG.info("receiver has been started. destination: {}, filter: {}, batch size: {}",
+                destination, filter, readBatchSize);
+    }
+
+    @Override
+    public void process() {
+        while (running) {
+            try {
+                connector.connect();
+                connector.subscribe(filter);
+                connector.rollback();
+                while (running) {
+                    int size;
+                    //  get one batch
+                    Message message;
+                    holdGetLock();
+                    try {
+                        message = connector.getWithoutAck(readBatchSize,
+                                CanalConfigs.getWaitingTimeoutMs, TimeUnit.MILLISECONDS);
+                    } finally {
+                        releaseGetLock();
+                    }
+
+                    if (message.isRaw()) {
+                        size = message.getRawEntries().size();
+                    } else {
+                        size = message.getEntries().size();
+                    }
+                    if (message.getId() == -1 || size == 0) {
+                        try {
+                            Thread.sleep(sleepTimeMs);
+                        } catch (InterruptedException e) {
+                            // do nothing
+                        }
+                    } else {
+                        consumer.put(message, size); // submit batch to consumer
+                    }
+                }
+            } catch (Throwable e) {
+                LOG.error("Receiver is error. {}", e.getMessage());
+                try {
+                    Thread.sleep(1000L);
+                } catch (InterruptedException e1) {
+                    // ignore
+                }
+            } finally {
+                connector.disconnect();
+            }
+        }
+    }
+
+    private void holdGetLock() {
+        getLock.lock();
+    }
+
+    private void releaseGetLock() {
+        getLock.unlock();
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java
new file mode 100644
index 0000000..1cb3c77
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.analysis.BinlogDesc;
+import org.apache.doris.analysis.ChannelDescription;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.sync.DataSyncJobType;
+import org.apache.doris.load.sync.SyncFailMsg;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+public class CanalSyncJob extends SyncJob {
+    private static final Logger LOG = LogManager.getLogger(CanalSyncJob.class);
+
+    protected final static String CANAL_SERVER_IP = "canal.server.ip";
+    protected final static String CANAL_SERVER_PORT = "canal.server.port";
+    protected final static String CANAL_DESTINATION = "canal.destination";
+    protected final static String CANAL_USERNAME = "canal.username";
+    protected final static String CANAL_PASSWORD = "canal.password";
+    protected final static String CANAL_BATCH_SIZE = "canal.batchSize";
+    protected final static String CANAL_DEBUG = "canal.debug";
+
+    @SerializedName(value = "ip")
+    private String ip;
+    @SerializedName(value = "port")
+    private int port;
+    @SerializedName(value = "destination")
+    private String destination;
+    @SerializedName(value = "username")
+    private String username;
+    @SerializedName(value = "password")
+    private String password;
+    @SerializedName(value = "batchSize")
+    private int batchSize = 8192;
+    @SerializedName(value = "debug")
+    private boolean debug = false;
+
+    private transient SyncCanalClient client;
+
+    public CanalSyncJob(long id, String jobName, long dbId) {
+        super(id, jobName, dbId);
+        this.dataSyncJobType = DataSyncJobType.CANAL;
+    }
+
+    private void init() throws UserException {
+        CanalConnector connector = CanalConnectors.newSingleConnector(
+                new InetSocketAddress(ip, port), destination, username, password);
+        client = new SyncCanalClient(this, destination, connector, batchSize, debug);
+        // create channels
+        initChannels();
+        // register channels into client
+        client.registerChannels(channels);
+    }
+
+    public void initChannels() throws DdlException {
+        if (channels == null) {
+            channels = Lists.newArrayList();
+        }
+        Database db = Catalog.getCurrentCatalog().getDb(dbId);
+        if (db == null) {
+            throw new DdlException("Database[" + dbId + "] does not exist");
+        }
+        db.writeLock();
+        try {
+            for (ChannelDescription channelDescription : channelDescriptions) {
+                String tableName = channelDescription.getTargetTable();
+                Table table = db.getTable(tableName);
+                if (!(table instanceof OlapTable)) {
+                    throw new DdlException("Table[" + tableName + "] is invalid.");
+                }
+                if (((OlapTable) table).getKeysType() != KeysType.UNIQUE_KEYS || !((OlapTable) table).hasDeleteSign()) {
+                    throw new DdlException("Table[" + tableName + "] don't support batch delete.");
+                }
+                List<String> colNames = channelDescription.getColNames();
+                if (colNames == null) {
+                    colNames = Lists.newArrayList();
+                    for (Column column : table.getBaseSchema(false)) {
+                        colNames.add(column.getName());
+                    }
+                }
+                CanalSyncChannel syncChannel = new CanalSyncChannel(this, db, (OlapTable) table, colNames,
+                        channelDescription.getSrcDatabase(), channelDescription.getSrcTableName());
+                if (channelDescription.getPartitionNames() != null) {
+                    syncChannel.setPartitions(channelDescription.getPartitionNames());
+                }
+                channels.add(syncChannel);
+            }
+        } finally {
+            db.writeUnlock();
+        }
+    }
+
+    @Override
+    public void checkAndSetBinlogInfo(BinlogDesc binlogDesc) throws DdlException {
+        super.checkAndSetBinlogInfo(binlogDesc);
+        Map<String, String> properties = binlogDesc.getProperties();
+
+        // required binlog properties
+        if (!properties.containsKey(CANAL_SERVER_IP)) {
+            throw new DdlException("Missing " + CANAL_SERVER_IP + " property in binlog properties");
+        } else {
+            ip = properties.get(CANAL_SERVER_IP);
+        }
+
+        if (!properties.containsKey(CANAL_SERVER_PORT)) {
+            throw new DdlException("Missing " + CANAL_SERVER_PORT + " property in binlog properties");
+        } else {
+            try {
+                port = Integer.parseInt(properties.get(CANAL_SERVER_PORT));
+            } catch (NumberFormatException e) {
+                throw new DdlException("canal port is not int");
+            }
+        }
+
+        if (!properties.containsKey(CANAL_DESTINATION)) {
+            throw new DdlException("Missing " + CANAL_DESTINATION + " property in binlog properties");
+        } else {
+            destination = properties.get(CANAL_DESTINATION);
+        }
+
+        if (!properties.containsKey(CANAL_USERNAME)) {
+            throw new DdlException("Missing " + CANAL_USERNAME + " property in binlog properties");
+        } else {
+            username = properties.get(CANAL_USERNAME);
+        }
+
+        if (!properties.containsKey(CANAL_PASSWORD)) {
+            throw new DdlException("Missing " + CANAL_PASSWORD + " property in binlog properties");
+        } else {
+            password = properties.get(CANAL_PASSWORD);
+        }
+
+        // optional binlog properties
+        if (properties.containsKey(CANAL_BATCH_SIZE)) {
+            try {
+                batchSize = Integer.parseInt(properties.get(CANAL_BATCH_SIZE));
+            } catch (NumberFormatException e) {
+                throw new DdlException("Property " + CANAL_BATCH_SIZE + " is not int");
+            }
+        }
+
+        if (properties.containsKey(CANAL_DEBUG)) {
+            debug = Boolean.parseBoolean(properties.get(CANAL_DEBUG));
+        }
+    }
+
+    public boolean isInit() {
+        return client != null && channels != null;
+    }
+
+    @Override
+    public void execute() throws UserException {
+        LOG.info("try to start canal client. Remote ip: {}, remote port: {}, debug: {}", ip, port, debug);
+        // init
+        init();
+        // start client
+        unprotectedStartClient();
+    }
+
+    @Override
+    public void cancel(MsgType msgType, String errMsg) {
+        LOG.info("Cancel canal sync job {}. MsgType: {}, errMsg: {}", id, msgType.name(), errMsg);
+        failMsg = new SyncFailMsg(msgType, errMsg);
+        switch (msgType) {
+            case USER_CANCEL:
+            case SUBMIT_FAIL:
+            case RUN_FAIL:
+            case UNKNOWN:
+                unprotectedStopClient(JobState.CANCELLED);
+                break;
+            default:
+                Preconditions.checkState(false, "unknown msg type: " + msgType.name());
+                break;
+        }
+    }
+
+    @Override
+    public void pause() {
+        LOG.info("Pause canal sync job {}. Client remote ip: {}, remote port: {}, debug: {}", id, ip, port, debug);
+        unprotectedStopClient(JobState.PAUSED);
+    }
+
+    @Override
+    public void resume() {
+        LOG.info("Resume canal sync job {}. Client remote ip: {}, remote port: {}, debug: {}", id, ip, port, debug);
+        unprotectedStartClient();
+    }
+
+    public void unprotectedStartClient() {
+        client.startup();
+        updateState(JobState.RUNNING, false);
+        LOG.info("client has been started. id: {}, jobName: {}", id, jobName);
+    }
+
+    public void unprotectedStopClient(JobState jobState) {
+        if (jobState != JobState.CANCELLED && jobState != JobState.PAUSED) {
+            return;
+        }
+        if (client != null) {
+            if (jobState == JobState.CANCELLED) {
+                client.shutdown(true);
+            } else {
+                client.shutdown(false);
+            }
+        }
+        updateState(jobState, false);
+        LOG.info("client has been stopped. id: {}, jobName: {}" , id, jobName);
+    }
+
+    @Override
+    public void replayUpdateSyncJobState(SyncJobUpdateStateInfo info) {
+        lastStartTimeMs = info.getLastStartTimeMs();
+        lastStopTimeMs = info.getLastStopTimeMs();
+        finishTimeMs = info.getFinishTimeMs();
+        failMsg = info.getFailMsg();
+        try {
+            if (!isInit()) {
+                init();
+            }
+            JobState jobState = info.getJobState();
+            switch (jobState) {
+                case RUNNING:
+                    client.startup();
+                    updateState(JobState.RUNNING, true);
+                    break;
+                case PAUSED:
+                    client.shutdown(false);
+                    updateState(JobState.PAUSED, true);
+                    break;
+                case CANCELLED:
+                    client.shutdown(true);
+                    updateState(JobState.CANCELLED, true);
+                    break;
+            }
+        } catch (UserException e) {
+            LOG.warn("encounter an error when replay update sync job state. id: {}, newState: {}, reason: {}",
+                    info.getId(), info.getJobState(), e.getMessage());
+            cancel(MsgType.UNKNOWN, e.getMessage());
+        }
+        LOG.info("replay update sync job state: {}", info);
+    }
+
+    @Override
+    public String getStatus() {
+        if (client != null) {
+            return client.getPositionInfo();
+        }
+        return "\\N";
+    }
+
+    @Override
+    public String getJobConfig() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("adress:").append(ip).append(":").append(port).append(",")
+                .append("destination:").append(destination).append(",")
+                .append("batchSize:").append(batchSize);
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        return "SyncJob [jobId=" + id
+                + ", jobName=" +jobName
+                + ", dbId=" + dbId
+                + ", state=" + jobState
+                + ", createTimeMs=" + TimeUtils.longToTimeString(createTimeMs)
+                + ", lastStartTimeMs=" + TimeUtils.longToTimeString(lastStartTimeMs)
+                + ", lastStopTimeMs=" + TimeUtils.longToTimeString(lastStopTimeMs)
+                + ", finishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs)
+                + "]";
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalUtils.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalUtils.java
new file mode 100644
index 0000000..86c5124
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalUtils.java
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.load.sync.model.Events;
+import org.apache.doris.load.sync.position.EntryPosition;
+
+import com.alibaba.otter.canal.common.CanalException;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+public class CanalUtils {
+    private static Logger logger = LogManager.getLogger(CanalUtils.class);
+
+    private static final String SEP = SystemUtils.LINE_SEPARATOR;
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    private static String context_format     = null;
+    private static String row_format         = null;
+    private static String transaction_format = null;
+
+    static {
+        context_format = SEP + "----------- Batch Summary ------------------------------>" + SEP;
+        context_format += "| Batch Id: [{}] ,count : [{}] , Mem size : [{}] , Time : {}" + SEP;
+        context_format += "| Start : [{}] " + SEP;
+        context_format += "| End : [{}] " + SEP;
+        context_format += "----------------------------------------------------------" + SEP;
+        row_format = SEP
+                + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
+                + SEP;
+        transaction_format = SEP
+                + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
+                + SEP;
+    }
+
+    public static void printSummary(Events<CanalEntry.Entry, EntryPosition> dataEvents) {
+        List<CanalEntry.Entry> entries = dataEvents.getDatas();
+        if (CollectionUtils.isEmpty(entries)) {
+            return;
+        }
+        String startPosition = buildPositionForDump(entries.get(0));
+        String endPosition = buildPositionForDump(entries.get(entries.size() - 1));
+        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
+        logger.info(context_format, dataEvents.getId(), entries.size(), dataEvents.getMemSize(), format.format(new Date()), startPosition, endPosition);
+    }
+
+    public static void printSummary(Message message, int size, long memsize) {
+        List<CanalEntry.Entry> entries = message.getEntries();
+        if (CollectionUtils.isEmpty(entries)) {
+            return;
+        }
+        String startPosition = buildPositionForDump(message.getEntries().get(0));
+        String endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
+        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
+        logger.info(context_format, message.getId(), size, memsize, format.format(new Date()), startPosition, endPosition);
+    }
+
+    public static String buildPositionForDump(CanalEntry.Entry entry) {
+        CanalEntry.Header header = entry.getHeader();
+        long time = entry.getHeader().getExecuteTime();
+        Date date = new Date(time);
+        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
+        StringBuilder sb = new StringBuilder();
+        sb.append(header.getLogfileName())
+                .append(":")
+                .append(header.getLogfileOffset())
+                .append(":")
+                .append(header.getExecuteTime())
+                .append("(")
+                .append(format.format(date))
+                .append(")");
+        if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
+            sb.append(" gtid(").append(entry.getHeader().getGtid())
+                    .append(")");
+        }
+        return sb.toString();
+    }
+
+    public static String getFullName(String schemaName, String tableName) {
+        StringBuilder sb = new StringBuilder();
+        if (schemaName != null) {
+            sb.append(schemaName).append(".");
+        }
+        sb.append(tableName);
+        return sb.toString().intern();
+    }
+
+    public static void printRow(CanalEntry.RowChange rowChange, CanalEntry.Header header) {
+        long executeTime = header.getExecuteTime();
+        long delayTime = new Date().getTime() - executeTime;
+        Date date = new Date(executeTime);
+        CanalEntry.EventType eventType = rowChange.getEventType();
+        logger.info(row_format, header.getLogfileName(),
+                String.valueOf(header.getLogfileOffset()), header.getSchemaName(),
+                header.getTableName(), eventType,
+                String.valueOf(header.getExecuteTime()), simpleDateFormat.format(date),
+                header.getGtid(), String.valueOf(delayTime));
+        if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
+            logger.info(" sql ----> " + rowChange.getSql() + SEP);
+            return;
+        }
+        printXAInfo(rowChange.getPropsList());
+        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+            if (eventType == CanalEntry.EventType.DELETE) {
+                printColumn(rowData.getBeforeColumnsList());
+            } else if (eventType == CanalEntry.EventType.INSERT) {
+                printColumn(rowData.getAfterColumnsList());
+            } else {
+                printColumn(rowData.getAfterColumnsList());
+            }
+        }
+    }
+
+    public static void printColumn(List<CanalEntry.Column> columns) {
+        StringBuilder builder = new StringBuilder();
+        for (CanalEntry.Column column : columns) {
+            try {
+                if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
+                        || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
+                    // get value bytes
+                    builder.append(column.getName())
+                            .append(" : ")
+                            .append(new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
+                } else {
+                    builder.append(column.getName())
+                            .append(" : ")
+                            .append(column.getValue());
+                }
+            } catch (UnsupportedEncodingException e) {
+            }
+            builder.append("    type=").append(column.getMysqlType());
+            if (column.getUpdated()) {
+                builder.append("    update=").append(column.getUpdated());
+            }
+            builder.append(SEP);
+        }
+        logger.info(builder.toString());
+    }
+
+    public static void printXAInfo(List<CanalEntry.Pair> pairs) {
+        if (pairs == null) {
+            return;
+        }
+        String xaType = null;
+        String xaXid = null;
+        for (CanalEntry.Pair pair : pairs) {
+            String key = pair.getKey();
+            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
+                xaType = pair.getValue();
+            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
+                xaXid = pair.getValue();
+            }
+        }
+        if (xaType != null && xaXid != null) {
+            logger.info(" ------> " + xaType + " " + xaXid);
+        }
+    }
+
+    public static void transactionBegin(CanalEntry.Entry entry) {
+        long executeTime = entry.getHeader().getExecuteTime();
+        long delayTime = new Date().getTime() - executeTime;
+        Date date = new Date(executeTime);
+        CanalEntry.TransactionBegin begin = null;
+        try {
+            begin = CanalEntry.TransactionBegin.parseFrom(entry.getStoreValue());
+        } catch (InvalidProtocolBufferException e) {
+            throw new CanalException("parse event has an error , data:" + entry.toString(), e);
+        }
+        // print transaction begin info, thread ID, time consumption
+        logger.info(transaction_format,entry.getHeader().getLogfileName(),
+                String.valueOf(entry.getHeader().getLogfileOffset()),
+                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
+                entry.getHeader().getGtid(), String.valueOf(delayTime));
+        logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
+        printXAInfo(begin.getPropsList());
+    }
+
+    public static void transactionEnd(CanalEntry.Entry entry) {
+        long executeTime = entry.getHeader().getExecuteTime();
+        long delayTime = new Date().getTime() - executeTime;
+        Date date = new Date(executeTime);
+        CanalEntry.TransactionEnd end = null;
+        try {
+            end = CanalEntry.TransactionEnd.parseFrom(entry.getStoreValue());
+        } catch (InvalidProtocolBufferException e) {
+            throw new CanalException("parse event has an error , data:" + entry.toString(), e);
+        }
+        // print transaction end info, transaction ID
+        logger.info("----------------\n");
+        logger.info(" END ----> transaction id: {}", end.getTransactionId());
+        printXAInfo(end.getPropsList());
+        logger.info(transaction_format, entry.getHeader().getLogfileName(),
+                String.valueOf(entry.getHeader().getLogfileOffset()),
+                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
+                entry.getHeader().getGtid(), String.valueOf(delayTime));
+    }
+
+    public static boolean isDML(CanalEntry.EventType eventType) {
+        switch (eventType) {
+            case INSERT:
+            case UPDATE:
+            case DELETE:
+                return true;
+            default:
+                return false;
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java
new file mode 100644
index 0000000..33cb8cf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.load.sync.SyncChannel;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SyncCanalClient {
+    protected static Logger logger = LogManager.getLogger(SyncCanalClient.class);
+
+    private CanalConnector connector;
+
+    private CanalSyncDataReceiver receiver;
+    private CanalSyncDataConsumer consumer;
+
+    // channel id -> channel
+    private Map<Long, CanalSyncChannel> idToChannels;
+
+    protected ReentrantLock lock = new ReentrantLock(true);
+    protected ReentrantLock getLock = new ReentrantLock();
+
+    protected void lock() {
+        lock.lock();
+    }
+
+    protected void unlock() {
+        lock.unlock();
+    }
+
+    private ShutDownWorker shutDownWorker;
+
+    public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug) {
+        this(syncJob, destination, connector, batchSize, debug, ".*\\..*");
+    }
+
+    public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug, String filter) {
+        this.connector = connector;
+        this.consumer = new CanalSyncDataConsumer(syncJob, connector, getLock, debug);
+        this.receiver = new CanalSyncDataReceiver(syncJob, connector, destination, filter, consumer, batchSize, getLock);
+        this.idToChannels = Maps.newHashMap();
+    }
+
+    public void startup() {
+        Preconditions.checkNotNull(connector, "connector is null");
+        Preconditions.checkState(!idToChannels.isEmpty(), "no channel is registered");
+        lock();
+        try {
+            // 1.start all threads in channel
+            for (CanalSyncChannel channel : idToChannels.values()) {
+                channel.start();
+            }
+            // 2. start executor
+            consumer.start();
+            // 3. start receiver
+            receiver.start();
+        } finally {
+            unlock();
+        }
+        logger.info("canal client has been started.");
+    }
+
+    // Stop client asynchronously
+    public void shutdown(boolean needCleanUp) {
+        this.shutDownWorker = new ShutDownWorker(needCleanUp);
+        shutDownWorker.shutdown();
+        logger.info("canal client shutdown worker has been started.");
+    }
+
+    public class ShutDownWorker implements Runnable {
+        public Thread thread;
+        public boolean needCleanUp;
+
+        public ShutDownWorker(boolean needCleanUp) {
+            this.thread = new Thread(this, "ShutDownWorker");
+            this.needCleanUp = needCleanUp;
+        }
+
+        public void shutdown() {
+            thread.start();
+        }
+
+        @Override
+        public void run() {
+            lock();
+            try {
+                // 1. stop receiver
+                receiver.stop();
+                // 2. stop executor
+                consumer.stop(needCleanUp);
+                // 3. stop channels
+                for (CanalSyncChannel channel : idToChannels.values()) {
+                    channel.stop();
+                }
+            } finally {
+                unlock();
+            }
+            logger.info("canal client has been stopped.");
+        }
+    }
+
+    public void registerChannels(List<SyncChannel> channels) {
+        StringBuilder channelFilters = new StringBuilder();
+        for (int i = 0; i < channels.size(); i++) {
+            CanalSyncChannel channel = (CanalSyncChannel) channels.get(i);
+            String filter = channel.getSrcDataBase() + "." + channel.getSrcTable();
+            String targetTable = channel.getTargetTable();
+            channelFilters.append(filter);
+            if (i < channels.size() - 1) {
+                channelFilters.append(",");
+            }
+            idToChannels.put(channel.getId(), channel);
+            logger.info("register channel, filter: {}, target table: {}", filter, targetTable);
+        }
+        receiver.setFilter(channelFilters.toString());
+        consumer.setChannels(idToChannels);
+    }
+
+    public String getPositionInfo() {
+        return consumer.getPositionInfo();
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/model/Data.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/model/Data.java
new file mode 100644
index 0000000..6aff63e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/model/Data.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.model;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+// Equivalent to a batch send to be
+// T = dataType
+public class Data<T> {
+    private List<T> datas;
+
+    public Data() {
+        this(Lists.newArrayList());
+    }
+
+    public Data(List<T> datas) {
+        this.datas = datas;
+    }
+
+    public List<T> getDatas() {
+        return datas;
+    }
+
+    public void addRow(T row) {
+        this.datas.add(row);
+    }
+
+    public boolean isNotEmpty() {
+        return datas.size() > 0;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/model/Events.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/model/Events.java
new file mode 100644
index 0000000..6b51e8c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/model/Events.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.model;
+
+import org.apache.doris.load.sync.position.PositionRange;
+
+import java.util.List;
+
+// Equivalent to a batch get from server
+// T = dataType
+// P = positionType
+public class Events<T, P> {
+    private long id;
+    private List<T> datas;
+    private PositionRange<P> positionRange;
+    private long memSize;
+
+    public Events(Long id) {
+        this(id, null);
+    }
+
+    public Events(Long id, List<T> datas) {
+        this.id = id;
+        this.datas = datas;
+    }
+
+    public Long getId() {
+        return id;
+    }
+
+    public void setId(Long id) {
+        this.id = id;
+    }
+
+    public List<T> getDatas() {
+        return datas;
+    }
+
+    public void setDatas(List<T> datas) {
+        this.datas = datas;
+    }
+
+    public PositionRange<P> getPositionRange() {
+        return positionRange;
+    }
+
+    public void setPositionRange(PositionRange<P> positionRange) {
+        this.positionRange = positionRange;
+    }
+
+    public void setMemSize(Long memSize) {
+        this.memSize = memSize;
+    }
+
+    public long getMemSize() {
+        return this.memSize;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/EntryPosition.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/EntryPosition.java
new file mode 100644
index 0000000..23deb5e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/EntryPosition.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.position;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+
+import com.google.common.base.Strings;
+
+public class EntryPosition {
+    private String journalName;
+    private Long position;
+    private String gtid;
+    private Long executeTime;
+
+    public static final EntryPosition MIN_POS = new EntryPosition("", -1L, null);
+
+    public EntryPosition() {
+        this(null, (Long)null, (Long)null);
+    }
+
+    public EntryPosition(String journalName, Long position, Long timestamp) {
+        this.gtid = null;
+        this.journalName = journalName;
+        this.position = position;
+        this.executeTime = timestamp;
+    }
+
+    public EntryPosition(String journalName, Long position) {
+        this(journalName, position, (Long)null);
+    }
+
+    public String getJournalName() {
+        return this.journalName;
+    }
+
+    public void setJournalName(String journalName) {
+        this.journalName = journalName;
+    }
+
+    public Long getPosition() {
+        return this.position;
+    }
+
+    public void setPosition(Long position) {
+        this.position = position;
+    }
+
+    public Long getExecuteTime() {
+        return this.executeTime;
+    }
+
+    public void setExecuteTime(Long timeStamp) {
+        this.executeTime = timeStamp;
+    }
+
+    public String getGtid() {
+        return this.gtid;
+    }
+
+    public void setGtid(String gtid) {
+        this.gtid = gtid;
+    }
+
+    public int hashCode() {
+        int result = 1;
+        result = 31 * result + (this.journalName == null ? 0 : this.journalName.hashCode());
+        result = 31 * result + (this.position == null ? 0 : this.position.hashCode());
+        result = 31 * result + (this.executeTime == null ? 0 : this.executeTime.hashCode());
+        return result;
+    }
+
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }  else if (!(obj instanceof EntryPosition)) {
+            return false;
+        } else {
+            EntryPosition other = (EntryPosition) obj;
+            if (this.journalName == null) {
+                if (other.journalName != null) {
+                    return false;
+                }
+            } else if (!this.journalName.equals(other.journalName)) {
+                return false;
+            }
+
+            if (this.position == null) {
+                if (other.position != null) {
+                    return false;
+                }
+            } else if (!this.position.equals(other.position)) {
+                return false;
+            }
+
+            if (this.executeTime == null) {
+                if (other.executeTime != null) {
+                    return false;
+                }
+            } else if (!this.executeTime.equals(other.executeTime)) {
+                return false;
+            }
+
+            return true;
+        }
+    }
+    @Override
+    public String toString() {
+        return "[" + journalName + ":" + position + "]";
+    }
+
+    public int compareTo(EntryPosition o) {
+        final int val = journalName.compareTo(o.journalName);
+
+        if (val == 0) {
+            return (int) (position - o.position);
+        }
+        return val;
+    }
+
+    public static EntryPosition min(EntryPosition position1, EntryPosition position2) {
+        if (position1.getJournalName().compareTo(position2.getJournalName()) > 0) {
+            return position2;
+        } else if (position1.getJournalName().compareTo(position2.getJournalName()) < 0) {
+            return position1;
+        } else {
+            if (position1.getPosition() > position2.getPosition()) {
+                return position2;
+            } else {
+                return position1;
+            }
+        }
+    }
+
+    // --------helper methods---------
+
+    public static EntryPosition createPosition(CanalEntry.Entry entry) {
+        final CanalEntry.Header header = entry.getHeader();
+        EntryPosition position = new EntryPosition();
+        position.setJournalName(header.getLogfileName());
+        position.setPosition(header.getLogfileOffset());
+        position.setExecuteTime(header.getExecuteTime());
+        position.setGtid(header.getGtid());
+        return position;
+    }
+
+    public static boolean checkPosition(CanalEntry.Entry entry, EntryPosition entryPosition) {
+        return checkPosition(entry.getHeader(), entryPosition);
+    }
+
+    public static boolean checkPosition(CanalEntry.Header header, EntryPosition entryPosition) {
+        boolean result = entryPosition.getExecuteTime().equals(header.getExecuteTime());
+        boolean isEmptyPosition = (Strings.isNullOrEmpty(entryPosition.getJournalName()) && entryPosition.getPosition() == null);
+        if (!isEmptyPosition) {
+            result &= entryPosition.getPosition().equals(header.getLogfileOffset());
+            if (result) {
+                result &= header.getLogfileName().equals(entryPosition.getJournalName());
+            }
+        }
+        return result;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java
new file mode 100644
index 0000000..4d68315
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.position;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class PositionMeta<T> {
+    // max batch id
+    private long maxBatchId;
+    // batch Id -> position range
+    private Map<Long, PositionRange<T>> batches;
+    // channel Id -> commit position
+    private Map<Long, T> commitPositions;
+    // ack position
+    private T ackPosition;
+    // ack time
+    private long ackTime;
+
+    public PositionMeta() {
+        this.maxBatchId = -1L;
+        this.batches = Maps.newHashMap();
+        this.commitPositions = Maps.newHashMap();
+    }
+    public void addBatch(long batchId, PositionRange<T> range) {
+        updateMaxBatchId(batchId);
+        batches.put(batchId, range);
+    }
+
+    public PositionRange<T> removeBatch(long batchId) {
+        return batches.remove(batchId);
+    }
+
+    public void clearAllBatch() {
+        batches.clear();
+    }
+
+    public void setCommitPosition(long channelId, T position) {
+        commitPositions.put(channelId, position);
+    }
+
+    public T getCommitPosition(long channelId) {
+        return commitPositions.get(channelId);
+    }
+
+    public void setAckPosition(T ackPosition) {
+        this.ackPosition = ackPosition;
+    }
+
+    public T getAckPosition() {
+        return this.ackPosition;
+    }
+
+    public void setAckTime(long ackTime) {
+        this.ackTime = ackTime;
+    }
+
+    public long getAckTime() {
+        return this.ackTime;
+    }
+
+    public T getLatestPosition() {
+        if (batches.isEmpty()) {
+            return null;
+        } else {
+            return batches.get(maxBatchId).getEnd();
+        }
+    }
+
+    private void updateMaxBatchId(long batchId) {
+        if (maxBatchId < batchId) {
+            maxBatchId = batchId;
+        }
+    }
+
+    public void cleanUp() {
+        this.maxBatchId = -1L;
+        this.batches.clear();
+        this.commitPositions.clear();
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionRange.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionRange.java
new file mode 100644
index 0000000..a33fe41
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionRange.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.position;
+
+public class PositionRange<T> {
+    private T start;
+    private T end;
+
+    public PositionRange() {
+    }
+
+    public PositionRange(T start, T end) {
+        this.start = start;
+        this.end = end;
+    }
+
+    public T getStart() {
+        return start;
+    }
+
+    public void setStart(T start) {
+        this.start = start;
+    }
+
+    public T getEnd() {
+        return end;
+    }
+
+    public void setEnd(T end) {
+        this.end = end;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((end == null) ? 0 : end.hashCode());
+        result = prime * result + ((start == null) ? 0 : start.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof PositionRange)) {
+            return false;
+        }
+
+        PositionRange other = (PositionRange) obj;
+        if (end == null) {
+            if (other.end != null) {
+                return false;
+            }
+        } else if (!end.equals(other.end)) {
+            return false;
+        }
+
+        if (start == null) {
+            if (other.start != null) {
+                return false;
+            }
+        } else if (!start.equals(other.start)) {
+            return false;
+        }
+
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 728e89e..7fbcd47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -29,11 +29,11 @@ import org.apache.doris.backup.RestoreJob;
 import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.EncryptKey;
 import org.apache.doris.catalog.EncryptKeyHelper;
+import org.apache.doris.catalog.EncryptKeySearchDesc;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.FunctionSearchDesc;
-import org.apache.doris.catalog.EncryptKey;
-import org.apache.doris.catalog.EncryptKeySearchDesc;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.cluster.BaseParam;
 import org.apache.doris.cluster.Cluster;
@@ -59,6 +59,7 @@ import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.LoadJobFinalOperation;
 import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.sync.SyncJob;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.UserPropertyInfo;
@@ -715,6 +716,16 @@ public class EditLog {
                     catalog.getLoadManager().replayUpdateLoadJobStateInfo(info);
                     break;
                 }
+                case OperationType.OP_CREATE_SYNC_JOB: {
+                    SyncJob syncJob = (SyncJob) journal.getData();
+                    catalog.getSyncJobManager().replayAddSyncJob(syncJob);
+                    break;
+                }
+                case OperationType.OP_UPDATE_SYNC_JOB_STATE: {
+                    SyncJob.SyncJobUpdateStateInfo info = (SyncJob.SyncJobUpdateStateInfo) journal.getData();
+                    catalog.getSyncJobManager().replayUpdateSyncJobState(info);
+                    break;
+                }
                 case OperationType.OP_FETCH_STREAM_LOAD_RECORD: {
                     FetchStreamLoadRecord fetchStreamLoadRecord = (FetchStreamLoadRecord) journal.getData();
                     catalog.getStreamLoadRecordMgr().replayFetchStreamLoadRecord(fetchStreamLoadRecord);
@@ -1335,6 +1346,14 @@ public class EditLog {
         logEdit(OperationType.OP_UPDATE_LOAD_JOB, info);
     }
 
+    public void logCreateSyncJob(SyncJob syncJob) {
+        logEdit(OperationType.OP_CREATE_SYNC_JOB, syncJob);
+    }
+
+    public void logUpdateSyncJobState(SyncJob.SyncJobUpdateStateInfo info) {
+        logEdit(OperationType.OP_UPDATE_SYNC_JOB_STATE, info);
+    }
+
     public void logFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) {
         logEdit(OperationType.OP_FETCH_STREAM_LOAD_RECORD, fetchStreamLoadRecord);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index f591e5b..9ac9aa5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -172,6 +172,10 @@ public class OperationType {
     public static final short OP_UPDATE_LOAD_JOB = 232;
     // fetch stream load record
     public static final short OP_FETCH_STREAM_LOAD_RECORD = 233;
+    // create sync job
+    public static final short OP_CREATE_SYNC_JOB = 234;
+    // update sync job state
+    public static final short OP_UPDATE_SYNC_JOB_STATE = 235;
 
     // small files 251~260
     public static final short OP_CREATE_SMALL_FILE = 251;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 56e8ac8..300f25f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -32,7 +32,8 @@ import org.apache.doris.catalog.SparkResource;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
-
+import org.apache.doris.load.sync.SyncJob;
+import org.apache.doris.load.sync.canal.CanalSyncJob;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashBasedTable;
@@ -42,16 +43,6 @@ import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Table;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import com.google.gson.ExclusionStrategy;
 import com.google.gson.FieldAttributes;
 import com.google.gson.Gson;
@@ -70,7 +61,17 @@ import com.google.gson.annotations.SerializedName;
 import com.google.gson.reflect.TypeToken;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
-import  org.apache.commons.lang3.reflect.TypeUtils;
+
+import org.apache.commons.lang3.reflect.TypeUtils;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /*
  * Some utilities about Gson.
@@ -116,6 +117,11 @@ public class GsonUtils {
             .registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName())
             .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName());
 
+    // runtime adapter for class "SyncJob"
+    private static RuntimeTypeAdapterFactory<SyncJob> syncJobTypeAdapterFactory = RuntimeTypeAdapterFactory
+            .of(SyncJob.class, "clazz")
+            .registerSubtype(CanalSyncJob.class, CanalSyncJob.class.getSimpleName());
+
     // runtime adapter for class "LoadJobStateUpdateInfo"
     private static RuntimeTypeAdapterFactory<LoadJobStateUpdateInfo> loadJobStateUpdateInfoTypeAdapterFactory
             = RuntimeTypeAdapterFactory
@@ -134,6 +140,7 @@ public class GsonUtils {
             .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory)
             .registerTypeAdapterFactory(resourceTypeAdapterFactory)
             .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
+            .registerTypeAdapterFactory(syncJobTypeAdapterFactory)
             .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory)
             .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
             .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 1045caf..f9e76ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -35,10 +35,11 @@ import org.apache.doris.analysis.CancelAlterTableStmt;
 import org.apache.doris.analysis.CancelBackupStmt;
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.CreateClusterStmt;
+import org.apache.doris.analysis.CreateDataSyncJobStmt;
 import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateEncryptKeyStmt;
 import org.apache.doris.analysis.CreateFileStmt;
 import org.apache.doris.analysis.CreateFunctionStmt;
-import org.apache.doris.analysis.CreateEncryptKeyStmt;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
 import org.apache.doris.analysis.CreateRepositoryStmt;
 import org.apache.doris.analysis.CreateResourceStmt;
@@ -52,9 +53,9 @@ import org.apache.doris.analysis.DdlStmt;
 import org.apache.doris.analysis.DeleteStmt;
 import org.apache.doris.analysis.DropClusterStmt;
 import org.apache.doris.analysis.DropDbStmt;
+import org.apache.doris.analysis.DropEncryptKeyStmt;
 import org.apache.doris.analysis.DropFileStmt;
 import org.apache.doris.analysis.DropFunctionStmt;
-import org.apache.doris.analysis.DropEncryptKeyStmt;
 import org.apache.doris.analysis.DropMaterializedViewStmt;
 import org.apache.doris.analysis.DropRepositoryStmt;
 import org.apache.doris.analysis.DropResourceStmt;
@@ -67,14 +68,17 @@ import org.apache.doris.analysis.LinkDbStmt;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.MigrateDbStmt;
 import org.apache.doris.analysis.PauseRoutineLoadStmt;
+import org.apache.doris.analysis.PauseSyncJobStmt;
 import org.apache.doris.analysis.RecoverDbStmt;
 import org.apache.doris.analysis.RecoverPartitionStmt;
 import org.apache.doris.analysis.RecoverTableStmt;
 import org.apache.doris.analysis.RestoreStmt;
 import org.apache.doris.analysis.ResumeRoutineLoadStmt;
+import org.apache.doris.analysis.ResumeSyncJobStmt;
 import org.apache.doris.analysis.RevokeStmt;
 import org.apache.doris.analysis.SetUserPropertyStmt;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
+import org.apache.doris.analysis.StopSyncJobStmt;
 import org.apache.doris.analysis.SyncStmt;
 import org.apache.doris.analysis.TruncateTableStmt;
 import org.apache.doris.analysis.UninstallPluginStmt;
@@ -84,6 +88,7 @@ import org.apache.doris.catalog.EncryptKeyHelper;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.sync.SyncJobManager;
 
 public class DdlExecutor {
     public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception {
@@ -240,6 +245,21 @@ public class DdlExecutor {
             catalog.getResourceMgr().createResource((CreateResourceStmt) ddlStmt);
         } else if (ddlStmt instanceof DropResourceStmt) {
             catalog.getResourceMgr().dropResource((DropResourceStmt) ddlStmt);
+        } else if (ddlStmt instanceof CreateDataSyncJobStmt) {
+            CreateDataSyncJobStmt createSyncJobStmt = (CreateDataSyncJobStmt) ddlStmt;
+            SyncJobManager syncJobMgr = catalog.getSyncJobManager();
+            if (!syncJobMgr.isJobNameExist(createSyncJobStmt.getDbName(), createSyncJobStmt.getJobName())) {
+                syncJobMgr.addDataSyncJob((CreateDataSyncJobStmt) ddlStmt);
+            } else {
+                throw new DdlException("The syncJob with jobName '" + createSyncJobStmt.getJobName() +
+                        "' in database [" + createSyncJobStmt.getDbName() + "] is already exists.");
+            }
+        } else if (ddlStmt instanceof ResumeSyncJobStmt) {
+            catalog.getSyncJobManager().resumeSyncJob((ResumeSyncJobStmt) ddlStmt);
+        } else if (ddlStmt instanceof PauseSyncJobStmt) {
+            catalog.getSyncJobManager().pauseSyncJob((PauseSyncJobStmt) ddlStmt);
+        } else if (ddlStmt instanceof StopSyncJobStmt) {
+            catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt);
         } else {
             throw new DdlException("Unknown statement.");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index ed49e7c..54ca69f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -41,6 +41,7 @@ import org.apache.doris.analysis.ShowDbIdStmt;
 import org.apache.doris.analysis.ShowDbStmt;
 import org.apache.doris.analysis.ShowDeleteStmt;
 import org.apache.doris.analysis.ShowDynamicPartitionStmt;
+import org.apache.doris.analysis.ShowEncryptKeysStmt;
 import org.apache.doris.analysis.ShowEnginesStmt;
 import org.apache.doris.analysis.ShowExportStmt;
 import org.apache.doris.analysis.ShowFrontendsStmt;
@@ -69,6 +70,7 @@ import org.apache.doris.analysis.ShowSmallFilesStmt;
 import org.apache.doris.analysis.ShowSnapshotStmt;
 import org.apache.doris.analysis.ShowStmt;
 import org.apache.doris.analysis.ShowStreamLoadStmt;
+import org.apache.doris.analysis.ShowSyncJobStmt;
 import org.apache.doris.analysis.ShowTableIdStmt;
 import org.apache.doris.analysis.ShowTableStatusStmt;
 import org.apache.doris.analysis.ShowTableStmt;
@@ -86,9 +88,9 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DynamicPartitionProperty;
+import org.apache.doris.catalog.EncryptKey;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.Index;
-import org.apache.doris.catalog.EncryptKey;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.MetadataViewer;
@@ -144,7 +146,6 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TUnit;
 import org.apache.doris.transaction.GlobalTransactionMgr;
-
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -302,6 +303,8 @@ public class ShowExecutor {
             handleShowLoadProfile();
         } else if (stmt instanceof AdminShowDataSkewStmt) {
             handleAdminShowDataSkew();
+        } else if (stmt instanceof ShowSyncJobStmt) {
+            handleShowSyncJobs();
         } else {
             handleEmtpy();
         }
@@ -1729,6 +1732,29 @@ public class ShowExecutor {
         resultSet = new ShowResultSet(showStmt.getMetaData(), infos);
     }
 
+    private void handleShowSyncJobs() throws AnalysisException {
+        ShowSyncJobStmt showStmt = (ShowSyncJobStmt) stmt;
+        Catalog catalog = Catalog.getCurrentCatalog();
+        Database db = catalog.getDb(showStmt.getDbName());
+        if (db == null) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName());
+        }
+
+        List<List<Comparable>> syncInfos = catalog.getSyncJobManager().getSyncJobsInfoByDbId(db.getId());
+        Collections.sort(syncInfos, new ListComparator<List<Comparable>>(0));
+
+        List<List<String>> rows = Lists.newArrayList();
+        for (List<Comparable> syncInfo : syncInfos) {
+            List<String> row = new ArrayList<String>(syncInfo.size());
+
+            for (Comparable element : syncInfo) {
+                row.add(element.toString());
+            }
+            rows.add(row);
+        }
+        resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
+    }
+
     private void handleShowGrants() {
         ShowGrantsStmt showStmt = (ShowGrantsStmt) stmt;
         List<List<String>> infos = Catalog.getCurrentCatalog().getAuth().getAuthInfo(showStmt.getUserIdent());
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 9a5bcc5..f419622 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -108,6 +108,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("begin", new Integer(SqlParserSymbols.KW_BEGIN));
         keywordMap.put("between", new Integer(SqlParserSymbols.KW_BETWEEN));
         keywordMap.put("bigint", new Integer(SqlParserSymbols.KW_BIGINT));
+        keywordMap.put("binlog", new Integer(SqlParserSymbols.KW_BINLOG));
         keywordMap.put("bitmap", new Integer(SqlParserSymbols.KW_BITMAP));
         keywordMap.put("bitmap_union", new Integer(SqlParserSymbols.KW_BITMAP_UNION));
         keywordMap.put("boolean", new Integer(SqlParserSymbols.KW_BOOLEAN));
@@ -236,6 +237,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("isnull", new Integer(SqlParserSymbols.KW_ISNULL));
         keywordMap.put("isolation", new Integer(SqlParserSymbols.KW_ISOLATION));
         keywordMap.put("install", new Integer(SqlParserSymbols.KW_INSTALL));
+        keywordMap.put("job", new Integer(SqlParserSymbols.KW_JOB));
         keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN));
         keywordMap.put("key", new Integer(SqlParserSymbols.KW_KEY));
         keywordMap.put("keys", new Integer(SqlParserSymbols.KW_KEYS));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDataSyncJobStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDataSyncJobStmtTest.java
new file mode 100644
index 0000000..fa029ac
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateDataSyncJobStmtTest.java
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.sync.DataSyncJobType;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
+public class CreateDataSyncJobStmtTest {
+    private static final Logger LOG = LogManager.getLogger(CreateDataSyncJobStmtTest.class);
+
+    private String jobName = "testJob";
+    private String dbName = "testDb";
+    private String tblName = "testTbl";
+    private Map<String, String> properties;
+
+    @Mocked
+    Catalog catalog;
+    @Mocked
+    Analyzer analyzer;
+    @Mocked
+    PaloAuth auth;
+    @Injectable
+    Database database;
+    @Injectable
+    OlapTable table;
+
+    @Before
+    public void setUp() {
+        properties = Maps.newHashMap();
+        new Expectations() {
+            {
+                catalog.getDb("testCluster:testDb");
+                minTimes = 0;
+                result = database;
+
+                catalog.getAuth();
+                minTimes = 0;
+                result = auth;
+
+                analyzer.getClusterName();
+                minTimes = 0;
+                result = "testCluster";
+
+                auth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
+                minTimes = 0;
+                result = true;
+
+                database.getTable("testTbl");
+                minTimes = 0;
+                result = table;
+
+                Catalog.getCurrentCatalog();
+                minTimes = 0;
+                result = catalog;
+            }
+        };
+
+        Config.enable_create_sync_job = true;
+    }
+    @Test
+    public void testNoDb() {
+        CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+                null, null, null, null, null);
+        try {
+            stmt.analyze(analyzer);
+            Assert.fail();
+        } catch (UserException e) {
+            LOG.info(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testNoType() {
+        BinlogDesc binlogDesc = new BinlogDesc(properties);
+        CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+                jobName, dbName, null, binlogDesc, null);
+        try {
+            stmt.analyze(analyzer);
+            Assert.fail();
+        } catch (UserException e) {
+            LOG.info(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDuplicateColNames() {
+        properties.put("type", "canal");
+        BinlogDesc binlogDesc = new BinlogDesc(properties);
+        List<String> colNames = Lists.newArrayList();
+        colNames.add("a");
+        colNames.add("a");
+        ChannelDescription channelDescription = new ChannelDescription(
+                "mysql_db", "mysql_tbl", tblName, null, colNames);
+        CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+                jobName, dbName, Lists.newArrayList(channelDescription), binlogDesc, null);
+        try {
+            stmt.analyze(analyzer);
+            Assert.fail();
+        } catch (UserException e) {
+            LOG.info(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testNoUniqueTable() {
+        properties.put("type", "canal");
+        BinlogDesc binlogDesc = new BinlogDesc(properties);
+        ChannelDescription channelDescription = new ChannelDescription(
+                "mysql_db", "mysql_tbl", tblName, null, null);
+        CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+                jobName, dbName, Lists.newArrayList(channelDescription), binlogDesc, null);
+        try {
+            stmt.analyze(analyzer);
+            Assert.fail();
+        } catch (UserException e) {
+            LOG.info(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testNormal() {
+        new Expectations() {
+            {
+                table.getKeysType();
+                result = KeysType.UNIQUE_KEYS;
+                table.hasDeleteSign();
+                result = true;
+            }
+        };
+        properties.put("type", "canal");
+        BinlogDesc binlogDesc = new BinlogDesc(properties);
+        ChannelDescription channelDescription = new ChannelDescription(
+                "mysql_db", "mysql_tbl", tblName, null, null);
+        CreateDataSyncJobStmt stmt = new CreateDataSyncJobStmt(
+                jobName, dbName, Lists.newArrayList(channelDescription), binlogDesc, null);
+        try {
+            stmt.analyze(analyzer);
+            Assert.assertEquals(jobName, stmt.getJobName());
+            Assert.assertEquals("testCluster:testDb", stmt.getDbName());
+            Assert.assertEquals(DataSyncJobType.CANAL, stmt.getDataSyncJobType());
+        } catch (UserException e) {
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
new file mode 100644
index 0000000..232b414
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.analysis.CreateDataSyncJobStmt;
+import org.apache.doris.analysis.PauseSyncJobStmt;
+import org.apache.doris.analysis.ResumeSyncJobStmt;
+import org.apache.doris.analysis.StopSyncJobStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob.JobState;
+import org.apache.doris.load.sync.SyncJob.SyncJobUpdateStateInfo;
+import org.apache.doris.load.sync.canal.CanalSyncJob;
+import org.apache.doris.load.sync.canal.SyncCanalClient;
+import org.apache.doris.persist.EditLog;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
+public class SyncJobManagerTest {
+    private static final Logger LOG = LogManager.getLogger(SyncJobManagerTest.class);
+
+    private long jobId = 10000L;
+    private String jobName = "testJob";
+    private long dbId = 50000L;
+
+    @Mocked
+    EditLog editLog;
+    @Mocked
+    Catalog catalog;
+    @Mocked
+    Database database;
+    @Mocked
+    SyncCanalClient client;
+
+    @Before
+    public void setUp() throws DdlException {
+        new Expectations() {
+            {
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+                catalog.getDb(anyString);
+                minTimes = 0;
+                result = database;
+                database.getId();
+                minTimes = 0;
+                result = dbId;
+                Catalog.getCurrentCatalog();
+                result = catalog;
+            }
+        };
+    }
+
+    @Test
+    public void testAddSyncJob(@Injectable CreateDataSyncJobStmt stmt,
+                               @Mocked SyncJob syncJob) throws DdlException {
+        CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        new Expectations() {
+            {
+                SyncJob.fromStmt(anyLong, (CreateDataSyncJobStmt) any);
+                result = canalSyncJob;
+            }
+        };
+
+        SyncJobManager manager = new SyncJobManager();
+        manager.addDataSyncJob(stmt);
+
+        Map<Long, SyncJob> idToSyncJobs = Deencapsulation.getField(manager, "idToSyncJob");
+        Assert.assertEquals(1, idToSyncJobs.size());
+        SyncJob syncJob1 = idToSyncJobs.values().iterator().next();
+        Assert.assertEquals(10000L, syncJob1.getId());
+        Assert.assertEquals("testJob", syncJob1.getJobName());
+        Assert.assertEquals(50000L, syncJob1.getDbId());
+        Assert.assertEquals(JobState.PENDING, syncJob1.getJobState());
+        Assert.assertEquals(DataSyncJobType.CANAL, syncJob1.getJobType());
+        Assert.assertTrue(syncJob1 instanceof CanalSyncJob);
+
+        Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs =
+                Deencapsulation.getField(manager, "dbIdToJobNameToSyncJobs");
+        Assert.assertEquals(1, dbIdToJobNameToSyncJobs.size());
+        Map<String, List<SyncJob>> jobNameToSyncJobs = dbIdToJobNameToSyncJobs.values().iterator().next();
+        Assert.assertEquals(1, jobNameToSyncJobs.size());
+        Assert.assertTrue(jobNameToSyncJobs.containsKey("testJob"));
+        List<SyncJob> syncJobs = jobNameToSyncJobs.get("testJob");
+        Assert.assertEquals(1, syncJobs.size());
+        SyncJob syncJob2 = syncJobs.get(0);
+        Assert.assertEquals(syncJob1, syncJob2);
+    }
+
+    @Test
+    public void testPauseSyncJob(@Injectable PauseSyncJobStmt stmt) {
+        CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        new Expectations() {
+            {
+                stmt.getJobName();
+                result = "testJob";
+
+                stmt.getDbFullName();
+                result = "testDb";
+            }
+        };
+
+        SyncJobManager manager = new SyncJobManager();
+        try {
+            manager.pauseSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // add a sync job to manager
+        Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+        Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+        jobNameToSyncJobs.put("testJob", Lists.newArrayList(canalSyncJob));
+        dbIdToJobNameToSyncJobs.put(50000L, jobNameToSyncJobs);
+        Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+        // a new sync job state is pending
+        Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+        try {
+            manager.pauseSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // change sync job state to paused
+        canalSyncJob.updateState(JobState.PAUSED, false);
+        Assert.assertEquals(JobState.PAUSED, canalSyncJob.getJobState());
+        try {
+            manager.pauseSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // change sync job state to cancelled
+        canalSyncJob.updateState(JobState.CANCELLED, false);
+        Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+        try {
+            manager.pauseSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // change sync job state to running
+        canalSyncJob.updateState(JobState.RUNNING, false);
+        Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+        try {
+            manager.pauseSyncJob(stmt);
+            Assert.assertEquals(JobState.PAUSED, canalSyncJob.getJobState());
+        } catch (DdlException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testResumeSyncJob(@Injectable ResumeSyncJobStmt stmt) {
+        CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        new Expectations() {
+            {
+                stmt.getJobName();
+                result = "testJob";
+
+                stmt.getDbFullName();
+                result = "testDb";
+            }
+        };
+
+        Deencapsulation.setField(canalSyncJob, "client", client);
+
+        SyncJobManager manager = new SyncJobManager();
+        try {
+            manager.resumeSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // add a sync job to manager
+        Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+        Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+        jobNameToSyncJobs.put("testJob", Lists.newArrayList(canalSyncJob));
+        dbIdToJobNameToSyncJobs.put(50000L, jobNameToSyncJobs);
+        Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+        // a new sync job state is pending
+        Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+        try {
+            manager.resumeSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // change sync job state to running
+        canalSyncJob.updateState(JobState.RUNNING, false);
+        Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+        try {
+            manager.resumeSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // change sync job state to cancelled
+        canalSyncJob.updateState(JobState.CANCELLED, false);
+        Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+        try {
+            manager.resumeSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // change sync job state to paused
+        canalSyncJob.updateState(JobState.PAUSED, false);
+        Assert.assertEquals(JobState.PAUSED, canalSyncJob.getJobState());
+        try {
+            manager.resumeSyncJob(stmt);
+            Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+        } catch (DdlException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testStopSyncJob(@Injectable StopSyncJobStmt stmt) {
+        CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        new Expectations() {
+            {
+                stmt.getJobName();
+                result = "testJob";
+
+                stmt.getDbFullName();
+                result = "testDb";
+            }
+        };
+
+        Deencapsulation.setField(canalSyncJob, "client", client);
+
+        SyncJobManager manager = new SyncJobManager();
+        try {
+            manager.stopSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // add a sync job to manager
+        Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+        Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+        jobNameToSyncJobs.put("testJob", Lists.newArrayList(canalSyncJob));
+        dbIdToJobNameToSyncJobs.put(50000L, jobNameToSyncJobs);
+        Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+
+        // a new sync job state is pending
+        Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+        try {
+            manager.stopSyncJob(stmt);
+            Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // change sync job state to paused
+        canalSyncJob.updateState(JobState.PAUSED, false);
+        Assert.assertEquals(JobState.PAUSED, canalSyncJob.getJobState());
+        try {
+            manager.stopSyncJob(stmt);
+            Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+
+        // change sync job state to running
+        canalSyncJob.updateState(JobState.RUNNING, false);
+        Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+        try {
+            manager.stopSyncJob(stmt);
+            Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+        } catch (DdlException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        // change sync job state to cancelled
+        canalSyncJob.updateState(JobState.CANCELLED, false);
+        Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+        try {
+            manager.stopSyncJob(stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testJobNameExist() throws DdlException {
+        CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        SyncJobManager manager = new SyncJobManager();
+        Assert.assertFalse(manager.isJobNameExist("testDb", "testJob"));
+
+        // add a sync job to manager
+        Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs = Maps.newHashMap();
+        Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+        jobNameToSyncJobs.put("testJob", Lists.newArrayList(canalSyncJob));
+        dbIdToJobNameToSyncJobs.put(50000L, jobNameToSyncJobs);
+        Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs", dbIdToJobNameToSyncJobs);
+        Assert.assertTrue(manager.isJobNameExist("testDb", "testJob"));
+    }
+    @Test
+    public void testReplayUpdateSyncJobState() {
+        CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+        // change sync job state to running
+        canalSyncJob.updateState(JobState.RUNNING, false);
+        Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+
+        Deencapsulation.setField(canalSyncJob, "client", client);
+        Deencapsulation.setField(canalSyncJob, "channels", Lists.newArrayList());
+
+        SyncJobUpdateStateInfo info = new SyncJobUpdateStateInfo(jobId,
+                JobState.CANCELLED, -1L, -1L, -1L,
+                new SyncFailMsg(MsgType.USER_CANCEL, "user cancel"));
+        SyncJobManager manager = new SyncJobManager();
+
+        // add a sync job to manager
+        Map<Long, SyncJob> idToSyncJob = Maps.newHashMap();
+        idToSyncJob.put(jobId, canalSyncJob);
+        Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob);
+        manager.replayUpdateSyncJobState(info);
+        Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+        Assert.assertEquals(MsgType.USER_CANCEL, canalSyncJob.getFailMsg().getMsgType());
+    }
+
+
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobTest.java
new file mode 100644
index 0000000..c35ee68
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob.JobState;
+import org.apache.doris.load.sync.SyncJob.SyncJobUpdateStateInfo;
+import org.apache.doris.load.sync.canal.CanalSyncJob;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+public class SyncJobTest {
+    private long jobId;
+    private long dbId;
+    private String jobName;
+
+    @Before
+    public void setUp() {
+        jobId = 1L;
+        dbId = 1L;
+        jobName = "test_job";
+    }
+
+    @Test
+    public void testUpdateStateToRunning() {
+        SyncJob syncJob = new CanalSyncJob(jobId, jobName, dbId);
+        syncJob.updateState(JobState.RUNNING, true);
+        Assert.assertEquals(JobState.RUNNING, syncJob.getJobState());
+        Assert.assertNotEquals(-1L, (long) Deencapsulation.getField(syncJob, "lastStartTimeMs"));
+    }
+
+    @Test
+    public void testUpdateStateInfoPersist() throws IOException {
+        String fileName = "./testSyncJobUpdateStateInfoPersistFile";
+        File file = new File(fileName);
+        if (file.exists()) {
+            file.delete();
+        }
+        file.createNewFile();
+
+        JobState jobState = JobState.CANCELLED;
+        SyncFailMsg failMsg = new SyncFailMsg(MsgType.USER_CANCEL, "user cancel");
+        long lastStartTimeMs = 1621914540L;
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
+        SyncJobUpdateStateInfo info = new SyncJobUpdateStateInfo(
+                jobId, jobState, lastStartTimeMs, -1L, -1L, failMsg);
+        info.write(out);
+        out.flush();
+        out.close();
+
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+        SyncJobUpdateStateInfo replayedInfo = SyncJobUpdateStateInfo.read(in);
+        Assert.assertEquals(jobId, replayedInfo.getId());
+        Assert.assertEquals(jobState, replayedInfo.getJobState());
+        Assert.assertEquals(lastStartTimeMs, replayedInfo.getLastStartTimeMs());
+        Assert.assertEquals(-1L, replayedInfo.getLastStopTimeMs());
+        Assert.assertEquals(-1L, replayedInfo.getFinishTimeMs());
+        Assert.assertEquals(failMsg, replayedInfo.getFailMsg());
+        in.close();
+
+        // delete file
+        if (file.exists()) {
+            file.delete();
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
new file mode 100644
index 0000000..a47c9fc
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -0,0 +1,465 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.planner.StreamLoadPlanner;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Status;
+import org.apache.doris.proto.Types;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.StreamLoadTask;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanFragmentExecParams;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class CanalSyncDataTest {
+    private static final Logger LOG = LogManager.getLogger(CanalSyncDataTest.class);
+
+    private String binlogFile = "mysql-bin.000001";
+    private long offset = 0;
+    private long nextId = 1000L;
+    private int batchSize = 8192;
+
+    ReentrantLock getLock;
+
+    CanalConnector connector;
+
+    @Mocked
+    CanalSyncJob syncJob;
+    @Mocked
+    Database database;
+    @Mocked
+    OlapTable table;
+    @Mocked
+    Catalog catalog;
+    @Mocked
+    Backend backend;
+    @Mocked
+    StreamLoadTask streamLoadTask;
+    @Mocked
+    StreamLoadPlanner streamLoadPlanner;
+    @Mocked
+    SystemInfoService systemInfoService;
+
+    InternalService.PExecPlanFragmentResult beginOkResult = InternalService.PExecPlanFragmentResult.newBuilder()
+            .setStatus(Status.PStatus.newBuilder().setStatusCode(0).build()).build(); // begin txn OK
+
+    InternalService.PExecPlanFragmentResult beginFailResult = InternalService.PExecPlanFragmentResult.newBuilder()
+            .setStatus(Status.PStatus.newBuilder().setStatusCode(1).build()).build(); // begin txn CANCELLED
+
+    InternalService.PCommitResult commitOkResult = InternalService.PCommitResult.newBuilder()
+            .setStatus(Status.PStatus.newBuilder().setStatusCode(0).build()).build(); // commit txn OK
+
+    InternalService.PCommitResult commitFailResult = InternalService.PCommitResult.newBuilder()
+            .setStatus(Status.PStatus.newBuilder().setStatusCode(1).build()).build(); // commit txn CANCELLED
+
+    InternalService.PRollbackResult abortOKResult = InternalService.PRollbackResult.newBuilder()
+            .setStatus(Status.PStatus.newBuilder().setStatusCode(0).build()).build(); // abort txn OK
+
+    InternalService.PSendDataResult sendDataOKResult = InternalService.PSendDataResult.newBuilder()
+            .setStatus(Status.PStatus.newBuilder().setStatusCode(0).build()).build(); // send data OK
+
+    @Before
+    public void setUp() throws Exception {
+
+        List<Long> backendIds = Lists.newArrayList(104L);
+        Map<Long, Backend> map = Maps.newHashMap();
+        map.put(104L, backend);
+        ImmutableMap<Long, Backend> backendMap = ImmutableMap.copyOf(map);
+        TExecPlanFragmentParams execPlanFragmentParams = new TExecPlanFragmentParams().setParams(new TPlanFragmentExecParams()
+                .setFragmentInstanceId(new TUniqueId())
+                .setPerNodeScanRanges(Maps.newHashMap()));
+
+        new Expectations() {
+            {
+                catalog.getNextId();
+                minTimes = 0;
+                result = 101L;
+
+                syncJob.getId();
+                minTimes = 0;
+                result = 100L;
+
+                database.getId();
+                minTimes = 0;
+                result = 102L;
+
+                table.getId();
+                minTimes = 0;
+                result = 103L;
+
+                table.getName();
+                minTimes = 0;
+                result = "testTbl";
+
+                streamLoadPlanner.plan((TUniqueId) any);
+                minTimes = 0;
+                result = execPlanFragmentParams;
+
+                systemInfoService.seqChooseBackendIds(anyInt, anyBoolean, anyBoolean, anyString);
+                minTimes = 0;
+                result = backendIds;
+
+                systemInfoService.getIdToBackend();
+                minTimes = 0;
+                result = backendMap;
+
+                Catalog.getCurrentCatalog();
+                minTimes = 0;
+                result = catalog;
+
+                Catalog.getCurrentSystemInfo();
+                minTimes = 0;
+                result = systemInfoService;
+            }
+        };
+
+        connector = CanalConnectors.newSingleConnector(
+                new InetSocketAddress("127.0.0.1", 11111), "test", "user", "passwd");
+
+        new MockUp<SimpleCanalConnector>() {
+            @Mock
+            void connect() throws CanalClientException {
+            }
+            @Mock
+            void disconnect() throws CanalClientException {
+            }
+            @Mock
+            Message getWithoutAck(int var1, Long var2, TimeUnit var3) throws CanalClientException {
+                offset += batchSize * 1; // Simply set one entry as one byte
+                return CanalTestUtil.fetchMessage(
+                        ++nextId, false, batchSize, binlogFile, offset, "mysql_db", "mysql_tbl");
+            }
+            @Mock
+            void rollback() throws CanalClientException {
+            }
+            @Mock
+            void ack(long var1) throws CanalClientException {
+            }
+            @Mock
+            void subscribe(String var1) throws CanalClientException {
+            }
+        };
+
+        getLock = new ReentrantLock();
+    }
+
+    @Test
+    public void testBeginTxnFail(@Mocked GlobalTransactionMgr transactionMgr) throws Exception {
+
+        new Expectations() {
+            {
+                transactionMgr.beginTransaction(anyLong, (List<Long>) any, anyString,
+                        (TransactionState.TxnCoordinator) any, (TransactionState.LoadJobSourceType) any, anyLong);
+                minTimes = 0;
+                result = new AnalysisException("test exception");
+
+                Catalog.getCurrentGlobalTransactionMgr();
+                minTimes = 0;
+                result = transactionMgr;
+            }
+        };
+
+        CanalSyncDataConsumer consumer = new CanalSyncDataConsumer(
+                syncJob, connector, getLock, false);
+        CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
+                syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
+        CanalSyncChannel channel = new CanalSyncChannel(
+                syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+
+        Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
+        idToChannels.put(channel.getId(), channel);
+        consumer.setChannels(idToChannels);
+
+        channel.start();
+        consumer.start();
+        receiver.start();
+
+        try {
+            Thread.sleep(3000L);
+        } finally {
+            receiver.stop();
+            consumer.stop();
+            channel.stop();
+        }
+
+        Assert.assertEquals("position:N/A", consumer.getPositionInfo());
+        LOG.info(consumer.getPositionInfo());
+    }
+
+    @Test
+    public void testNormal(@Mocked GlobalTransactionMgr transactionMgr,
+                           @Mocked BackendServiceProxy backendServiceProxy,
+                           @Mocked Future<InternalService.PExecPlanFragmentResult> execFuture,
+                           @Mocked Future<InternalService.PCommitResult> commitFuture,
+                           @Mocked Future<InternalService.PSendDataResult> sendDataFuture) throws Exception {
+
+        new Expectations() {
+            {
+                transactionMgr.beginTransaction(anyLong, (List<Long>) any, anyString,
+                        (TransactionState.TxnCoordinator) any, (TransactionState.LoadJobSourceType) any, anyLong);
+                minTimes = 0;
+                result = 105L;
+
+                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                minTimes = 0;
+                result = execFuture;
+
+                backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any);
+                minTimes = 0;
+                result = commitFuture;
+
+                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
+                minTimes = 0;
+                result = sendDataFuture;
+
+                execFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = beginOkResult;
+
+                commitFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = commitOkResult;
+
+                sendDataFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = sendDataOKResult;
+
+                Catalog.getCurrentGlobalTransactionMgr();
+                minTimes = 0;
+                result = transactionMgr;
+
+                BackendServiceProxy.getInstance();
+                minTimes = 0;
+                result = backendServiceProxy;
+            }
+        };
+
+        CanalSyncDataConsumer consumer = new CanalSyncDataConsumer(
+                syncJob, connector, getLock, false);
+        CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
+                syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
+        CanalSyncChannel channel = new CanalSyncChannel(
+                syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+
+        Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
+        idToChannels.put(channel.getId(), channel);
+        consumer.setChannels(idToChannels);
+
+        channel.start();
+        consumer.start();
+        receiver.start();
+
+        try {
+            Thread.sleep(Config.sync_commit_interval_second * 1000);
+        } finally {
+            receiver.stop();
+            consumer.stop();
+            channel.stop();
+        }
+
+        LOG.info(consumer.getPositionInfo());
+    }
+
+    @Test
+    public void testExecFragmentFail(@Mocked GlobalTransactionMgr transactionMgr,
+                                     @Mocked BackendServiceProxy backendServiceProxy,
+                                     @Mocked Future<InternalService.PExecPlanFragmentResult> execFuture,
+                                     @Mocked Future<InternalService.PRollbackResult> abortFuture) throws Exception {
+
+        new Expectations() {
+            {
+                transactionMgr.beginTransaction(anyLong, (List<Long>) any, anyString,
+                        (TransactionState.TxnCoordinator) any, (TransactionState.LoadJobSourceType) any, anyLong);
+                minTimes = 0;
+                result = 105L;
+
+                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                minTimes = 0;
+                result = execFuture;
+
+                backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any);
+                minTimes = 0;
+                result = abortFuture;
+
+                execFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = beginFailResult;
+
+                abortFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = abortOKResult;
+
+                Catalog.getCurrentGlobalTransactionMgr();
+                minTimes = 0;
+                result = transactionMgr;
+
+                BackendServiceProxy.getInstance();
+                minTimes = 0;
+                result = backendServiceProxy;
+            }
+        };
+
+        CanalSyncDataConsumer consumer = new CanalSyncDataConsumer(
+                syncJob, connector, getLock, false);
+        CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
+                syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
+        CanalSyncChannel channel = new CanalSyncChannel(
+                syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+
+        Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
+        idToChannels.put(channel.getId(), channel);
+        consumer.setChannels(idToChannels);
+
+        channel.start();
+        consumer.start();
+        receiver.start();
+
+        try {
+            Thread.sleep(3000L);
+        } finally {
+            receiver.stop();
+            consumer.stop();
+            channel.stop();
+        }
+
+        Assert.assertEquals("position:N/A", consumer.getPositionInfo());
+        LOG.info(consumer.getPositionInfo());
+    }
+
+    @Test
+    public void testCommitTxnFail(@Mocked GlobalTransactionMgr transactionMgr,
+                                  @Mocked BackendServiceProxy backendServiceProxy,
+                                  @Mocked Future<InternalService.PExecPlanFragmentResult> execFuture,
+                                  @Mocked Future<InternalService.PCommitResult> commitFuture,
+                                  @Mocked Future<InternalService.PRollbackResult> abortFuture,
+                                  @Mocked Future<InternalService.PSendDataResult> sendDataFuture) throws Exception {
+
+        new Expectations() {
+            {
+                transactionMgr.beginTransaction(anyLong, (List<Long>) any, anyString,
+                        (TransactionState.TxnCoordinator) any, (TransactionState.LoadJobSourceType) any, anyLong);
+                minTimes = 0;
+                result = 105L;
+
+                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                minTimes = 0;
+                result = execFuture;
+
+                backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any);
+                minTimes = 0;
+                result = commitFuture;
+
+                backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any);
+                minTimes = 0;
+                result = abortFuture;
+
+                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
+                minTimes = 0;
+                result = sendDataFuture;
+
+                execFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = beginOkResult;
+
+                commitFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = commitFailResult;
+
+                abortFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = abortOKResult;
+
+                sendDataFuture.get(anyLong, (TimeUnit) any);
+                minTimes = 0;
+                result = sendDataOKResult;
+
+                Catalog.getCurrentGlobalTransactionMgr();
+                minTimes = 0;
+                result = transactionMgr;
+
+                BackendServiceProxy.getInstance();
+                minTimes = 0;
+                result = backendServiceProxy;
+            }
+        };
+
+        CanalSyncDataConsumer consumer = new CanalSyncDataConsumer(
+                syncJob, connector, getLock, false);
+        CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
+                syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
+        CanalSyncChannel channel = new CanalSyncChannel(
+                syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+
+        Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
+        idToChannels.put(channel.getId(), channel);
+        consumer.setChannels(idToChannels);
+
+        channel.start();
+        consumer.start();
+        receiver.start();
+
+        try {
+            Thread.sleep(3000L);
+        } finally {
+            receiver.stop();
+            consumer.stop();
+            channel.stop();
+        }
+
+        Assert.assertEquals("position:N/A", consumer.getPositionInfo());
+        LOG.info(consumer.getPositionInfo());
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncJobTest.java
new file mode 100644
index 0000000..4d4a8d1
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncJobTest.java
@@ -0,0 +1,416 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import org.apache.doris.analysis.BinlogDesc;
+import org.apache.doris.analysis.ChannelDescription;
+import org.apache.doris.analysis.CreateDataSyncJobStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.sync.DataSyncJobType;
+import org.apache.doris.load.sync.SyncChannel;
+import org.apache.doris.load.sync.SyncFailMsg;
+import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob;
+import org.apache.doris.load.sync.SyncJob.JobState;
+import org.apache.doris.load.sync.SyncJob.SyncJobUpdateStateInfo;
+import org.apache.doris.persist.EditLog;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class CanalSyncJobTest {
+    private static final Logger LOG = LogManager.getLogger(CanalSyncJobTest.class);
+
+    private long jobId;
+    private long dbId;
+    private String dbName;
+    private String tblName;
+    private String jobName;
+    private Catalog catalog;
+    private Map<String, String> properties;
+
+    @Mocked
+    EditLog editLog;
+
+    @Injectable
+    Database database;
+
+    @Injectable
+    OlapTable table;
+
+    @Before
+    public void setUp() {
+        jobId = 1L;
+        dbId = 10000L;
+        dbName = "testDb";
+        tblName = "testTbl";
+        jobName = "testJob";
+        properties = Maps.newHashMap();
+        properties.put(CanalSyncJob.CANAL_SERVER_IP, "127.0.0.1");
+        properties.put(CanalSyncJob.CANAL_SERVER_PORT, "11111");
+        properties.put(CanalSyncJob.CANAL_DESTINATION, "test");
+        properties.put(CanalSyncJob.CANAL_USERNAME, "test_user");
+        properties.put(CanalSyncJob.CANAL_PASSWORD, "test_password");
+
+        catalog = Deencapsulation.newInstance(Catalog.class);
+        new Expectations(catalog) {
+            {
+                catalog.getDb(10000L);
+                minTimes = 0;
+                result = database;
+
+                catalog.getDb("testDb");
+                minTimes = 0;
+                result = database;
+
+                catalog.getEditLog();
+                minTimes = 0;
+                result = editLog;
+
+                Catalog.getCurrentCatalog();
+                minTimes = 0;
+                result = catalog;
+            }
+        };
+
+        new Expectations(database) {
+            {
+                database.getId();
+                minTimes = 0;
+                result = dbId;
+
+                database.getTable("testTbl");
+                minTimes = 0;
+                result = table;
+            }
+        };
+
+        new Expectations(table) {
+            {
+                table.getName();
+                minTimes = 0;
+                result = tblName;
+
+                table.getKeysType();
+                minTimes = 0;
+                result = KeysType.UNIQUE_KEYS;
+
+                table.hasDeleteSign();
+                minTimes = 0;
+                result = true;
+            }
+        };
+
+        new MockUp<SyncCanalClient>() {
+            @Mock
+            public void startup() {
+            }
+            @Mock
+            public void shutdown(boolean needCleanUp) {
+            }
+            @Mock
+            public void registerChannels(List<SyncChannel> channels) {
+            }
+        };
+    }
+
+    @Test
+    public void testCreateFromStmtWithNoDatabase(@Injectable CreateDataSyncJobStmt stmt) {
+        new Expectations() {
+            {
+                stmt.getDbName();
+                result = "";
+            }
+        };
+
+        try {
+            SyncJob.fromStmt(jobId, stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCreateFromStmtWithoutBinlog(@Injectable CreateDataSyncJobStmt stmt,
+                                                @Injectable ChannelDescription channelDescription,
+                                                @Injectable BinlogDesc binlogDesc) {
+        List<ChannelDescription> channelDescriptions = Lists.newArrayList();
+        channelDescriptions.add(channelDescription);
+        new Expectations() {
+            {
+                stmt.getJobName();
+                result = jobName;
+
+                stmt.getDbName();
+                result = dbName;
+
+                stmt.getDataSyncJobType();
+                result = DataSyncJobType.CANAL;
+
+                stmt.getBinlogDesc();
+                result = binlogDesc;
+
+                stmt.getChannelDescriptions();
+                result = channelDescriptions;
+
+                binlogDesc.getProperties();
+                result = Maps.newHashMap();
+            }
+        };
+
+        try {
+            SyncJob.fromStmt(jobId, stmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            LOG.info(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCreateFromStmt(@Injectable CreateDataSyncJobStmt stmt,
+                                   @Injectable ChannelDescription channelDescription,
+                                   @Injectable BinlogDesc binlogDesc) {
+
+        List<ChannelDescription> channelDescriptions = Lists.newArrayList();
+        channelDescriptions.add(channelDescription);
+
+        new Expectations() {
+            {
+                stmt.getJobName();
+                result = jobName;
+
+                stmt.getDbName();
+                result = dbName;
+
+                stmt.getDataSyncJobType();
+                result = DataSyncJobType.CANAL;
+
+                stmt.getBinlogDesc();
+                result = binlogDesc;
+
+                stmt.getChannelDescriptions();
+                result = channelDescriptions;
+
+                binlogDesc.getProperties();
+                result = properties;
+            }
+        };
+
+        try {
+            SyncJob syncJob = SyncJob.fromStmt(jobId, stmt);
+            CanalSyncJob canalSyncJob = (CanalSyncJob) syncJob;
+            Assert.assertEquals(jobId, canalSyncJob.getId());
+            Assert.assertEquals(jobName, canalSyncJob.getJobName());
+            Assert.assertEquals(dbId, canalSyncJob.getDbId());
+            Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+            Assert.assertEquals(DataSyncJobType.CANAL, canalSyncJob.getJobType());
+            Assert.assertNull(canalSyncJob.getFailMsg());
+
+        } catch (DdlException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testExecute(@Injectable ChannelDescription channelDescription,
+                            @Injectable BinlogDesc binlogDesc) {
+        List<ChannelDescription> channelDescriptions = Lists.newArrayList();
+        channelDescriptions.add(channelDescription);
+
+        new Expectations() {
+            {
+                binlogDesc.getProperties();
+                result = properties;
+
+                channelDescription.getTargetTable();
+                result = tblName;
+
+                channelDescription.getSrcDatabase();
+                result = "mysqlDb";
+
+                channelDescription.getSrcTableName();
+                result = "mysqlTbl";
+
+                channelDescription.getColNames();
+                result = null;
+
+                channelDescription.getPartitionNames();
+                result = null;
+            }
+        };
+
+        try {
+            CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+            canalSyncJob.setChannelDescriptions(channelDescriptions);
+            canalSyncJob.checkAndSetBinlogInfo(binlogDesc);
+            Assert.assertEquals(jobId, canalSyncJob.getId());
+            Assert.assertEquals(jobName, canalSyncJob.getJobName());
+            Assert.assertEquals(dbId, canalSyncJob.getDbId());
+            Assert.assertEquals(DataSyncJobType.CANAL, canalSyncJob.getJobType());
+            Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+            Assert.assertNull(canalSyncJob.getFailMsg());
+            // execute job
+            canalSyncJob.execute();
+            Assert.assertTrue(canalSyncJob.isInit());
+            Assert.assertTrue(canalSyncJob.isRunning());
+            Assert.assertEquals(JobState.RUNNING, canalSyncJob.getJobState());
+        } catch (UserException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testPauseAndResumeJob(@Injectable BinlogDesc binlogDesc) {
+        new MockUp<SyncCanalClient>() {
+            @Mock
+            public void startup() {
+            }
+            @Mock
+            public void shutdown(boolean needCleanUp) {
+            }
+            @Mock
+            public void registerChannels(List<SyncChannel> channels) {
+            }
+        };
+
+        new MockUp<CanalSyncJob>() {
+            @Mock
+            public void initChannels() {
+            }
+        };
+
+        new Expectations() {
+            {
+                binlogDesc.getProperties();
+                result = properties;
+            }
+        };
+
+        try {
+            CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+            canalSyncJob.checkAndSetBinlogInfo(binlogDesc);
+            Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+            // run job
+            canalSyncJob.execute();
+            Assert.assertTrue(canalSyncJob.isRunning());
+            // pause job
+            canalSyncJob.pause();
+            Assert.assertTrue(canalSyncJob.isPaused());
+            // resume job
+            canalSyncJob.resume();
+            Assert.assertTrue(canalSyncJob.isRunning());
+        } catch (UserException e) {
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testCancelJob(@Injectable BinlogDesc binlogDesc) {
+
+        new MockUp<CanalSyncJob>() {
+            @Mock
+            public void initChannels() {
+            }
+        };
+
+        new Expectations() {
+            {
+                binlogDesc.getProperties();
+                result = properties;
+            }
+        };
+
+        try {
+            CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+            canalSyncJob.checkAndSetBinlogInfo(binlogDesc);
+            Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+            // run job
+            canalSyncJob.execute();
+            Assert.assertTrue(canalSyncJob.isRunning());
+            // cancel job
+            canalSyncJob.cancel(MsgType.USER_CANCEL, "user cancel");
+            Assert.assertTrue(canalSyncJob.isCancelled());
+            Assert.assertTrue(canalSyncJob.isCompleted());
+            Assert.assertEquals(MsgType.USER_CANCEL, canalSyncJob.getFailMsg().getMsgType());
+            Assert.assertEquals("user cancel", canalSyncJob.getFailMsg().getMsg());
+        } catch (UserException e) {
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void testReplayUpdateState(@Injectable ChannelDescription channelDescription,
+                                      @Injectable BinlogDesc binlogDesc) {
+        List<ChannelDescription> channelDescriptions = Lists.newArrayList();
+        channelDescriptions.add(channelDescription);
+
+        new Expectations() {
+            {
+                binlogDesc.getProperties();
+                result = properties;
+                channelDescription.getTargetTable();
+                result = tblName;
+                channelDescription.getSrcDatabase();
+                result = "mysqlDb";
+                channelDescription.getSrcTableName();
+                result = "mysqlTbl";
+                channelDescription.getColNames();
+                result = null;
+                channelDescription.getPartitionNames();
+                result = null;
+            }
+        };
+        
+        try {
+            CanalSyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+            canalSyncJob.setChannelDescriptions(channelDescriptions);
+            canalSyncJob.checkAndSetBinlogInfo(binlogDesc);
+            Assert.assertEquals(JobState.PENDING, canalSyncJob.getJobState());
+            SyncJobUpdateStateInfo info = new SyncJobUpdateStateInfo(
+                    jobId, JobState.CANCELLED, 1622469769L, -1L, -1L,
+                    new SyncFailMsg(MsgType.USER_CANCEL, "user cancel"));
+            canalSyncJob.replayUpdateSyncJobState(info);
+            Assert.assertTrue(canalSyncJob.isCancelled());
+            Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+        } catch (UserException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalTestUtil.java
new file mode 100644
index 0000000..d2da579
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalTestUtil.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.sync.canal;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.sql.Types;
+import java.util.List;
+
+public class CanalTestUtil {
+
+    public static CanalEntry.Column buildColumn(String colName, int colValue) {
+        CanalEntry.Column.Builder columnBuilder = CanalEntry.Column.newBuilder();
+        Serializable value = Integer.valueOf(colValue);
+        columnBuilder.setName(colName);
+        columnBuilder.setIsKey(true);
+        columnBuilder.setMysqlType("bigint");
+        columnBuilder.setIndex(0);
+        columnBuilder.setIsNull(false);
+        columnBuilder.setValue(value.toString());
+        columnBuilder.setSqlType(Types.BIGINT);
+        columnBuilder.setUpdated(false);
+        return columnBuilder.build();
+    }
+
+    public static CanalEntry.RowChange buildRowChange() {
+        CanalEntry.RowData.Builder rowDataBuilder = CanalEntry.RowData.newBuilder();
+        CanalEntry.RowChange.Builder rowChangeBuider = CanalEntry.RowChange.newBuilder();
+        rowChangeBuider.setIsDdl(false);
+        rowChangeBuider.setEventType(CanalEntry.EventType.INSERT);
+        rowDataBuilder.addAfterColumns(buildColumn("a", 1));
+        rowDataBuilder.addAfterColumns(buildColumn("b", 2));
+        rowChangeBuider.addRowDatas(rowDataBuilder.build());
+        return rowChangeBuider.build();
+    }
+
+    public static CanalEntry.Entry buildEntry(String binlogFile, long offset, long timestamp) {
+        CanalEntry.Header.Builder headerBuilder = CanalEntry.Header.newBuilder();
+        headerBuilder.setLogfileName(binlogFile);
+        headerBuilder.setLogfileOffset(offset);
+        headerBuilder.setExecuteTime(timestamp);
+        CanalEntry.Entry.Builder entryBuilder = CanalEntry.Entry.newBuilder();
+        entryBuilder.setHeader(headerBuilder.build());
+        entryBuilder.setEntryType(CanalEntry.EntryType.ROWDATA);
+        entryBuilder.setStoreValue(buildRowChange().toByteString());
+        return entryBuilder.build();
+    }
+
+    public static CanalEntry.Entry buildEntry(String binlogFile, long offset, long timestamp, String schemaName, String tableName) {
+        CanalEntry.Header.Builder headerBuilder = CanalEntry.Header.newBuilder();
+        headerBuilder.setLogfileName(binlogFile);
+        headerBuilder.setLogfileOffset(offset);
+        headerBuilder.setExecuteTime(timestamp);
+        headerBuilder.setSchemaName(schemaName);
+        headerBuilder.setTableName(tableName);
+        CanalEntry.Entry.Builder entryBuilder = CanalEntry.Entry.newBuilder();
+        entryBuilder.setHeader(headerBuilder.build());
+        entryBuilder.setEntryType(CanalEntry.EntryType.ROWDATA);
+        entryBuilder.setStoreValue(buildRowChange().toByteString());
+        return entryBuilder.build();
+    }
+
+    public static Message fetchEOFMessage() {
+        return new Message(-1L, Lists.newArrayList());
+    }
+
+    public static Message fetchMessage(long id, boolean isRaw, int batchSize, String binlogFile, long offset, String schemaName, String tableName) {
+        List<CanalEntry.Entry> entries = Lists.newArrayList();
+        for (int i = 0 ; i < batchSize; i++) {
+            entries.add(buildEntry(binlogFile, offset++, 1024, schemaName, tableName));
+        }
+        return new Message(id, isRaw, entries);
+    }
+
+
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org