You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/09 14:35:25 UTC

[incubator-doris] branch master updated: [Performance] Support sharding txn_map_lock into more small map locks to make good performance for txn manage task (#3222)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a5703ef  [Performance] Support sharding txn_map_lock into more small map locks to make good performance for txn manage task (#3222)
a5703ef is described below

commit a5703ef114ee5be2657481f6393e2bca054fd48f
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Thu Apr 9 09:35:15 2020 -0500

    [Performance] Support sharding txn_map_lock into more small map locks to make good performance for txn manage task (#3222)
    
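    This PR improves the performance of txn management tasks. When a BE holds many txns,
    the single txn_map_lock plus the additional per-txn _txn_locks can become a bottleneck.
    This change splits txn_map_lock into txn_map_shard_size smaller locks, each guarding its own
    shard of the txn-tablet and txn-partition maps. It also drops the per-txn locks from every
    path except publish_txn, which now uses a sharded mutex array sized by txn_shard_size.
    The global max_runnings_transactions limit is replaced by max_runnings_transactions_per_txn_map.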
---
 be/src/common/config.h                        |  12 +-
 be/src/olap/storage_engine.cpp                |   2 +-
 be/src/olap/tablet_manager.cpp                |   1 +
 be/src/olap/tablet_manager.h                  |   2 +-
 be/src/olap/txn_manager.cpp                   | 236 ++++++++++++++------------
 be/src/olap/txn_manager.h                     |  75 ++++++--
 be/test/olap/olap_snapshot_converter_test.cpp |   1 -
 be/test/olap/tablet_mgr_test.cpp              |   6 +-
 be/test/olap/txn_manager_test.cpp             |  48 +++---
 9 files changed, 231 insertions(+), 152 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 93a0bff..9b06918 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -491,9 +491,9 @@ namespace config {
     // brpc config, 200M
     CONF_Int64(brpc_max_body_size, "209715200")
 
-    // max number of txns in txn manager
+    // max number of txns for every txn_partition_map in txn manager
     // this is a self protection to avoid too many txns saving in manager
-    CONF_mInt64(max_runnings_transactions, "2000");
+    CONF_mInt64(max_runnings_transactions_per_txn_map, "100");
 
     // tablet_map_lock shard size, the value is 2^n, n=0,1,2,3,4
     // this is a an enhancement for better performance to manage tablet
@@ -501,6 +501,14 @@ namespace config {
 
     CONF_String(plugin_path, "${DORIS_HOME}/plugin")
 
+    // txn_map_lock shard size, the value is 2^n, n=0,1,2,3,4
+    // this is a an enhancement for better performance to manage txn
+    CONF_Int32(txn_map_shard_size, "128");
+
+    // txn_lock shard size, the value is 2^n, n=0,1,2,3,4
+    // this is a an enhancement for better performance to publish txn
+    CONF_Int32(txn_shard_size, "1024")
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 16b9025..a8c8967 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -114,7 +114,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
         _is_all_cluster_id_exist(true),
         _index_stream_lru_cache(NULL),
         _tablet_manager(new TabletManager(config::tablet_map_shard_size)),
-        _txn_manager(new TxnManager()),
+        _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)),
         _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)),
         _memtable_flush_executor(nullptr),
         _block_manager(nullptr),
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 793fd45..c31ec5f 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -70,6 +70,7 @@ TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
     : _tablet_map_lock_shard_size(tablet_map_lock_shard_size),
       _last_update_stat_ms(0) {
     DCHECK_GT(_tablet_map_lock_shard_size, 0);
+    DCHECK_EQ(_tablet_map_lock_shard_size & (tablet_map_lock_shard_size - 1), 0);
     _tablet_map_lock_array = new RWMutex[_tablet_map_lock_shard_size];
     _tablet_map_array = new tablet_map_t[_tablet_map_lock_shard_size];
 }
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 263b14b..b47be43 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -193,7 +193,7 @@ private:
     // tablet_id -> TabletInstances
     typedef std::unordered_map<int64_t, TableInstances> tablet_map_t;
 
-    int32_t _tablet_map_lock_shard_size;
+    const int32_t _tablet_map_lock_shard_size;
     // _tablet_map_lock_array[i] protect _tablet_map_array[i], i=0,1,2...,and i < _tablet_map_lock_shard_size
     RWMutex *_tablet_map_lock_array;
     tablet_map_t *_tablet_map_array;
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index c098c5b..8fdec4d 100755
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/storage_engine.h"
+#include "txn_manager.h"
 
 #include <signal.h>
 
@@ -36,6 +36,7 @@
 #include "olap/base_compaction.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/lru_cache.h"
+#include "olap/storage_engine.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
 #include "olap/push_handler.h"
@@ -70,10 +71,17 @@ using std::vector;
 
 namespace doris {
 
-TxnManager::TxnManager() {
-    for (int i = 0; i < _txn_lock_num; ++i) {
-        _txn_locks[i] = std::make_shared<RWMutex>();
-    }
+TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size)
+        : _txn_map_shard_size(txn_map_shard_size),
+          _txn_shard_size(txn_shard_size) {
+    DCHECK_GT(_txn_map_shard_size, 0);
+    DCHECK_GT(_txn_shard_size, 0);
+    DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0);
+    DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0);
+    _txn_map_locks = new RWMutex[_txn_map_shard_size];
+    _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size];
+    _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size];
+    _txn_mutex = new Mutex[_txn_shard_size];
 }
 
 OLAPStatus TxnManager::prepare_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, 
@@ -111,12 +119,12 @@ OLAPStatus TxnManager::prepare_txn(
     TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, 
     const PUniqueId& load_id) {
 
-    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TxnKey key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
-    WriteLock wrlock(_get_txn_lock(transaction_id));
-    WriteLock txn_wrlock(&_txn_map_lock);
-    auto it = _txn_tablet_map.find(key);
-    if (it != _txn_tablet_map.end()) {
+    WriteLock txn_wrlock(&_get_txn_map_lock(transaction_id));
+    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it != txn_tablet_map.end()) {
         auto load_itr = it->second.find(tablet_info);
         if (load_itr != it->second.end()) {
             // found load for txn,tablet
@@ -137,8 +145,9 @@ OLAPStatus TxnManager::prepare_txn(
 
     // check if there are too many transactions on running.
     // if yes, reject the request.
-    if (_txn_partition_map.size() > config::max_runnings_transactions) {
-        LOG(WARNING) << "too many transactions: " << _txn_tablet_map.size() << ", limit: " << config::max_runnings_transactions;
+    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
+    if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) {
+        LOG(WARNING) << "too many transactions: " << txn_tablet_map.size() << ", limit: " << config::max_runnings_transactions_per_txn_map;
         return OLAP_ERR_TOO_MANY_TRANSACTIONS;
     }
 
@@ -146,7 +155,7 @@ OLAPStatus TxnManager::prepare_txn(
     // case 1: user start a new txn, rowset_ptr = null
     // case 2: loading txn from meta env
     TabletTxnInfo load_info(load_id, nullptr);
-    _txn_tablet_map[key][tablet_info] = load_info;
+    txn_tablet_map[key][tablet_info] = load_info;
     _insert_txn_partition_map_unlocked(transaction_id, partition_id);
 
     VLOG(3) << "add transaction to engine successfully."
@@ -175,12 +184,13 @@ OLAPStatus TxnManager::commit_txn(
                      << ", tablet: " << tablet_info.to_string();
         return OLAP_ERR_ROWSET_INVALID;
     }
-    WriteLock wrlock(_get_txn_lock(transaction_id));
+
     {
         // get tx
-        ReadLock rdlock(&_txn_map_lock);
-        auto it = _txn_tablet_map.find(key);
-        if (it != _txn_tablet_map.end()) {
+        ReadLock rdlock(&_get_txn_map_lock(transaction_id));
+        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+        auto it = txn_tablet_map.find(key);
+        if (it != txn_tablet_map.end()) {
             auto load_itr = it->second.find(tablet_info);
             if (load_itr != it->second.end()) {
                 // found load for txn,tablet
@@ -232,9 +242,10 @@ OLAPStatus TxnManager::commit_txn(
     }
 
     {
-        WriteLock wrlock(&_txn_map_lock);
+        WriteLock wrlock(&_get_txn_map_lock(transaction_id));
         TabletTxnInfo load_info(load_id, rowset_ptr);
-        _txn_tablet_map[key][tablet_info] = load_info;
+        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+        txn_tablet_map[key][tablet_info] = load_info;
         _insert_txn_partition_map_unlocked(transaction_id, partition_id);
         LOG(INFO) << "commit transaction to engine successfully."
                 << " partition_id: " << key.first
@@ -253,11 +264,12 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
     RowsetSharedPtr rowset_ptr = nullptr;
-    WriteLock wrlock(_get_txn_lock(transaction_id));
+    MutexLock txn_lock(&_get_txn_lock(transaction_id));
     {
-        ReadLock rlock(&_txn_map_lock);
-        auto it = _txn_tablet_map.find(key);
-        if (it != _txn_tablet_map.end()) {
+        ReadLock rlock(&_get_txn_map_lock(transaction_id));
+        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+        auto it = txn_tablet_map.find(key);
+        if (it != txn_tablet_map.end()) {
             auto load_itr = it->second.find(tablet_info);
             if (load_itr != it->second.end()) {
                 // found load for txn,tablet
@@ -287,9 +299,10 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
         return OLAP_ERR_TRANSACTION_NOT_EXIST;
     }
     {
-        WriteLock wrlock(&_txn_map_lock);
-        auto it = _txn_tablet_map.find(key);
-        if (it != _txn_tablet_map.end()) {
+        WriteLock wrlock(&_get_txn_map_lock(transaction_id));
+        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+        auto it = txn_tablet_map.find(key);
+        if (it != txn_tablet_map.end()) {
             it->second.erase(tablet_info);
             LOG(INFO) << "publish txn successfully."
                       << " partition_id: " << key.first
@@ -297,7 +310,7 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
                       << ", tablet: " << tablet_info.to_string()
                       << ", rowsetid: " << rowset_ptr->rowset_id();
             if (it->second.empty()) {
-                _txn_tablet_map.erase(it);
+                txn_tablet_map.erase(it);
                 _clear_txn_partition_map_unlocked(transaction_id, partition_id);
             }
         }
@@ -313,10 +326,10 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr
                                     TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
-    WriteLock wrlock(_get_txn_lock(transaction_id));
-    WriteLock txn_wrlock(&_txn_map_lock);
-    auto it = _txn_tablet_map.find(key);
-    if (it != _txn_tablet_map.end()) {
+    WriteLock wrlock(&_get_txn_map_lock(transaction_id));
+    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it != txn_tablet_map.end()) {
         auto load_itr = it->second.find(tablet_info);
         if (load_itr != it->second.end()) {
             // found load for txn,tablet
@@ -334,7 +347,7 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr
                   << ", transaction_id: " << key.second
                   << ", tablet: " << tablet_info.to_string();
         if (it->second.empty()) {
-            _txn_tablet_map.erase(it);
+            txn_tablet_map.erase(it);
             _clear_txn_partition_map_unlocked(transaction_id, partition_id);
         }
     }
@@ -347,10 +360,10 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr
                                   TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
-    WriteLock wrlock(_get_txn_lock(transaction_id));
-    WriteLock txn_wrlock(&_txn_map_lock);
-    auto it = _txn_tablet_map.find(key);
-    if (it == _txn_tablet_map.end()) {
+    WriteLock txn_wrlock(&_get_txn_map_lock(transaction_id));
+    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it == txn_tablet_map.end()) {
         return OLAP_ERR_TRANSACTION_NOT_EXIST;
     }
     auto load_itr = it->second.find(tablet_info);
@@ -384,7 +397,7 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr
     }
     it->second.erase(tablet_info);
     if (it->second.empty()) {
-        _txn_tablet_map.erase(it);
+        txn_tablet_map.erase(it);
         _clear_txn_partition_map_unlocked(transaction_id, partition_id);
     }
     return OLAP_SUCCESS;
@@ -398,15 +411,18 @@ void TxnManager::get_tablet_related_txns(TTabletId tablet_id, SchemaHash schema_
     }
 
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
-    ReadLock txn_rdlock(&_txn_map_lock);
-    for (auto& it : _txn_tablet_map) {
-        if (it.second.find(tablet_info) != it.second.end()) {
-            *partition_id = it.first.first;
-            transaction_ids->insert(it.first.second);
-            VLOG(3) << "find transaction on tablet."
-                    << "partition_id: " << it.first.first
-                    << ", transaction_id: " << it.first.second
-                    << ", tablet: " << tablet_info.to_string();
+    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+        ReadLock txn_rdlock(&_txn_map_locks[i]);
+        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
+        for (auto& it : txn_tablet_map) {
+            if (it.second.find(tablet_info) != it.second.end()) {
+                *partition_id = it.first.first;
+                transaction_ids->insert(it.first.second);
+                VLOG(3) << "find transaction on tablet."
+                        << "partition_id: " << it.first.first
+                        << ", transaction_id: " << it.first.second
+                        << ", tablet: " << tablet_info.to_string();
+            }
         }
     }
 }
@@ -415,33 +431,36 @@ void TxnManager::get_tablet_related_txns(TTabletId tablet_id, SchemaHash schema_
 // maybe lock error, because not get txn lock before remove from meta
 void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
-    WriteLock txn_wrlock(&_txn_map_lock);
-    for (auto it = _txn_tablet_map.begin(); it != _txn_tablet_map.end();) {
-        auto load_itr = it->second.find(tablet_info);
-        if (load_itr != it->second.end()) {
-            TabletTxnInfo& load_info = load_itr->second;
-            if (load_info.rowset != nullptr && meta != nullptr) {
-                LOG(INFO) << " delete transaction from engine "
-                          << ", tablet: " << tablet_info.to_string()
-                          << ", rowset id: " << load_info.rowset->rowset_id();
-                RowsetMetaManager::remove(meta, tablet_uid,
-                                          load_info.rowset->rowset_id());
-            }
-            LOG(INFO) << "remove tablet related txn."
-                      << " partition_id: " << it->first.first
-                      << ", transaction_id: " << it->first.second
-                      << ", tablet: " << tablet_info.to_string() << ", rowset: "
-                      << (load_info.rowset != nullptr
+    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+        WriteLock txn_wrlock(&_txn_map_locks[i]);
+        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
+        for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) {
+            auto load_itr = it->second.find(tablet_info);
+            if (load_itr != it->second.end()) {
+                TabletTxnInfo& load_info = load_itr->second;
+                if (load_info.rowset != nullptr && meta != nullptr) {
+                    LOG(INFO) << " delete transaction from engine "
+                              << ", tablet: " << tablet_info.to_string()
+                              << ", rowset id: " << load_info.rowset->rowset_id();
+                    RowsetMetaManager::remove(meta, tablet_uid,
+                                              load_info.rowset->rowset_id());
+                }
+                LOG(INFO) << "remove tablet related txn."
+                          << " partition_id: " << it->first.first
+                          << ", transaction_id: " << it->first.second
+                          << ", tablet: " << tablet_info.to_string() << ", rowset: "
+                          << (load_info.rowset != nullptr
                               ? load_info.rowset->rowset_id().to_string()
                               : "0");
-            it->second.erase(tablet_info);
-        }
-        if (it->second.empty()) {
-            _clear_txn_partition_map_unlocked(it->first.second,
-                                              it->first.first);
-            it = _txn_tablet_map.erase(it);
-        } else {
-            ++it;
+                it->second.erase(tablet_info);
+            }
+            if (it->second.empty()) {
+                _clear_txn_partition_map_unlocked(it->first.second,
+                                                  it->first.first);
+                it = txn_tablet_map.erase(it);
+            } else {
+                ++it;
+            }
         }
     }
 }
@@ -451,10 +470,10 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
                                          std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) {
     // get tablets in this transaction
     pair<int64_t, int64_t> key(partition_id, transaction_id);
-    ReadLock rdlock(_get_txn_lock(transaction_id));
-    ReadLock txn_rdlock(&_txn_map_lock);
-    auto it = _txn_tablet_map.find(key);
-    if (it == _txn_tablet_map.end()) {
+    ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id));
+    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it == txn_tablet_map.end()) {
         VLOG(3) << "could not find tablet for"
                 << " partition_id=" << partition_id 
                 << ", transaction_id=" << transaction_id;
@@ -472,10 +491,12 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
 }
 
 void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) {
-    ReadLock txn_rdlock(&_txn_map_lock);
-    for (auto& it : _txn_tablet_map) {
-        for (auto& tablet_load_it : it.second) {
-            tablet_infos->emplace(tablet_load_it.first);
+    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+        ReadLock txn_rdlock(&_txn_map_locks[i]);
+        for (auto& it : _txn_tablet_maps[i]) {
+            for (auto& tablet_load_it : it.second) {
+                tablet_infos->emplace(tablet_load_it.first);
+            }
         }
     }
 }                                
@@ -484,10 +505,10 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i
                          TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
-    ReadLock rdlock(_get_txn_lock(transaction_id));
-    ReadLock txn_rdlock(&_txn_map_lock);
-    auto it = _txn_tablet_map.find(key);
-    bool found = it != _txn_tablet_map.end()
+    ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id));
+    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    bool found = it != txn_tablet_map.end()
                  && it->second.find(tablet_info) != it->second.end();
 
     return found;
@@ -496,18 +517,20 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i
 void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) {
     int64_t now = UnixSeconds();
     // traverse the txn map, and get all expired txns
-    ReadLock txn_rdlock(&_txn_map_lock);
-    for (auto& it : _txn_tablet_map) {
-        auto txn_id = it.first.second;
-        for (auto& t_map : it.second) {
-            double diff = difftime(now, t_map.second.creation_time);
-            if (diff >= config::pending_data_expire_time_sec) {
-                (*expire_txn_map)[t_map.first].push_back(txn_id);
-                if (VLOG_IS_ON(3)) {
-                    VLOG(3) << "find expired txn."
-                            << " tablet=" << t_map.first.to_string()
-                            << " transaction_id=" << txn_id
-                            << " exist_sec=" << diff;
+    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+        ReadLock txn_rdlock(&_txn_map_locks[i]);
+        for (auto& it : _txn_tablet_maps[i]) {
+            auto txn_id = it.first.second;
+            for (auto& t_map : it.second) {
+                double diff = difftime(now, t_map.second.creation_time);
+                if (diff >= config::pending_data_expire_time_sec) {
+                    (*expire_txn_map)[t_map.first].push_back(txn_id);
+                    if (VLOG_IS_ON(3)) {
+                        VLOG(3) << "find expired txn."
+                                << " tablet=" << t_map.first.to_string()
+                                << " transaction_id=" << txn_id
+                                << " exist_sec=" << diff;
+                    }
                 }
             }
         }
@@ -515,9 +538,10 @@ void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>
 }
 
 void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids) {
-    ReadLock txn_rdlock(&_txn_map_lock);
-    auto it = _txn_partition_map.find(transaction_id);
-    if (it != _txn_partition_map.end()) {
+    ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id));
+    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
+    auto it = txn_partition_map.find(transaction_id);
+    if (it != txn_partition_map.end()) {
         for (int64_t partition_id : it->second) {
             partition_ids->push_back(partition_id);
         }
@@ -525,19 +549,21 @@ void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vec
 }
 
 void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
-    auto find = _txn_partition_map.find(transaction_id);
-    if (find == _txn_partition_map.end()) {
-        _txn_partition_map[transaction_id] = std::unordered_set<int64_t>();
+    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
+    auto find = txn_partition_map.find(transaction_id);
+    if (find == txn_partition_map.end()) {
+        txn_partition_map[transaction_id] = std::unordered_set<int64_t>();
     }
-    _txn_partition_map[transaction_id].insert(partition_id);
+    txn_partition_map[transaction_id].insert(partition_id);
 }
 
 void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
-    auto it = _txn_partition_map.find(transaction_id);
-    if (it != _txn_partition_map.end()) {
+    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
+    auto it = txn_partition_map.find(transaction_id);
+    if (it != txn_partition_map.end()) {
         it->second.erase(partition_id);
         if (it->second.empty()) {
-            _txn_partition_map.erase(it);
+            txn_partition_map.erase(it);
         }
     }
 }
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 96736d0..75ad152 100755
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -64,15 +64,17 @@ struct TabletTxnInfo {
     TabletTxnInfo() {}
 };
 
+
 // txn manager is used to manage mapping between tablet and txns
 class TxnManager {
 public:
-    TxnManager();
+    TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size);
 
     ~TxnManager() {
-        _txn_tablet_map.clear();
-        _txn_partition_map.clear();
-        _txn_locks.clear();
+        delete [] _txn_tablet_maps;
+        delete [] _txn_partition_maps;
+        delete [] _txn_map_locks;
+        delete [] _txn_mutex;
     }
 
     OLAPStatus prepare_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, 
@@ -140,9 +142,35 @@ public:
     void get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids);
     
 private:
-    RWMutex* _get_txn_lock(TTransactionId txn_id) {
-        return _txn_locks[txn_id % _txn_lock_num].get();
-    }
+
+    using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
+
+    // implement TxnKey hash function to support TxnKey as a key for unordered_map
+    struct TxnKeyHash {
+        template<typename T, typename U>
+        size_t operator()(const std::pair<T, U> &e) const {
+            return std::hash<T>()(e.first) ^ std::hash<U>()(e.second);
+        }
+    };
+
+    // implement TxnKey equal function to support TxnKey as a key for unordered_map
+    struct TxnKeyEqual {
+        template <class T, typename U>
+        bool operator()(const std::pair<T, U> &l, const std::pair<T, U> &r) const{
+            return l.first == r.first && l.second == r.second;
+        }
+    };
+
+    typedef std::unordered_map<TxnKey, std::map<TabletInfo, TabletTxnInfo>, TxnKeyHash, TxnKeyEqual> txn_tablet_map_t;
+    typedef std::unordered_map<int64_t, std::unordered_set<int64_t>> txn_partition_map_t;
+
+    inline RWMutex& _get_txn_map_lock(TTransactionId transactionId);
+
+    inline txn_tablet_map_t& _get_txn_tablet_map(TTransactionId transactionId);
+
+    inline txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId);
+
+    inline Mutex& _get_txn_lock(TTransactionId transactionId);
 
     // insert or remove (transaction_id, partition_id) from _txn_partition_map
     // get _txn_map_lock before calling
@@ -150,20 +178,39 @@ private:
     void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
 
 private:
-    RWMutex _txn_map_lock;
-    using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
-    std::map<TxnKey, std::map<TabletInfo, TabletTxnInfo>> _txn_tablet_map;
+    const int32_t _txn_map_shard_size;
+
+    const int32_t _txn_shard_size;
+
+    // _txn_map_locks[i] protect _txn_tablet_maps[i], i=0,1,2...,and i < _txn_map_shard_size
+    txn_tablet_map_t *_txn_tablet_maps;
     // transaction_id -> corresponding partition ids
     // This is mainly for the clear txn task received from FE, which may only has transaction id,
     // so we need this map to find out which partitions are corresponding to a transaction id.
-    // This map should be constructed/deconstructed/modified alongside with '_txn_tablet_map'
-    std::unordered_map<int64_t, std::unordered_set<int64_t>> _txn_partition_map;
+    // The _txn_partition_maps[i] should be constructed/deconstructed/modified alongside with '_txn_tablet_maps[i]'
+    txn_partition_map_t *_txn_partition_maps;
 
-    const int32_t _txn_lock_num = 100;
-    std::map<int32_t, std::shared_ptr<RWMutex>> _txn_locks;
+    RWMutex *_txn_map_locks;
 
+    Mutex *_txn_mutex;
     DISALLOW_COPY_AND_ASSIGN(TxnManager);
 };  // TxnManager
 
+inline RWMutex& TxnManager::_get_txn_map_lock(TTransactionId transactionId) {
+    return _txn_map_locks[transactionId & (_txn_map_shard_size - 1)];
+}
+
+inline TxnManager::txn_tablet_map_t& TxnManager::_get_txn_tablet_map(TTransactionId transactionId) {
+    return _txn_tablet_maps[transactionId & (_txn_map_shard_size - 1)];
+}
+
+inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map(TTransactionId transactionId) {
+    return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)];
+}
+
+inline Mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) {
+    return _txn_mutex[transactionId & (_txn_shard_size - 1)];
+}
+
 }
 #endif // DORIS_BE_SRC_OLAP_TXN_MANAGER_H
diff --git a/be/test/olap/olap_snapshot_converter_test.cpp b/be/test/olap/olap_snapshot_converter_test.cpp
index 3bcabf6..f554499 100644
--- a/be/test/olap/olap_snapshot_converter_test.cpp
+++ b/be/test/olap/olap_snapshot_converter_test.cpp
@@ -107,7 +107,6 @@ private:
     DataDir* _data_dir;
     OlapMeta* _meta;
     std::string _json_rowset_meta;
-    TxnManager _txn_mgr;
     std::string _engine_data_path;
     std::string _meta_path;
     int64_t _tablet_id;
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 44e8593..d78fe60 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -78,7 +78,7 @@ public:
                 + "/" + std::to_string(0)
                 + "/" + std::to_string(_tablet_id)
                 + "/" + std::to_string(_schema_hash);
-        _tablet_mgr = new TabletManager(1);
+        _tablet_mgr.reset(new TabletManager(1));
     }
 
     virtual void TearDown() {
@@ -86,18 +86,16 @@ public:
         if (boost::filesystem::exists(_engine_data_path)) {
             ASSERT_TRUE(boost::filesystem::remove_all(_engine_data_path));
         }
-        delete _tablet_mgr;
     }
 
 private:
     DataDir* _data_dir;
     std::string _json_rowset_meta;
-    TxnManager _txn_mgr;
     std::string _engine_data_path;
     int64_t _tablet_id;
     int32_t _schema_hash;
     string _tablet_data_path;
-    TabletManager* _tablet_mgr;
+    std::unique_ptr<TabletManager> _tablet_mgr;
 };
 
 TEST_F(TabletMgrTest, CreateTablet) {
diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp
index 4f25d0b..7725810 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -93,8 +93,8 @@ public:
     }
 
     virtual void SetUp() {
-        config::max_runnings_transactions = 1000;
-
+        config::max_runnings_transactions_per_txn_map = 500;
+        _txn_mgr.reset(new TxnManager(64, 1024));
         std::vector<StorePath> paths;
         paths.emplace_back("_engine_data_path", -1);
         EngineOptions options;
@@ -162,7 +162,7 @@ public:
 private:
     OlapMeta* _meta;
     std::string _json_rowset_meta;
-    TxnManager _txn_mgr;
+    std::unique_ptr<TxnManager>  _txn_mgr;
     TPartitionId partition_id = 1123;
     TTransactionId transaction_id = 111;
     TTabletId tablet_id = 222;
@@ -176,7 +176,7 @@ private:
 };
 
 TEST_F(TxnManagerTest, PrepareNewTxn) {
-    OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id);
     ASSERT_TRUE(status == OLAP_SUCCESS);
 }
@@ -185,9 +185,9 @@ TEST_F(TxnManagerTest, PrepareNewTxn) {
 // 2. commit txn
 // 3. should be success
 TEST_F(TxnManagerTest, CommitTxnWithPrepare) {
-    OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id);
-    _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
     ASSERT_TRUE(status == OLAP_SUCCESS);
     RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
@@ -199,7 +199,7 @@ TEST_F(TxnManagerTest, CommitTxnWithPrepare) {
 // 1. commit without prepare
 // 2. should success
 TEST_F(TxnManagerTest, CommitTxnWithNoPrepare) {
-    OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
     ASSERT_TRUE(status == OLAP_SUCCESS);
 }
@@ -207,10 +207,10 @@ TEST_F(TxnManagerTest, CommitTxnWithNoPrepare) {
 // 1. commit twice with different rowset id
 // 2. should failed
 TEST_F(TxnManagerTest, CommitTxnTwiceWithDiffRowsetId) {
-    OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
     ASSERT_TRUE(status == OLAP_SUCCESS);
-    status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset_diff_id, false);
     ASSERT_TRUE(status != OLAP_SUCCESS);
 }
@@ -218,30 +218,30 @@ TEST_F(TxnManagerTest, CommitTxnTwiceWithDiffRowsetId) {
 // 1. commit twice with same rowset id
 // 2. should success
 TEST_F(TxnManagerTest, CommitTxnTwiceWithSameRowsetId) {
-    OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
     ASSERT_TRUE(status == OLAP_SUCCESS);
-    status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset_same_id, false);
     ASSERT_TRUE(status == OLAP_SUCCESS);
 }
 
 // 1. prepare twice should be success
 TEST_F(TxnManagerTest, PrepareNewTxnTwice) {
-    OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id);
     ASSERT_TRUE(status == OLAP_SUCCESS);
-    status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+    status = _txn_mgr->prepare_txn(partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id);
     ASSERT_TRUE(status == OLAP_SUCCESS);
 }
 
 // 1. txn could be rollbacked if it is not committed
 TEST_F(TxnManagerTest, RollbackNotCommittedTxn) {
-    OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id);
     ASSERT_TRUE(status == OLAP_SUCCESS);
-    status = _txn_mgr.rollback_txn(partition_id, transaction_id,
+    status = _txn_mgr->rollback_txn(partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid);
     ASSERT_TRUE(status == OLAP_SUCCESS);
     RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
@@ -251,10 +251,10 @@ TEST_F(TxnManagerTest, RollbackNotCommittedTxn) {
 
 // 1. txn could not be rollbacked if it is committed
 TEST_F(TxnManagerTest, RollbackCommittedTxn) {
-    OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
     ASSERT_TRUE(status == OLAP_SUCCESS);
-    status = _txn_mgr.rollback_txn(partition_id, transaction_id,
+    status = _txn_mgr->rollback_txn(partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid);
     ASSERT_FALSE(status == OLAP_SUCCESS);
     RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
@@ -265,12 +265,12 @@ TEST_F(TxnManagerTest, RollbackCommittedTxn) {
 
 // 1. publish version success
 TEST_F(TxnManagerTest, PublishVersionSuccessful) {
-    OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
     ASSERT_TRUE(status == OLAP_SUCCESS);
     Version new_version(10,11);
     VersionHash new_versionhash = 123;
-    status = _txn_mgr.publish_txn(_meta, partition_id, transaction_id,
+    status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, new_version, new_versionhash);
     ASSERT_TRUE(status == OLAP_SUCCESS);
 
@@ -286,28 +286,28 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) {
 TEST_F(TxnManagerTest, PublishNotExistedTxn) {
     Version new_version(10,11);
     VersionHash new_versionhash = 123;
-    OLAPStatus status = _txn_mgr.publish_txn(_meta, partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, new_version, new_versionhash);
     ASSERT_TRUE(status != OLAP_SUCCESS);
 }
 
 TEST_F(TxnManagerTest, DeletePreparedTxn) {
-    OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id);
     ASSERT_TRUE(status == OLAP_SUCCESS);
-    status = _txn_mgr.delete_txn(_meta, partition_id, transaction_id,
+    status = _txn_mgr->delete_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid);
     ASSERT_TRUE(status == OLAP_SUCCESS);
 }
 
 TEST_F(TxnManagerTest, DeleteCommittedTxn) {
-    OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+    OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
     ASSERT_TRUE(status == OLAP_SUCCESS);
     RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
     status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _alpha_rowset->rowset_id(), rowset_meta);
     ASSERT_TRUE(status == OLAP_SUCCESS);
-    status = _txn_mgr.delete_txn(_meta, partition_id, transaction_id,
+    status = _txn_mgr->delete_txn(_meta, partition_id, transaction_id,
         tablet_id, schema_hash, _tablet_uid);
     ASSERT_TRUE(status == OLAP_SUCCESS);
     RowsetMetaSharedPtr rowset_meta2(new AlphaRowsetMeta());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org