You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/11/13 03:23:02 UTC

[incubator-doris] branch master updated: Sending clear txn task explicitly after transaction being aborted (#2182)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 11872d5  Sending clear txn task explicitly after transaction being aborted (#2182)
11872d5 is described below

commit 11872d5cf60afa9a165cfd0f19a55f8863e3aaae
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Wed Nov 13 11:22:45 2019 +0800

    Sending clear txn task explicitly after transaction being aborted (#2182)
---
 be/src/agent/task_worker_pool.cpp                  |  9 ++++-
 be/src/common/config.h                             |  5 +++
 be/src/olap/delta_writer.cpp                       |  2 +-
 be/src/olap/olap_define.h                          |  1 +
 be/src/olap/push_handler.cpp                       |  9 +++--
 be/src/olap/storage_engine.cpp                     |  9 ++++-
 be/src/olap/storage_engine.h                       |  3 +-
 be/src/olap/txn_manager.cpp                        | 44 +++++++++++++++++++++-
 be/src/olap/txn_manager.h                          | 15 ++++++++
 be/test/olap/txn_manager_test.cpp                  |  1 +
 .../java/org/apache/doris/master/MasterImpl.java   | 12 ++----
 .../org/apache/doris/master/ReportHandler.java     |  4 +-
 .../java/org/apache/doris/task/ClearAlterTask.java |  9 -----
 .../apache/doris/task/ClearTransactionTask.java    | 25 +++---------
 .../doris/transaction/GlobalTransactionMgr.java    | 43 +++++++++++++++++++++
 15 files changed, 141 insertions(+), 50 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 9e51d17..dbb60fd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -893,8 +893,13 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t
             // transaction_id should be greater than zero.
             // If it is not greater than zero, no need to execute
             // the following clear_transaction_task() function.
-            worker_pool_this->_env->storage_engine()->clear_transaction_task(
-                    clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id);
+            // No partition ids in the request: let the engine resolve them from the txn manager.
+            auto* engine = worker_pool_this->_env->storage_engine();
+            if (clear_transaction_task_req.partition_id.empty()) {
+                engine->clear_transaction_task(clear_transaction_task_req.transaction_id);
+            } else {
+                engine->clear_transaction_task(clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id);
+            }
             LOG(INFO) << "finish to clear transaction task. signature:" << agent_task_req.signature
                       << ", transaction_id:" << clear_transaction_task_req.transaction_id;
         } else {
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9aa7d2b..e335000 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -491,6 +491,11 @@ namespace config {
 
     // brpc config
     CONF_Int64(brpc_max_body_size, "67108864")
+
+    // Max number of running txns kept in the txn manager.
+    // This is a self-protection limit to avoid too many txns being saved in the manager.
+    CONF_Int64(max_runnings_transactions, "1000");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 711d50c..57179b7 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -123,7 +123,7 @@ OLAPStatus DeltaWriter::init() {
                 if (!new_migration_rlock.own_lock()) {
                     return OLAP_ERR_RWLOCK_ERROR;
                 }
-                _storage_engine->txn_manager()->prepare_txn(_req.partition_id, _new_tablet, _req.txn_id, _req.load_id);
+                RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _new_tablet, _req.txn_id, _req.load_id));
             }
         }
     }
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index f4b40f1..f1efdb0 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -166,6 +166,7 @@ enum OLAPStatus {
     OLAP_ERR_VERSION_ALREADY_MERGED = -230,
     OLAP_ERR_LZO_DISABLED = -231,
     OLAP_ERR_DISK_REACH_CAPACITY_LIMIT = -232,
+    OLAP_ERR_TOO_MANY_TRANSACTIONS = -233,
 
     // CommandExecutor
     // [-300, -400)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 298634d..614b366 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -93,8 +93,8 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
   PUniqueId load_id;
   load_id.set_hi(0);
   load_id.set_lo(0);
-  OLAPStatus res = StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id,
-      tablet, request.transaction_id, load_id);
+  RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id,
+      tablet, request.transaction_id, load_id));
 
   // prepare txn will be always successful
   // if current tablet is under schema change, origin tablet is successful and
@@ -140,8 +140,8 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
         PUniqueId load_id;
         load_id.set_hi(0);
         load_id.set_lo(0);
-        res = StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id,
-            related_tablet, request.transaction_id, load_id);
+        RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id,
+            related_tablet, request.transaction_id, load_id));
         // prepare txn will always be successful
         tablet_vars->push_back(TabletVars());
         TabletVars& new_item = tablet_vars->back();
@@ -158,6 +158,7 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
   // not call validate request here, because realtime load does not
   // contain version info
 
+  OLAPStatus res = OLAP_SUCCESS; // prepare_txn failures have already returned above
   // check delete condition if push for delete
   std::queue<DeletePredicatePB> del_preds;
   if (push_type == PUSH_FOR_DELETE) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 99cb877..4479451 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -477,8 +477,15 @@ OLAPStatus StorageEngine::clear() {
     return OLAP_SUCCESS;
 }
 
+void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) {
+    // The request may carry no partition ids, so resolve them through the txn manager first.
+    std::vector<TPartitionId> resolved_partition_ids;
+    StorageEngine::instance()->txn_manager()->get_partition_ids(transaction_id, &resolved_partition_ids);
+    clear_transaction_task(transaction_id, resolved_partition_ids);
+}
+
 void StorageEngine::clear_transaction_task(const TTransactionId transaction_id,
-                                        const vector<TPartitionId> partition_ids) {
+        const vector<TPartitionId>& partition_ids) {
     LOG(INFO) << "begin to clear transaction task. transaction_id=" <<  transaction_id;
 
     for (const TPartitionId& partition_id : partition_ids) {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index c6634b0..a360f22 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -86,8 +86,9 @@ public:
                                   const bool is_schema_change_tablet,
                                   const TabletSharedPtr ref_tablet);
 
+    void clear_transaction_task(const TTransactionId transaction_id);
     void clear_transaction_task(const TTransactionId transaction_id,
-                                const std::vector<TPartitionId> partition_ids);
+                                const std::vector<TPartitionId>& partition_ids);
 
     // Instance should be inited from create_instance
     // MUST NOT be called in other circumstances.
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index e103b27..44fe9b2 100755
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -134,11 +134,21 @@ OLAPStatus TxnManager::prepare_txn(
             }
         }
     }
+
+    // Self protection: reject the request when too many transactions are already tracked.
+    // The count is taken from _txn_partition_map, which is keyed by transaction id.
+    if (_txn_partition_map.size() > config::max_runnings_transactions) {
+        LOG(WARNING) << "too many transactions: " << _txn_partition_map.size() << ", limit: " << config::max_runnings_transactions;
+        return OLAP_ERR_TOO_MANY_TRANSACTIONS;
+    }
+
     // not found load id
     // case 1: user start a new txn, rowset_ptr = null
     // case 2: loading txn from meta env
     TabletTxnInfo load_info(load_id, nullptr);
     _txn_tablet_map[key][tablet_info] = load_info;
+    _insert_txn_partition_map_unlocked(transaction_id, partition_id);
+
     LOG(INFO) << "add transaction to engine successfully."
             << "partition_id: " << key.first
             << ", transaction_id: " << key.second
@@ -225,6 +235,7 @@ OLAPStatus TxnManager::commit_txn(
         WriteLock wrlock(&_txn_map_lock);
         TabletTxnInfo load_info(load_id, rowset_ptr);
         _txn_tablet_map[key][tablet_info] = load_info;
+        _insert_txn_partition_map_unlocked(transaction_id, partition_id);
         LOG(INFO) << "commit transaction to engine successfully."
                 << " partition_id: " << key.first
                 << ", transaction_id: " << key.second
@@ -289,6 +300,7 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
                 _txn_tablet_map.erase(it);
             }
         }
+        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
         return OLAP_SUCCESS;
     }
 }
@@ -324,7 +336,7 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr
         if (it->second.empty()) {
             _txn_tablet_map.erase(it);
         }
-        return OLAP_SUCCESS;
+        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
     }
     return OLAP_SUCCESS;
 }
@@ -374,6 +386,7 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr
     if (it->second.empty()) {
         _txn_tablet_map.erase(it);
     }
+    _clear_txn_partition_map_unlocked(transaction_id, partition_id);
     return OLAP_SUCCESS;
 }
 
@@ -423,6 +436,7 @@ void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta
         if (it.second.empty()) {
             _txn_tablet_map.erase(it.first);
         }
+        _clear_txn_partition_map_unlocked(it.first.second, it.first.first);
     }
 }
 
@@ -500,4 +514,32 @@ bool TxnManager::get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, Ta
     return true;
 }
 
+void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids) {
+    // Append the partitions recorded for this txn; an unknown txn leaves the output untouched.
+    ReadLock map_guard(&_txn_map_lock);
+    const auto found = _txn_partition_map.find(transaction_id);
+    if (found == _txn_partition_map.end()) {
+        return;
+    }
+    partition_ids->insert(partition_ids->end(), found->second.begin(), found->second.end());
+}
+
+void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
+    // Caller is expected to hold _txn_map_lock.
+    // operator[] default-constructs an empty partition set the first time
+    // a transaction id is seen, so no separate existence check is needed.
+    std::unordered_set<int64_t>& partitions = _txn_partition_map[transaction_id];
+    partitions.insert(partition_id);
+}
+
+void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
+    // Caller is expected to hold _txn_map_lock; unknown transactions are ignored.
+    auto entry = _txn_partition_map.find(transaction_id);
+    if (entry == _txn_partition_map.end()) {
+        return;
+    }
+    entry->second.erase(partition_id);
+    if (entry->second.empty()) { _txn_partition_map.erase(entry); } // last partition gone
+}
+
 } // namespace doris
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index ef7f46e..8ea816e 100755
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -27,6 +27,8 @@
 #include <string>
 #include <vector>
 #include <thread>
+#include <unordered_map>
+#include <unordered_set>
 
 #include <rapidjson/document.h>
 #include <pthread.h>
@@ -70,6 +72,7 @@ public:
 
     ~TxnManager() {
         _txn_tablet_map.clear();
+        _txn_partition_map.clear();
         _txn_locks.clear();
     }
 
@@ -132,16 +135,28 @@ public:
     bool get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, std::vector<int64_t>* transaction_ids);
 
     void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid);
+
+    void get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids);
     
 private:
     RWMutex* _get_txn_lock(TTransactionId txn_id) {
         return _txn_locks[txn_id % _txn_lock_num].get();
     }
 
+    // insert or remove (transaction_id, partition_id) from _txn_partition_map
+    // get _txn_map_lock before calling
+    void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
+    void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
+
 private:
     RWMutex _txn_map_lock;
     using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
     std::map<TxnKey, std::map<TabletInfo, TabletTxnInfo>> _txn_tablet_map;
+    // transaction_id -> corresponding partition ids
+    // This is mainly for the clear txn task received from FE, which may only have a transaction id,
+    // so we need this map to find out which partitions correspond to a transaction id.
+    // This map should be constructed/destructed/modified alongside '_txn_tablet_map'.
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> _txn_partition_map;
 
     const int32_t _txn_lock_num = 100;
     std::map<int32_t, std::shared_ptr<RWMutex>> _txn_locks;
diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp
index be11224..0c93e5a 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -92,6 +92,7 @@ public:
     }
 
     virtual void SetUp() {
+        config::max_runnings_transactions = 1000;
         
         std::vector<StorePath> paths;
         paths.emplace_back("_engine_data_path", -1);
diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
index 1393fd6..db92f77 100644
--- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -558,18 +558,14 @@ public class MasterImpl {
     
     private void finishClearAlterTask(AgentTask task, TFinishTaskRequest request) {
         ClearAlterTask clearAlterTask = (ClearAlterTask) task;
-        clearAlterTask.setFinished();
-        AgentTaskQueue.removeTask(task.getBackendId(), 
-                task.getTaskType(), 
-                task.getSignature());
+        clearAlterTask.setFinished(true);
+        AgentTaskQueue.removeTask(task.getBackendId(), task.getTaskType(), task.getSignature());
     }
     
     private void finishClearTransactionTask(AgentTask task, TFinishTaskRequest request) {
         ClearTransactionTask clearTransactionTask = (ClearTransactionTask) task;
-        clearTransactionTask.setFinished();
-        AgentTaskQueue.removeTask(task.getBackendId(), 
-                task.getTaskType(), 
-                task.getSignature());
+        clearTransactionTask.setFinished(true);
+        AgentTaskQueue.removeTask(task.getBackendId(), task.getTaskType(), task.getSignature());
     }
     
     private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
index 2a40961..213da10 100644
--- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -81,7 +81,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -896,8 +895,7 @@ public class ReportHandler extends Daemon {
         AgentBatchTask batchTask = new AgentBatchTask();
         for (Long transactionId : transactionsToClear.keySet()) {
             ClearTransactionTask clearTransactionTask = new ClearTransactionTask(backendId, 
-                    transactionId, 
-                    new ArrayList<Long>(transactionsToClear.get(transactionId)));
+                    transactionId, transactionsToClear.get(transactionId));
             batchTask.addTask(clearTransactionTask);
             AgentTaskQueue.addTask(clearTransactionTask);
         }
diff --git a/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java b/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java
index 53e8015..1417861 100644
--- a/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java
+++ b/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java
@@ -22,7 +22,6 @@ import org.apache.doris.thrift.TTaskType;
 
 public class ClearAlterTask extends AgentTask {
     private int schemaHash;
-    private boolean isFinished;
 
     public ClearAlterTask(long backendId, long dbId, long tableId, long partitionId, long indexId,
                             long tabletId, int schemaHash) {
@@ -40,12 +39,4 @@ public class ClearAlterTask extends AgentTask {
     public int getSchemaHash() {
         return schemaHash;
     }
-    
-    public void setFinished() {
-        this.isFinished = true;
-    }
-    
-    public boolean isFinished() {
-        return isFinished;
-    }
 }
diff --git a/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java b/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java
index 7b1caea..6f07898 100644
--- a/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java
+++ b/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java
@@ -17,24 +17,17 @@
 
 package org.apache.doris.task;
 
-import java.util.List;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import org.apache.doris.thrift.TClearTransactionTaskRequest;
 import org.apache.doris.thrift.TTaskType;
 
-public class ClearTransactionTask extends AgentTask {
+import java.util.List;
 
-    private static final Logger LOG = LogManager.getLogger(ClearTransactionTask.class);
+public class ClearTransactionTask extends AgentTask {
 
     private long transactionId;
     private List<Long> partitionIds;
-    private boolean isFinished;
 
-    public ClearTransactionTask(long backendId, long transactionId, 
-            List<Long> partitionIds) {
+    public ClearTransactionTask(long backendId, long transactionId, List<Long> partitionIds) {
         super(null, backendId, TTaskType.CLEAR_TRANSACTION_TASK, -1L, -1L, -1L, -1L, -1L, transactionId);
         this.transactionId = transactionId;
         this.partitionIds = partitionIds;
@@ -42,16 +35,8 @@ public class ClearTransactionTask extends AgentTask {
     }
     
     public TClearTransactionTaskRequest toThrift() {
-        TClearTransactionTaskRequest clearTransactionTaskRequest = new TClearTransactionTaskRequest(transactionId, 
-                partitionIds);
+        TClearTransactionTaskRequest clearTransactionTaskRequest = new TClearTransactionTaskRequest(
+            transactionId, partitionIds);
         return clearTransactionTaskRequest;
     }
-    
-    public void setFinished() {
-        this.isFinished = true;
-    }
-    
-    public boolean isFinished() {
-        return this.isFinished;
-    }
 }
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 5908d18..15d3e19 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -39,7 +39,10 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.persist.EditLog;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.ClearTransactionTask;
 import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
@@ -95,6 +98,8 @@ public class GlobalTransactionMgr {
     
     private Catalog catalog;
 
+    private List<ClearTransactionTask> clearTransactionTasks = Lists.newArrayList();
+
     public GlobalTransactionMgr(Catalog catalog) {
         this.catalog = catalog;
     }
@@ -484,6 +489,7 @@ public class GlobalTransactionMgr {
         return transactionState.getTransactionStatus() == TransactionStatus.VISIBLE;
     }
 
+    // for http cancel stream load api
     public void abortTransaction(Long dbId, String label, String reason) throws UserException {
         Preconditions.checkNotNull(label);
         Long transactionId = null;
@@ -532,9 +538,46 @@ public class GlobalTransactionMgr {
             writeUnlock();
             transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason);
         }
+
+        // Send clear txn tasks to BE to clear the transaction on BE.
+        // This is because parts of a txn may have succeeded on some BEs, and these parts should be cleared
+        // explicitly, or they will remain on BE forever.
+        // (The report process will also do the diff and send clear txn tasks to BE, but that is our
+        // last line of defense.)
+        if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+            clearBackendTransactions(transactionState);
+        }
+
         return;
     }
     
+    private void clearBackendTransactions(TransactionState transactionState) {
+        Preconditions.checkState(transactionState.getTransactionStatus() == TransactionStatus.ABORTED);
+        // for aborted transaction, we don't know which backends are involved, so we have to send clear task
+        // to all backends.
+        List<Long> allBeIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
+        AgentBatchTask batchTask = null;
+        synchronized (clearTransactionTasks) {
+            for (Long beId : allBeIds) {
+                ClearTransactionTask task = new ClearTransactionTask(beId, transactionState.getTransactionId(), Lists.newArrayList());
+                clearTransactionTasks.add(task);
+            }
+
+            // try to group send tasks, not sending every time a txn is aborted. to avoid too many task rpc.
+            if (clearTransactionTasks.size() > allBeIds.size() * 2) {
+                batchTask = new AgentBatchTask();
+                for (ClearTransactionTask clearTransactionTask : clearTransactionTasks) {
+                    batchTask.addTask(clearTransactionTask);
+                }
+                clearTransactionTasks.clear();
+            }
+        }
+
+        if (batchTask != null) {
+            AgentTaskExecutor.submit(batchTask);
+        }
+    }
+
     /*
      * get all txns which is ready to publish
      * a ready-to-publish txn's partition's visible version should be ONE less than txn's commit version.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org