You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/17 02:01:41 UTC

[incubator-doris] branch master updated: [Enhance] [Binlog] Reduce thread number of SyncJob to save resources (#6418)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 95cdb7c  [Enhance] [Binlog] Reduce thread number of SyncJob to save resources (#6418)
95cdb7c is described below

commit 95cdb7cc0caba77d2a16cbff5d4717b8354bb7a1
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Fri Sep 17 10:01:27 2021 +0800

    [Enhance] [Binlog] Reduce thread number of SyncJob to save resources (#6418)
    
    This commit is going to reduce thread number of SyncJob .
    1、Submit send task to thread pool to send data.
    2、Submit eof task to thread pool to block and wake up client to commit transactions.
    3、Use SerialExecutorService to ensure correct order of sent data in every channel.
    
    Besides,some bugs have been fixed in this commit
    1、Failed to resume syncJob.
    2、Failed to do sync data when set multiple tables in a syncJob.
    3、In a cluster with multiple Fe, master may hang up after creating syncJob.
---
 fe/fe-core/src/main/cup/sql_parser.cup             |   2 +
 .../apache/doris/analysis/ChannelDescription.java  |  10 ++
 .../java/org/apache/doris/catalog/Catalog.java     |   2 +-
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../org/apache/doris/load/sync/SyncChannel.java    |  23 +---
 .../doris/load/sync/SyncChannelCallback.java       |   2 -
 .../apache/doris/load/sync/SyncChannelHandle.java  |  53 --------
 .../org/apache/doris/load/sync/SyncChecker.java    |   3 +-
 .../java/org/apache/doris/load/sync/SyncJob.java   |   4 +
 .../org/apache/doris/load/sync/SyncLifeCycle.java  |   5 +
 .../doris/load/sync/canal/CanalSyncChannel.java    |  94 ++++++-------
 .../load/sync/canal/CanalSyncDataConsumer.java     |  36 +++--
 .../apache/doris/load/sync/canal/CanalSyncJob.java |  25 ++--
 .../doris/load/sync/canal/SyncCanalClient.java     |  54 ++------
 .../doris/load/sync/position/PositionMeta.java     |   3 +-
 .../apache/doris/qe/InsertStreamTxnExecutor.java   |   3 +-
 .../java/org/apache/doris/task/SerialExecutor.java | 146 +++++++++++++++++++++
 .../apache/doris/task/SerialExecutorService.java   |  80 +++++++++++
 .../doris/{load/sync => task}/SyncPendingTask.java |   4 +-
 .../main/java/org/apache/doris/task/SyncTask.java  |  71 ++++++++++
 .../java/org/apache/doris/task/SyncTaskPool.java   |  51 +++++++
 .../doris/load/sync/canal/CanalSyncDataTest.java   |  17 +--
 .../doris/task/SerialExecutorServiceTest.java      | 133 +++++++++++++++++++
 23 files changed, 618 insertions(+), 208 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 5a62e67..c8d28a4 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -5264,6 +5264,8 @@ keyword ::=
     {: RESULT = id; :}
     | KW_ISOLATION:id
     {: RESULT = id; :}
+    | KW_JOB:id
+    {: RESULT = id; :}
     | KW_ENCRYPTKEY:id
     {: RESULT = id; :}
     | KW_ENCRYPTKEYS:id
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java
index 51eae33..c926fae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java
@@ -60,6 +60,8 @@ public class ChannelDescription implements Writable {
     // column names of source table
     @SerializedName(value = "colNames")
     private final List<String> colNames;
+    @SerializedName(value = "channelId")
+    private long channelId;
 
     public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List<String> colNames) {
         this.srcDatabase = srcDatabase;
@@ -119,6 +121,14 @@ public class ChannelDescription implements Writable {
         }
     }
 
+    public void setChannelId(long channelId) {
+        this.channelId = channelId;
+    }
+
+    public long getChannelId() {
+        return this.channelId;
+    }
+
     public String getTargetTable() {
         return targetTable;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 265e1da..1c44149 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1318,7 +1318,7 @@ public class Catalog {
         ExportChecker.init(Config.export_checker_interval_second * 1000L);
         ExportChecker.startAll();
         // Sync checker
-        SyncChecker.init(Config.sync_checker_interval_second);
+        SyncChecker.init(Config.sync_checker_interval_second * 1000L);
         SyncChecker.startAll();
         // Tablet checker and scheduler
         tabletChecker.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 2eaf139..645e1d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -636,6 +636,11 @@ public class Config extends ConfigBase {
     @ConfField public static int sync_checker_interval_second = 5;
 
     /**
+     * max num of thread to handle sync task in sync task thread-pool.
+     */
+    @ConfField public static int max_sync_task_threads_num = 10;
+
+    /**
      * Default number of waiting jobs for routine load and version 2 of load
      * This is a desired number.
      * In some situation, such as switch the master, the current number is maybe more than desired_max_waiting_jobs
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java
index 85644b1..8a47385 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannel.java
@@ -18,7 +18,6 @@
 package org.apache.doris.load.sync;
 
 import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.UserException;
@@ -32,7 +31,7 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class SyncChannel extends SyncLifeCycle {
+public class SyncChannel {
     private static final Logger LOG = LogManager.getLogger(SyncChannel.class);
 
     protected long id;
@@ -46,8 +45,8 @@ public class SyncChannel extends SyncLifeCycle {
     protected String srcTable;
     protected SyncChannelCallback callback;
 
-    public SyncChannel(SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
-        this.id = Catalog.getCurrentCatalog().getNextId();
+    public SyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
+        this.id = id;
         this.jobId = syncJob.getId();
         this.db = db;
         this.tbl = table;
@@ -57,22 +56,6 @@ public class SyncChannel extends SyncLifeCycle {
         this.srcTable = srcTable.toLowerCase();
     }
 
-    @Override
-    public void start() {
-        super.start();
-        LOG.info("channel {} has been started. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable);
-    }
-
-    @Override
-    public void stop() {
-        super.stop();
-        LOG.info("channel {} has been stopped. dest table: {}, mysql src table: {}.{}", id, targetTable, srcDataBase, srcTable);
-    }
-
-    @Override
-    public void process() {
-    }
-
     public void beginTxn(long batchId) throws UserException, TException, TimeoutException,
             InterruptedException, ExecutionException {
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java
index 8b2f239..2cdf717 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelCallback.java
@@ -19,8 +19,6 @@ package org.apache.doris.load.sync;
 
 public interface SyncChannelCallback {
 
-    public boolean state();
-
     public void onFinished(long channelId);
 
     public void onFailed(String errMsg);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java
index 4e3a397..8f7721e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChannelHandle.java
@@ -24,14 +24,11 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class SyncChannelHandle implements SyncChannelCallback {
     private Logger LOG = LogManager.getLogger(SyncChannelHandle.class);
 
     // channel id -> dummy value(-1)
     private MarkedCountDownLatch<Long, Long> latch;
-    private Sync sync = new Sync();
 
     public void reset(int size) {
         this.latch = new MarkedCountDownLatch<>(size);
@@ -41,19 +38,6 @@ public class SyncChannelHandle implements SyncChannelCallback {
         latch.addMark(channel.getId(), -1L);
     }
 
-    public void set(Boolean mutex) {
-        if (mutex) {
-            this.sync.innerSetTrue();
-        } else {
-            this.sync.innerSetFalse();
-        }
-    }
-
-    @Override
-    public boolean state() {
-        return this.sync.innerState();
-    }
-
     @Override
     public void onFinished(long channelId) {
         this.latch.markedCountDown(channelId, -1L);
@@ -71,41 +55,4 @@ public class SyncChannelHandle implements SyncChannelCallback {
     public Status getStatus() {
         return latch.getStatus();
     }
-
-    // This class describes the inner state.
-    private final class Sync {
-        private AtomicBoolean state;
-
-        boolean innerState() {
-            return this.state.get();
-        }
-
-        public boolean getState() {
-            return state.get();
-        }
-
-        void innerSetTrue() {
-            boolean s;
-            do {
-                s = getState();
-                if (s) {
-                    return;
-                }
-            } while(!state.compareAndSet(s, true));
-        }
-
-        void innerSetFalse() {
-            boolean s;
-            do {
-                s = getState();
-                if (!s) {
-                    return;
-                }
-            } while(!state.compareAndSet(s, false));
-        }
-
-        private Sync() {
-            state = new AtomicBoolean(false);
-        }
-    }
 }
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
index 830c449..78da903 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
@@ -25,6 +25,7 @@ import org.apache.doris.task.MasterTaskExecutor;
 
 import com.google.common.collect.Maps;
 
+import org.apache.doris.task.SyncPendingTask;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -67,7 +68,7 @@ public class SyncChecker extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        LOG.debug("start check export jobs. job state: {}", jobState.name());
+        LOG.debug("start check sync jobs. job state: {}", jobState.name());
         switch (jobState) {
             case PENDING:
                 runPendingJobs();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
index 3ce5b48..9ceb044 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJob.java
@@ -307,6 +307,10 @@ public abstract class SyncJob implements Writable {
 
     public void setChannelDescriptions(List<ChannelDescription> channelDescriptions) {
         this.channelDescriptions = channelDescriptions;
+        // set channel id
+        for (ChannelDescription channelDescription : channelDescriptions) {
+            channelDescription.setChannelId(Catalog.getCurrentCatalog().getNextId());
+        }
     }
 
     public long getId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java
index 3c98137..9109c92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncLifeCycle.java
@@ -62,6 +62,11 @@ public abstract class SyncLifeCycle {
         this.running = false;
 
         if (thread != null) {
+            // Deadlock prevention
+            if (thread == Thread.currentThread()) {
+                return;
+            }
+
             try {
                 thread.join();
             } catch (InterruptedException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 8b5dc54..266d52f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -32,6 +32,8 @@ import org.apache.doris.load.sync.model.Data;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.InsertStreamTxnExecutor;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.task.SyncTask;
+import org.apache.doris.task.SyncTaskPool;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TMergeType;
@@ -49,7 +51,6 @@ import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -58,8 +59,6 @@ import org.apache.thrift.TException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 public class CanalSyncChannel extends SyncChannel {
@@ -69,44 +68,47 @@ public class CanalSyncChannel extends SyncChannel {
     private static final String DELETE_CONDITION = DELETE_COLUMN + "=1";
     private static final String NULL_VALUE_FOR_LOAD = "\\N";
 
+    private final int index;
+
     private long timeoutSecond;
     private long lastBatchId;
-    private LinkedBlockingQueue<Data<InternalService.PDataRow>> pendingQueue;
+
     private Data<InternalService.PDataRow> batchBuffer;
     private InsertStreamTxnExecutor txnExecutor;
 
-    public CanalSyncChannel(SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
-        super(syncJob, db, table, columns, srcDataBase, srcTable);
+    public CanalSyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List<String> columns, String srcDataBase, String srcTable) {
+        super(id, syncJob, db, table, columns, srcDataBase, srcTable);
+        this.index = SyncTaskPool.getNextIndex();
         this.batchBuffer = new Data<>();
-        this.pendingQueue = Queues.newLinkedBlockingQueue(128);
         this.lastBatchId = -1L;
         this.timeoutSecond = -1L;
     }
 
-    public void process() {
-        while (running) {
-            if (!isTxnInit()) {
-                continue;
-            }
-            // if txn has begun, send all data in queue
-            if (isTxnBegin()) {
-                while (!pendingQueue.isEmpty()) {
-                    try {
-                        Data<InternalService.PDataRow> rows = pendingQueue.poll(CanalConfigs.channelWaitingTimeoutMs, TimeUnit.MILLISECONDS);
-                        if (rows != null) {
-                            sendData(rows);
-                        }
-                    } catch (Exception e) {
-                        String errMsg = "encounter exception in channel, channel " + id + ", " +
-                                "msg: " + e.getMessage() + ", table: " + targetTable;
-                        LOG.error(errMsg);
-                        callback.onFailed(errMsg);
-                    }
-                }
-            }
-            if (callback.state()) {
-                callback.onFinished(id);
-            }
+    private final static class SendTask extends SyncTask {
+        private final InsertStreamTxnExecutor executor;
+        private final Data<InternalService.PDataRow> rows;
+
+        public SendTask(long signature, int index, SyncChannelCallback callback, Data<InternalService.PDataRow> rows, InsertStreamTxnExecutor executor) {
+            super(signature, index, callback);
+            this.executor = executor;
+            this.rows = rows;
+        }
+
+        public void exec() throws Exception {
+            TransactionEntry txnEntry = executor.getTxnEntry();
+            txnEntry.setDataToSend(rows.getDatas());
+            executor.sendData();
+        }
+    }
+
+    private final static class EOFTask extends SyncTask {
+
+        public EOFTask(long signature, int index, SyncChannelCallback callback) {
+            super(signature, index, callback);
+        }
+
+        public void exec() throws Exception {
+            callback.onFinished(signature);
         }
     }
 
@@ -189,10 +191,10 @@ public class CanalSyncChannel extends SyncChannel {
             throw e;
         }  finally {
             this.batchBuffer = new Data<>();
-            this.pendingQueue.clear();
             updateBatchId(-1L);
         }
     }
+
     @Override
     public void commitTxn() throws TException, TimeoutException, InterruptedException, ExecutionException {
         if (!isTxnBegin()) {
@@ -213,10 +215,10 @@ public class CanalSyncChannel extends SyncChannel {
             throw e;
         } finally {
             this.batchBuffer = new Data<>();
-            this.pendingQueue.clear();
             updateBatchId(-1L);
         }
     }
+
     @Override
     public void initTxn(long timeoutSecond) {
         if (!isTxnInit()) {
@@ -254,7 +256,12 @@ public class CanalSyncChannel extends SyncChannel {
         }
     }
 
-    private void execute(long batchId, CanalEntry.EventType eventType, List<CanalEntry.Column> columns) {
+    public void submitEOF() {
+        EOFTask task = new EOFTask(id, index, callback);
+        SyncTaskPool.submit(task);
+    }
+
+    public void execute(long batchId, CanalEntry.EventType eventType, List<CanalEntry.Column> columns) {
         InternalService.PDataRow row = parseRow(eventType, columns);
         try {
             Preconditions.checkState(isTxnInit());
@@ -262,7 +269,8 @@ public class CanalSyncChannel extends SyncChannel {
                 if (!isTxnBegin()) {
                     beginTxn(batchId);
                 } else {
-                    this.pendingQueue.put(this.batchBuffer);
+                    SendTask task = new SendTask(id, index, callback, batchBuffer, txnExecutor);
+                    SyncTaskPool.submit(task);
                     this.batchBuffer = new Data<>();
                 }
                 updateBatchId(batchId);
@@ -294,19 +302,13 @@ public class CanalSyncChannel extends SyncChannel {
         return row.build();
     }
 
-    private void sendData(Data<InternalService.PDataRow> rows) throws TException, TimeoutException,
-            InterruptedException, ExecutionException {
-        Preconditions.checkState(isTxnBegin());
-        TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-        txnEntry.setDataToSend(rows.getDatas());
-        this.txnExecutor.sendData();
-    }
-
     public void flushData() throws TException, TimeoutException,
             InterruptedException, ExecutionException {
-        if (batchBuffer.isNotEmpty()) {
-            sendData(batchBuffer);
-            batchBuffer = new Data<>();
+        if (this.batchBuffer.isNotEmpty()) {
+            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
+            txnEntry.setDataToSend(batchBuffer.getDatas());
+            this.txnExecutor.sendData();
+            this.batchBuffer = new Data<>();
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java
index 10c393b..bdfc732 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncDataConsumer.java
@@ -102,7 +102,6 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
 
     @Override
     public void beginForTxn() {
-        handle.set(false);
         handle.reset(idToChannels.size());
         for (CanalSyncChannel channel : idToChannels.values()) {
             channel.initTxn(Config.max_stream_load_timeout_second);
@@ -161,15 +160,16 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
     }
 
     public Status waitForTxn() {
+        for (CanalSyncChannel channel : idToChannels.values()) {
+            channel.submitEOF();
+        }
+
         Status st = Status.CANCELLED;
-        handle.set(true);
         try {
             handle.join();
             st = handle.getStatus();
         } catch (InterruptedException e) {
             logger.warn("InterruptedException: ", e);
-        } finally {
-            handle.set(false);
         }
         return st;
     }
@@ -190,7 +190,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
                 long totalMemSize = 0L;
                 long startTime = System.currentTimeMillis();
                 beginForTxn();
-                while (true) {
+                while (running) {
                     Events<CanalEntry.Entry, EntryPosition> dataEvents = null;
                     try {
                         dataEvents = dataBlockingQueue.poll(CanalConfigs.pollWaitingTimeoutMs, TimeUnit.MILLISECONDS);
@@ -227,7 +227,12 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
                         break;
                     }
                 }
+
                 Status st = waitForTxn();
+                if (!running) {
+                    abortForTxn("stopping client");
+                    continue;
+                }
                 if (st.ok()) {
                     commitForTxn();
                 } else {
@@ -260,7 +265,7 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
         }
 
         int startIndex = 0;
-        // if last ack position is null, it is the first time to consume batch (startOffset = 0)
+        // if last ack position is null, it is the first time to consume batch
         EntryPosition lastAckPosition = positionMeta.getAckPosition();
         if (lastAckPosition != null) {
             EntryPosition firstPosition = EntryPosition.createPosition(entries.get(0));
@@ -303,14 +308,18 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
         EntryPosition startPosition = dataEvents.getPositionRange().getStart();
         EntryPosition endPosition = dataEvents.getPositionRange().getEnd();
         for (CanalSyncChannel channel : idToChannels.values()) {
+            String key = CanalUtils.getFullName(channel.getSrcDataBase(), channel.getSrcTable());
+            // if last commit position is null, it is the first time to execute batch
             EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId());
-            String key = channel.getSrcDataBase() + "." + channel.getSrcTable();
-            if (commitPosition.compareTo(startPosition) < 0) {
+            if (commitPosition != null) {
+                if (commitPosition.compareTo(startPosition) < 0) {
+                    preferChannels.put(key, channel);
+                } else if (commitPosition.compareTo(endPosition) < 0) {
+                    secondaryChannels.put(key, channel);
+                }
+            } else {
                 preferChannels.put(key, channel);
             }
-            else if (commitPosition.compareTo(endPosition) < 0) {
-                secondaryChannels.put(key, channel);
-            }
         }
 
         // distribute data to channels
@@ -405,13 +414,16 @@ public class CanalSyncDataConsumer extends SyncDataConsumer {
     private void rollback() {
         holdGetLock();
         try {
-            connector.rollback();
             // Wait for the receiver to put the last message into the queue before clearing queue
             try {
                 Thread.sleep(1000L);
             } catch (InterruptedException e) {
                 // ignore
             }
+
+            if (!ackBatches.isEmpty()) {
+                connector.rollback();
+            }
         } finally {
             releaseGetLock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java
index dce10ba..c22b9f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncJob.java
@@ -109,8 +109,8 @@ public class CanalSyncJob extends SyncJob {
                         colNames.add(column.getName());
                     }
                 }
-                CanalSyncChannel syncChannel = new CanalSyncChannel(this, db, olapTable, colNames,
-                        channelDescription.getSrcDatabase(), channelDescription.getSrcTableName());
+                CanalSyncChannel syncChannel = new CanalSyncChannel(channelDescription.getChannelId(), this, db,
+                        olapTable, colNames, channelDescription.getSrcDatabase(), channelDescription.getSrcTableName());
                 if (channelDescription.getPartitionNames() != null) {
                     syncChannel.setPartitions(channelDescription.getPartitionNames());
                 }
@@ -183,7 +183,9 @@ public class CanalSyncJob extends SyncJob {
     public void execute() throws UserException {
         LOG.info("try to start canal client. Remote ip: {}, remote port: {}, debug: {}", ip, port, debug);
         // init
-        init();
+        if (!isInit()) {
+            init();
+        }
         // start client
         unprotectedStartClient();
     }
@@ -193,10 +195,12 @@ public class CanalSyncJob extends SyncJob {
         LOG.info("Cancel canal sync job {}. MsgType: {}, errMsg: {}", id, msgType.name(), errMsg);
         failMsg = new SyncFailMsg(msgType, errMsg);
         switch (msgType) {
-            case USER_CANCEL:
             case SUBMIT_FAIL:
             case RUN_FAIL:
+                unprotectedStopClient(JobState.PAUSED);
+                break;
             case UNKNOWN:
+            case USER_CANCEL:
                 unprotectedStopClient(JobState.CANCELLED);
                 break;
             default:
@@ -228,11 +232,7 @@ public class CanalSyncJob extends SyncJob {
             return;
         }
         if (client != null) {
-            if (jobState == JobState.CANCELLED) {
-                client.shutdown(true);
-            } else {
-                client.shutdown(false);
-            }
+            client.shutdown(true);
         }
         updateState(jobState, false);
         LOG.info("client has been stopped. id: {}, jobName: {}" , id, jobName);
@@ -251,15 +251,12 @@ public class CanalSyncJob extends SyncJob {
             JobState jobState = info.getJobState();
             switch (jobState) {
                 case RUNNING:
-                    client.startup();
-                    updateState(JobState.RUNNING, true);
+                    updateState(JobState.PENDING, true);
                     break;
                 case PAUSED:
-                    client.shutdown(false);
                     updateState(JobState.PAUSED, true);
                     break;
                 case CANCELLED:
-                    client.shutdown(true);
                     updateState(JobState.CANCELLED, true);
                     break;
             }
@@ -300,4 +297,4 @@ public class CanalSyncJob extends SyncJob {
                 + ", finishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs)
                 + "]";
     }
-}
\ No newline at end of file
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java
index 33cb8cf..421c46d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/SyncCanalClient.java
@@ -53,8 +53,6 @@ public class SyncCanalClient {
         lock.unlock();
     }
 
-    private ShutDownWorker shutDownWorker;
-
     public SyncCanalClient(CanalSyncJob syncJob, String destination, CanalConnector connector, int batchSize, boolean debug) {
         this(syncJob, destination, connector, batchSize, debug, ".*\\..*");
     }
@@ -71,13 +69,9 @@ public class SyncCanalClient {
         Preconditions.checkState(!idToChannels.isEmpty(), "no channel is registered");
         lock();
         try {
-            // 1.start all threads in channel
-            for (CanalSyncChannel channel : idToChannels.values()) {
-                channel.start();
-            }
-            // 2. start executor
+            // 1. start executor
             consumer.start();
-            // 3. start receiver
+            // 2. start receiver
             receiver.start();
         } finally {
             unlock();
@@ -85,43 +79,17 @@ public class SyncCanalClient {
         logger.info("canal client has been started.");
     }
 
-    // Stop client asynchronously
     public void shutdown(boolean needCleanUp) {
-        this.shutDownWorker = new ShutDownWorker(needCleanUp);
-        shutDownWorker.shutdown();
-        logger.info("canal client shutdown worker has been started.");
-    }
-
-    public class ShutDownWorker implements Runnable {
-        public Thread thread;
-        public boolean needCleanUp;
-
-        public ShutDownWorker(boolean needCleanUp) {
-            this.thread = new Thread(this, "ShutDownWorker");
-            this.needCleanUp = needCleanUp;
-        }
-
-        public void shutdown() {
-            thread.start();
-        }
-
-        @Override
-        public void run() {
-            lock();
-            try {
-                // 1. stop receiver
-                receiver.stop();
-                // 2. stop executor
-                consumer.stop(needCleanUp);
-                // 3. stop channels
-                for (CanalSyncChannel channel : idToChannels.values()) {
-                    channel.stop();
-                }
-            } finally {
-                unlock();
-            }
-            logger.info("canal client has been stopped.");
+        lock();
+        try {
+            // 1. stop receiver
+            receiver.stop();
+            // 2. stop executor
+            consumer.stop(needCleanUp);
+        } finally {
+            unlock();
         }
+        logger.info("canal client has been stopped.");
     }
 
     public void registerChannels(List<SyncChannel> channels) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java
index 4d68315..d4d8f71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/position/PositionMeta.java
@@ -38,6 +38,7 @@ public class PositionMeta<T> {
         this.batches = Maps.newHashMap();
         this.commitPositions = Maps.newHashMap();
     }
+
     public void addBatch(long batchId, PositionRange<T> range) {
         updateMaxBatchId(batchId);
         batches.put(batchId, range);
@@ -76,7 +77,7 @@ public class PositionMeta<T> {
     }
 
     public T getLatestPosition() {
-        if (batches.isEmpty()) {
+        if (!batches.containsKey(maxBatchId)) {
             return null;
         } else {
             return batches.get(maxBatchId).getEnd();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 85ddd1f..9715395 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -159,9 +159,10 @@ public class InsertStreamTxnExecutor {
             if (code != TStatusCode.OK) {
                 throw new TException("failed to insert data: " + result.getStatus().getErrorMsgsList());
             }
-            txnEntry.clearDataToSend();
         } catch (RpcException e) {
             throw new TException(e);
+        } finally {
+            txnEntry.clearDataToSend();
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java
new file mode 100644
index 0000000..22d3a3d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutor.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SerialExecutor extends AbstractExecutorService {
+
+    private final ExecutorService taskPool;
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition terminating = lock.newCondition();
+
+    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
+    private Runnable active;
+
+    private boolean shutdown;
+
+    public SerialExecutor(final ExecutorService executor) {
+        Preconditions.checkNotNull(executor);
+        this.taskPool = executor;
+    }
+
+    public void execute(final Runnable r) {
+        lock.lock();
+        try {
+            checkPoolIsRunning();
+            tasks.add(new Runnable() {
+                public void run() {
+                    try {
+                        r.run();
+                    } finally {
+                        scheduleNext();
+                    }
+                }
+            });
+            if (active == null) {
+                scheduleNext();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void checkPoolIsRunning() {
+        Preconditions.checkState(lock.isHeldByCurrentThread());
+        if (shutdown) {
+            throw new RejectedExecutionException("SerialExecutor is already shutdown");
+        }
+    }
+
+    public void shutdown() {
+        lock.lock();
+        try {
+            shutdown = true;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public List<Runnable> shutdownNow() {
+        lock.lock();
+        try {
+            shutdown = true;
+            List<Runnable> result = new ArrayList<>();
+            tasks.drainTo(result);
+            return result;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean isShutdown() {
+        lock.lock();
+        try {
+            return shutdown;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean isTerminated() {
+        lock.lock();
+        try {
+            return shutdown && active == null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        lock.lock();
+        try {
+            long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+            long remainingTime;
+            while ((remainingTime = waitUntil - System.nanoTime()) > 0) {
+                if (shutdown && active == null) {
+                    break;
+                }
+                terminating.awaitNanos(remainingTime);
+            }
+            return remainingTime > 0;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void scheduleNext() {
+        lock.lock();
+        try {
+            if ((active = tasks.poll()) != null) {
+                taskPool.execute(active);
+            } else if (shutdown) {
+                terminating.signalAll();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java
new file mode 100644
index 0000000..dcdf269
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/SerialExecutorService.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.common.ThreadPoolManager;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * This executor service ensures that all tasks submitted to
+ * the same slot are executed in the order of submission.
+ */
+public class SerialExecutorService {
+
+    public interface SerialRunnable extends Runnable {
+        int getIndex();
+    }
+
+    private final int numOfSlots;
+    private final ExecutorService taskPool;
+    private final SerialExecutor[] slots;
+
+    private SerialExecutorService(int numOfSlots, ExecutorService taskPool) {
+        this.numOfSlots = numOfSlots;
+        this.slots = new SerialExecutor[numOfSlots];
+        this.taskPool = taskPool;
+        for (int i = 0; i < numOfSlots; i++) {
+            slots[i] = new SerialExecutor(taskPool);
+        }
+    }
+
+    public SerialExecutorService(int numOfSlots) {
+        this(numOfSlots, ThreadPoolManager.newDaemonFixedThreadPool(
+                numOfSlots, 256, "sync-task-pool", true));
+    }
+
+    public void submit(Runnable command) {
+        int index = getIndex(command);
+        if (isSlotIndex(index)) {
+            SerialExecutor serialEx = slots[index];
+            serialEx.execute(command);
+        } else {
+            taskPool.execute(command);
+        }
+    }
+
+    private int getIndex(Runnable command) {
+        int index = -1;
+        if (command instanceof SerialRunnable) {
+            index = (((SerialRunnable) command).getIndex());
+        }
+        return index;
+    }
+
+    private boolean isSlotIndex(int index) {
+        return index >= 0 && index < numOfSlots;
+    }
+
+    public void close() {
+        for (int i = 0; i < numOfSlots; i++) {
+            final SerialExecutor serialEx = slots[i];
+            serialEx.shutdown();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java
similarity index 95%
rename from fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java
rename to fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java
index e9b7695..c61d9e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncPendingTask.java
@@ -15,12 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.load.sync;
+package org.apache.doris.task;
 
 import org.apache.doris.common.UserException;
 import org.apache.doris.load.sync.SyncFailMsg.MsgType;
+import org.apache.doris.load.sync.SyncJob;
 import org.apache.doris.load.sync.SyncJob.JobState;
-import org.apache.doris.task.MasterTask;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java
new file mode 100644
index 0000000..cdae68b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTask.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.load.sync.SyncChannelCallback;
+import org.apache.doris.task.SerialExecutorService.SerialRunnable;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * SyncTask is a runnable to submit to SerialExecutorService. Each
+ * SyncTask will have an index to submit to the corresponding slot
+ * in the SerialExecutorService. And SerialExecutorService ensures
+ * that all SyncTasks submitted with the same index are always
+ * executed in the order of submission.
+ */
+public abstract class SyncTask implements SerialRunnable {
+    private static final Logger LOG = LogManager.getLogger(SyncTask.class);
+
+    protected long signature;
+    /**
+     * Each index corresponds to a slot in the SerialExecutorService.
+     * It should only be assigned by the getNextIndex() method in the
+     * SyncTaskPool. SyncTasks with the same index are always executed
+     * in the order of submission.
+     */
+    protected int index;
+    protected SyncChannelCallback callback;
+
+    public SyncTask(long signature, int index, SyncChannelCallback callback) {
+        this.signature = signature;
+        this.index = index;
+        this.callback = callback;
+    }
+
+    @Override
+    public void run() {
+        try {
+            exec();
+        } catch (Exception e) {
+            String errMsg = "channel " + signature + ", " + "msg: " + e.getMessage();
+            LOG.error("sync task exec error: {}", errMsg);
+            callback.onFailed(errMsg);
+        }
+    }
+
+    public int getIndex() {
+        return this.index;
+    }
+
+    /**
+     * implement in child
+     */
+    protected abstract void exec() throws Exception;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java
new file mode 100644
index 0000000..fb2309d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/SyncTaskPool.java
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.common.Config;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntUnaryOperator;
+
+public class SyncTaskPool {
+    private static final int NUM_OF_SLOTS = Config.max_sync_task_threads_num;
+    private static final SerialExecutorService EXECUTOR = new SerialExecutorService(NUM_OF_SLOTS);
+    private static final AtomicInteger nextIndex = new AtomicInteger();
+
+    public static void submit(Runnable task) {
+        if (task == null) {
+            return;
+        }
+        EXECUTOR.submit(task);
+    }
+
+    /**
+     * Gets the next index loop from 0 to @NUM_OF_SLOTS - 1
+     */
+    public static int getNextIndex() {
+        return nextIndex.updateAndGet(new IntUnaryOperator() {
+            @Override
+            public int applyAsInt(int operand) {
+                if (++operand >= NUM_OF_SLOTS) {
+                    operand = 0;
+                }
+                return operand;
+            }
+        });
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index 8a893c1..343a0ff 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -73,6 +73,7 @@ public class CanalSyncDataTest {
     private long offset = 0;
     private long nextId = 1000L;
     private int batchSize = 8192;
+    private long channelId = 100001L;
 
     ReentrantLock getLock;
 
@@ -220,13 +221,12 @@ public class CanalSyncDataTest {
         CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
                 syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
         CanalSyncChannel channel = new CanalSyncChannel(
-                syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+                channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
 
         Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
         idToChannels.put(channel.getId(), channel);
         consumer.setChannels(idToChannels);
 
-        channel.start();
         consumer.start();
         receiver.start();
 
@@ -235,7 +235,6 @@ public class CanalSyncDataTest {
         } finally {
             receiver.stop();
             consumer.stop();
-            channel.stop();
         }
 
         Assert.assertEquals("position:N/A", consumer.getPositionInfo());
@@ -295,13 +294,12 @@ public class CanalSyncDataTest {
         CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
                 syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
         CanalSyncChannel channel = new CanalSyncChannel(
-                syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+                channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
 
         Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
         idToChannels.put(channel.getId(), channel);
         consumer.setChannels(idToChannels);
 
-        channel.start();
         consumer.start();
         receiver.start();
 
@@ -310,7 +308,6 @@ public class CanalSyncDataTest {
         } finally {
             receiver.stop();
             consumer.stop();
-            channel.stop();
         }
 
         LOG.info(consumer.getPositionInfo());
@@ -360,13 +357,12 @@ public class CanalSyncDataTest {
         CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
                 syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
         CanalSyncChannel channel = new CanalSyncChannel(
-                syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+                channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
 
         Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
         idToChannels.put(channel.getId(), channel);
         consumer.setChannels(idToChannels);
 
-        channel.start();
         consumer.start();
         receiver.start();
 
@@ -375,7 +371,6 @@ public class CanalSyncDataTest {
         } finally {
             receiver.stop();
             consumer.stop();
-            channel.stop();
         }
 
         Assert.assertEquals("position:N/A", consumer.getPositionInfo());
@@ -444,13 +439,12 @@ public class CanalSyncDataTest {
         CanalSyncDataReceiver receiver = new CanalSyncDataReceiver(
                 syncJob, connector, "test", "mysql_db.mysql_tbl", consumer, 8192, getLock);
         CanalSyncChannel channel = new CanalSyncChannel(
-                syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
+                channelId, syncJob, database, table, Lists.newArrayList("a", "b"), "mysql_db", "mysql_tbl");
 
         Map<Long, CanalSyncChannel> idToChannels = Maps.newHashMap();
         idToChannels.put(channel.getId(), channel);
         consumer.setChannels(idToChannels);
 
-        channel.start();
         consumer.start();
         receiver.start();
 
@@ -459,7 +453,6 @@ public class CanalSyncDataTest {
         } finally {
             receiver.stop();
             consumer.stop();
-            channel.stop();
         }
 
         Assert.assertEquals("position:N/A", consumer.getPositionInfo());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java
new file mode 100644
index 0000000..cc9f500
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/SerialExecutorServiceTest.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.load.sync.SyncChannelCallback;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SerialExecutorServiceTest {
+    private static final Logger LOG = LoggerFactory.getLogger(MasterTaskExecutorTest.class);
+    private static final int NUM_OF_SLOTS = 10;
+    private static final int THREAD_NUM = 10;
+
+    private static SerialExecutorService taskPool;
+    // thread signature -> tasks submit serial
+    private static Map<Long, List<Integer>> submitSerial;
+    // thread signature -> tasks execute serial
+    private static Map<Long, List<Integer>> execSerial;
+
+    @Before
+    public void setUp() {
+        taskPool = new SerialExecutorService(NUM_OF_SLOTS);
+        submitSerial = new ConcurrentHashMap<>();
+        execSerial = new ConcurrentHashMap<>();
+    }
+
+    @After
+    public void tearDown() {
+        if (taskPool != null) {
+            taskPool.close();
+        }
+    }
+
+    @Test
+    public void testSubmit() {
+        for (long i = 0; i < THREAD_NUM; i++) {
+            if (!submitSerial.containsKey(i)) {
+                submitSerial.put(i, new ArrayList<>());
+            }
+            SubmitThread thread = new SubmitThread("Thread-" + i, i, submitSerial.get(i));
+            thread.start();
+        }
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+
+        // The submission order of the same signature should be equal to the execution order
+        Assert.assertEquals(submitSerial.size(), THREAD_NUM);
+        Assert.assertEquals(submitSerial.size(), execSerial.size());
+        for (long i = 0; i < THREAD_NUM; i++) {
+            Assert.assertTrue(submitSerial.containsKey(i));
+            Assert.assertTrue(execSerial.containsKey(i));
+            List<Integer> submitSerialList = submitSerial.get(i);
+            List<Integer> execSerialList = execSerial.get(i);
+            Assert.assertEquals(submitSerialList.size(), execSerialList.size());
+            for (int j = 0; j < submitSerialList.size(); j++) {
+                Assert.assertEquals(submitSerialList.get(j), execSerialList.get(j));
+            }
+        }
+    }
+
+    private static class TestSyncTask extends SyncTask {
+        public int serial;
+
+        public TestSyncTask(long signature, int index, int serial, SyncChannelCallback callback) {
+            super(signature, index, callback);
+            this.serial = serial;
+        }
+
+        @Override
+        protected void exec() {
+            LOG.info("run exec. signature: {}, index: {}, serial: {}", signature, index, serial);
+            if (!execSerial.containsKey(signature)) {
+                execSerial.put(signature, new ArrayList<>());
+            }
+            execSerial.get(signature).add(serial);
+        }
+    }
+
+    private static class SubmitThread extends Thread {
+        private int index = SyncTaskPool.getNextIndex();
+        private long signature;
+        private List<Integer> submitSerialList;
+
+        public SubmitThread(String name, long signature, List<Integer> submitSerialList) {
+            super(name);
+            this.signature = signature;
+            this.submitSerialList = submitSerialList;
+        }
+
+        public void run() {
+            for (int i = 0; i < 100; i++) {
+                TestSyncTask task = new TestSyncTask(signature, index, i, new SyncChannelCallback() {
+                    @Override
+                    public void onFinished(long channelId) {
+                    }
+                    @Override
+                    public void onFailed(String errMsg) {
+                    }
+                });
+                submitSerialList.add(i);
+                taskPool.submit(task);
+            }
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org