You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/04/09 14:35:25 UTC
[incubator-doris] branch master updated: [Performance] Support
sharding txn_map_lock into more small map locks to make good performance
for txn manage task (#3222)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a5703ef [Performance] Support sharding txn_map_lock into more small map locks to make good performance for txn manage task (#3222)
a5703ef is described below
commit a5703ef114ee5be2657481f6393e2bca054fd48f
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Thu Apr 9 09:35:15 2020 -0500
[Performance] Support sharding txn_map_lock into more small map locks to make good performance for txn manage task (#3222)
This PR improves the performance of txn management tasks. When there are many txns in
BE, the single txn_map_lock and the additional _txn_locks may cause poor performance, so
we now remove the additional _txn_locks and split the txn_map_lock into many small locks.
---
be/src/common/config.h | 12 +-
be/src/olap/storage_engine.cpp | 2 +-
be/src/olap/tablet_manager.cpp | 1 +
be/src/olap/tablet_manager.h | 2 +-
be/src/olap/txn_manager.cpp | 236 ++++++++++++++------------
be/src/olap/txn_manager.h | 75 ++++++--
be/test/olap/olap_snapshot_converter_test.cpp | 1 -
be/test/olap/tablet_mgr_test.cpp | 6 +-
be/test/olap/txn_manager_test.cpp | 48 +++---
9 files changed, 231 insertions(+), 152 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 93a0bff..9b06918 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -491,9 +491,9 @@ namespace config {
// brpc config, 200M
CONF_Int64(brpc_max_body_size, "209715200")
- // max number of txns in txn manager
+ // max number of txns for every txn_partition_map in txn manager
// this is a self protection to avoid too many txns saving in manager
- CONF_mInt64(max_runnings_transactions, "2000");
+ CONF_mInt64(max_runnings_transactions_per_txn_map, "100");
// tablet_map_lock shard size, the value is 2^n, n=0,1,2,3,4
// this is a an enhancement for better performance to manage tablet
@@ -501,6 +501,14 @@ namespace config {
CONF_String(plugin_path, "${DORIS_HOME}/plugin")
+ // txn_map_lock shard size; the value must be a power of two (2^n), e.g. 1, 2, 4, ..., 128
+ // this is an enhancement for better performance when managing txns
+ CONF_Int32(txn_map_shard_size, "128");
+
+ // txn_lock shard size; the value must be a power of two (2^n), e.g. 1, 2, 4, ..., 1024
+ // this is an enhancement for better performance when publishing txns
+ CONF_Int32(txn_shard_size, "1024")
+
} // namespace config
} // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 16b9025..a8c8967 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -114,7 +114,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_is_all_cluster_id_exist(true),
_index_stream_lru_cache(NULL),
_tablet_manager(new TabletManager(config::tablet_map_shard_size)),
- _txn_manager(new TxnManager()),
+ _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)),
_rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)),
_memtable_flush_executor(nullptr),
_block_manager(nullptr),
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 793fd45..c31ec5f 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -70,6 +70,7 @@ TabletManager::TabletManager(int32_t tablet_map_lock_shard_size)
: _tablet_map_lock_shard_size(tablet_map_lock_shard_size),
_last_update_stat_ms(0) {
DCHECK_GT(_tablet_map_lock_shard_size, 0);
+ DCHECK_EQ(_tablet_map_lock_shard_size & (tablet_map_lock_shard_size - 1), 0);
_tablet_map_lock_array = new RWMutex[_tablet_map_lock_shard_size];
_tablet_map_array = new tablet_map_t[_tablet_map_lock_shard_size];
}
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 263b14b..b47be43 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -193,7 +193,7 @@ private:
// tablet_id -> TabletInstances
typedef std::unordered_map<int64_t, TableInstances> tablet_map_t;
- int32_t _tablet_map_lock_shard_size;
+ const int32_t _tablet_map_lock_shard_size;
// _tablet_map_lock_array[i] protect _tablet_map_array[i], i=0,1,2...,and i < _tablet_map_lock_shard_size
RWMutex *_tablet_map_lock_array;
tablet_map_t *_tablet_map_array;
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index c098c5b..8fdec4d 100755
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "olap/storage_engine.h"
+#include "txn_manager.h"
#include <signal.h>
@@ -36,6 +36,7 @@
#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/lru_cache.h"
+#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/push_handler.h"
@@ -70,10 +71,17 @@ using std::vector;
namespace doris {
-TxnManager::TxnManager() {
- for (int i = 0; i < _txn_lock_num; ++i) {
- _txn_locks[i] = std::make_shared<RWMutex>();
- }
+TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size)
+ : _txn_map_shard_size(txn_map_shard_size),
+ _txn_shard_size(txn_shard_size) {
+ DCHECK_GT(_txn_map_shard_size, 0);
+ DCHECK_GT(_txn_shard_size, 0);
+ DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0);
+ DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0);
+ _txn_map_locks = new RWMutex[_txn_map_shard_size];
+ _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size];
+ _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size];
+ _txn_mutex = new Mutex[_txn_shard_size];
}
OLAPStatus TxnManager::prepare_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id,
@@ -111,12 +119,12 @@ OLAPStatus TxnManager::prepare_txn(
TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid,
const PUniqueId& load_id) {
- pair<int64_t, int64_t> key(partition_id, transaction_id);
+ TxnKey key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- WriteLock wrlock(_get_txn_lock(transaction_id));
- WriteLock txn_wrlock(&_txn_map_lock);
- auto it = _txn_tablet_map.find(key);
- if (it != _txn_tablet_map.end()) {
+ WriteLock txn_wrlock(&_get_txn_map_lock(transaction_id));
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it != txn_tablet_map.end()) {
auto load_itr = it->second.find(tablet_info);
if (load_itr != it->second.end()) {
// found load for txn,tablet
@@ -137,8 +145,9 @@ OLAPStatus TxnManager::prepare_txn(
// check if there are too many transactions on running.
// if yes, reject the request.
- if (_txn_partition_map.size() > config::max_runnings_transactions) {
- LOG(WARNING) << "too many transactions: " << _txn_tablet_map.size() << ", limit: " << config::max_runnings_transactions;
+ txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
+ if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) {
+ LOG(WARNING) << "too many transactions: " << txn_tablet_map.size() << ", limit: " << config::max_runnings_transactions_per_txn_map;
return OLAP_ERR_TOO_MANY_TRANSACTIONS;
}
@@ -146,7 +155,7 @@ OLAPStatus TxnManager::prepare_txn(
// case 1: user start a new txn, rowset_ptr = null
// case 2: loading txn from meta env
TabletTxnInfo load_info(load_id, nullptr);
- _txn_tablet_map[key][tablet_info] = load_info;
+ txn_tablet_map[key][tablet_info] = load_info;
_insert_txn_partition_map_unlocked(transaction_id, partition_id);
VLOG(3) << "add transaction to engine successfully."
@@ -175,12 +184,13 @@ OLAPStatus TxnManager::commit_txn(
<< ", tablet: " << tablet_info.to_string();
return OLAP_ERR_ROWSET_INVALID;
}
- WriteLock wrlock(_get_txn_lock(transaction_id));
+
{
// get tx
- ReadLock rdlock(&_txn_map_lock);
- auto it = _txn_tablet_map.find(key);
- if (it != _txn_tablet_map.end()) {
+ ReadLock rdlock(&_get_txn_map_lock(transaction_id));
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it != txn_tablet_map.end()) {
auto load_itr = it->second.find(tablet_info);
if (load_itr != it->second.end()) {
// found load for txn,tablet
@@ -232,9 +242,10 @@ OLAPStatus TxnManager::commit_txn(
}
{
- WriteLock wrlock(&_txn_map_lock);
+ WriteLock wrlock(&_get_txn_map_lock(transaction_id));
TabletTxnInfo load_info(load_id, rowset_ptr);
- _txn_tablet_map[key][tablet_info] = load_info;
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ txn_tablet_map[key][tablet_info] = load_info;
_insert_txn_partition_map_unlocked(transaction_id, partition_id);
LOG(INFO) << "commit transaction to engine successfully."
<< " partition_id: " << key.first
@@ -253,11 +264,12 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
RowsetSharedPtr rowset_ptr = nullptr;
- WriteLock wrlock(_get_txn_lock(transaction_id));
+ MutexLock txn_lock(&_get_txn_lock(transaction_id));
{
- ReadLock rlock(&_txn_map_lock);
- auto it = _txn_tablet_map.find(key);
- if (it != _txn_tablet_map.end()) {
+ ReadLock rlock(&_get_txn_map_lock(transaction_id));
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it != txn_tablet_map.end()) {
auto load_itr = it->second.find(tablet_info);
if (load_itr != it->second.end()) {
// found load for txn,tablet
@@ -287,9 +299,10 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
return OLAP_ERR_TRANSACTION_NOT_EXIST;
}
{
- WriteLock wrlock(&_txn_map_lock);
- auto it = _txn_tablet_map.find(key);
- if (it != _txn_tablet_map.end()) {
+ WriteLock wrlock(&_get_txn_map_lock(transaction_id));
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it != txn_tablet_map.end()) {
it->second.erase(tablet_info);
LOG(INFO) << "publish txn successfully."
<< " partition_id: " << key.first
@@ -297,7 +310,7 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
<< ", tablet: " << tablet_info.to_string()
<< ", rowsetid: " << rowset_ptr->rowset_id();
if (it->second.empty()) {
- _txn_tablet_map.erase(it);
+ txn_tablet_map.erase(it);
_clear_txn_partition_map_unlocked(transaction_id, partition_id);
}
}
@@ -313,10 +326,10 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr
TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- WriteLock wrlock(_get_txn_lock(transaction_id));
- WriteLock txn_wrlock(&_txn_map_lock);
- auto it = _txn_tablet_map.find(key);
- if (it != _txn_tablet_map.end()) {
+ WriteLock wrlock(&_get_txn_map_lock(transaction_id));
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it != txn_tablet_map.end()) {
auto load_itr = it->second.find(tablet_info);
if (load_itr != it->second.end()) {
// found load for txn,tablet
@@ -334,7 +347,7 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr
<< ", transaction_id: " << key.second
<< ", tablet: " << tablet_info.to_string();
if (it->second.empty()) {
- _txn_tablet_map.erase(it);
+ txn_tablet_map.erase(it);
_clear_txn_partition_map_unlocked(transaction_id, partition_id);
}
}
@@ -347,10 +360,10 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr
TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- WriteLock wrlock(_get_txn_lock(transaction_id));
- WriteLock txn_wrlock(&_txn_map_lock);
- auto it = _txn_tablet_map.find(key);
- if (it == _txn_tablet_map.end()) {
+ WriteLock txn_wrlock(&_get_txn_map_lock(transaction_id));
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it == txn_tablet_map.end()) {
return OLAP_ERR_TRANSACTION_NOT_EXIST;
}
auto load_itr = it->second.find(tablet_info);
@@ -384,7 +397,7 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr
}
it->second.erase(tablet_info);
if (it->second.empty()) {
- _txn_tablet_map.erase(it);
+ txn_tablet_map.erase(it);
_clear_txn_partition_map_unlocked(transaction_id, partition_id);
}
return OLAP_SUCCESS;
@@ -398,15 +411,18 @@ void TxnManager::get_tablet_related_txns(TTabletId tablet_id, SchemaHash schema_
}
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- ReadLock txn_rdlock(&_txn_map_lock);
- for (auto& it : _txn_tablet_map) {
- if (it.second.find(tablet_info) != it.second.end()) {
- *partition_id = it.first.first;
- transaction_ids->insert(it.first.second);
- VLOG(3) << "find transaction on tablet."
- << "partition_id: " << it.first.first
- << ", transaction_id: " << it.first.second
- << ", tablet: " << tablet_info.to_string();
+ for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+ ReadLock txn_rdlock(&_txn_map_locks[i]);
+ txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
+ for (auto& it : txn_tablet_map) {
+ if (it.second.find(tablet_info) != it.second.end()) {
+ *partition_id = it.first.first;
+ transaction_ids->insert(it.first.second);
+ VLOG(3) << "find transaction on tablet."
+ << "partition_id: " << it.first.first
+ << ", transaction_id: " << it.first.second
+ << ", tablet: " << tablet_info.to_string();
+ }
}
}
}
@@ -415,33 +431,36 @@ void TxnManager::get_tablet_related_txns(TTabletId tablet_id, SchemaHash schema_
// maybe lock error, because not get txn lock before remove from meta
void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- WriteLock txn_wrlock(&_txn_map_lock);
- for (auto it = _txn_tablet_map.begin(); it != _txn_tablet_map.end();) {
- auto load_itr = it->second.find(tablet_info);
- if (load_itr != it->second.end()) {
- TabletTxnInfo& load_info = load_itr->second;
- if (load_info.rowset != nullptr && meta != nullptr) {
- LOG(INFO) << " delete transaction from engine "
- << ", tablet: " << tablet_info.to_string()
- << ", rowset id: " << load_info.rowset->rowset_id();
- RowsetMetaManager::remove(meta, tablet_uid,
- load_info.rowset->rowset_id());
- }
- LOG(INFO) << "remove tablet related txn."
- << " partition_id: " << it->first.first
- << ", transaction_id: " << it->first.second
- << ", tablet: " << tablet_info.to_string() << ", rowset: "
- << (load_info.rowset != nullptr
+ for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+ WriteLock txn_wrlock(&_txn_map_locks[i]);
+ txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
+ for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) {
+ auto load_itr = it->second.find(tablet_info);
+ if (load_itr != it->second.end()) {
+ TabletTxnInfo& load_info = load_itr->second;
+ if (load_info.rowset != nullptr && meta != nullptr) {
+ LOG(INFO) << " delete transaction from engine "
+ << ", tablet: " << tablet_info.to_string()
+ << ", rowset id: " << load_info.rowset->rowset_id();
+ RowsetMetaManager::remove(meta, tablet_uid,
+ load_info.rowset->rowset_id());
+ }
+ LOG(INFO) << "remove tablet related txn."
+ << " partition_id: " << it->first.first
+ << ", transaction_id: " << it->first.second
+ << ", tablet: " << tablet_info.to_string() << ", rowset: "
+ << (load_info.rowset != nullptr
? load_info.rowset->rowset_id().to_string()
: "0");
- it->second.erase(tablet_info);
- }
- if (it->second.empty()) {
- _clear_txn_partition_map_unlocked(it->first.second,
- it->first.first);
- it = _txn_tablet_map.erase(it);
- } else {
- ++it;
+ it->second.erase(tablet_info);
+ }
+ if (it->second.empty()) {
+ _clear_txn_partition_map_unlocked(it->first.second,
+ it->first.first);
+ it = txn_tablet_map.erase(it);
+ } else {
+ ++it;
+ }
}
}
}
@@ -451,10 +470,10 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) {
// get tablets in this transaction
pair<int64_t, int64_t> key(partition_id, transaction_id);
- ReadLock rdlock(_get_txn_lock(transaction_id));
- ReadLock txn_rdlock(&_txn_map_lock);
- auto it = _txn_tablet_map.find(key);
- if (it == _txn_tablet_map.end()) {
+ ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id));
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it == txn_tablet_map.end()) {
VLOG(3) << "could not find tablet for"
<< " partition_id=" << partition_id
<< ", transaction_id=" << transaction_id;
@@ -472,10 +491,12 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
}
void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) {
- ReadLock txn_rdlock(&_txn_map_lock);
- for (auto& it : _txn_tablet_map) {
- for (auto& tablet_load_it : it.second) {
- tablet_infos->emplace(tablet_load_it.first);
+ for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+ ReadLock txn_rdlock(&_txn_map_locks[i]);
+ for (auto& it : _txn_tablet_maps[i]) {
+ for (auto& tablet_load_it : it.second) {
+ tablet_infos->emplace(tablet_load_it.first);
+ }
}
}
}
@@ -484,10 +505,10 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i
TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- ReadLock rdlock(_get_txn_lock(transaction_id));
- ReadLock txn_rdlock(&_txn_map_lock);
- auto it = _txn_tablet_map.find(key);
- bool found = it != _txn_tablet_map.end()
+ ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id));
+ txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ bool found = it != txn_tablet_map.end()
&& it->second.find(tablet_info) != it->second.end();
return found;
@@ -496,18 +517,20 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i
void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) {
int64_t now = UnixSeconds();
// traverse the txn map, and get all expired txns
- ReadLock txn_rdlock(&_txn_map_lock);
- for (auto& it : _txn_tablet_map) {
- auto txn_id = it.first.second;
- for (auto& t_map : it.second) {
- double diff = difftime(now, t_map.second.creation_time);
- if (diff >= config::pending_data_expire_time_sec) {
- (*expire_txn_map)[t_map.first].push_back(txn_id);
- if (VLOG_IS_ON(3)) {
- VLOG(3) << "find expired txn."
- << " tablet=" << t_map.first.to_string()
- << " transaction_id=" << txn_id
- << " exist_sec=" << diff;
+ for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+ ReadLock txn_rdlock(&_txn_map_locks[i]);
+ for (auto& it : _txn_tablet_maps[i]) {
+ auto txn_id = it.first.second;
+ for (auto& t_map : it.second) {
+ double diff = difftime(now, t_map.second.creation_time);
+ if (diff >= config::pending_data_expire_time_sec) {
+ (*expire_txn_map)[t_map.first].push_back(txn_id);
+ if (VLOG_IS_ON(3)) {
+ VLOG(3) << "find expired txn."
+ << " tablet=" << t_map.first.to_string()
+ << " transaction_id=" << txn_id
+ << " exist_sec=" << diff;
+ }
}
}
}
@@ -515,9 +538,10 @@ void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>
}
void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids) {
- ReadLock txn_rdlock(&_txn_map_lock);
- auto it = _txn_partition_map.find(transaction_id);
- if (it != _txn_partition_map.end()) {
+ ReadLock txn_rdlock(&_get_txn_map_lock(transaction_id));
+ txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
+ auto it = txn_partition_map.find(transaction_id);
+ if (it != txn_partition_map.end()) {
for (int64_t partition_id : it->second) {
partition_ids->push_back(partition_id);
}
@@ -525,19 +549,21 @@ void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vec
}
void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
- auto find = _txn_partition_map.find(transaction_id);
- if (find == _txn_partition_map.end()) {
- _txn_partition_map[transaction_id] = std::unordered_set<int64_t>();
+ txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
+ auto find = txn_partition_map.find(transaction_id);
+ if (find == txn_partition_map.end()) {
+ txn_partition_map[transaction_id] = std::unordered_set<int64_t>();
}
- _txn_partition_map[transaction_id].insert(partition_id);
+ txn_partition_map[transaction_id].insert(partition_id);
}
void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
- auto it = _txn_partition_map.find(transaction_id);
- if (it != _txn_partition_map.end()) {
+ txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
+ auto it = txn_partition_map.find(transaction_id);
+ if (it != txn_partition_map.end()) {
it->second.erase(partition_id);
if (it->second.empty()) {
- _txn_partition_map.erase(it);
+ txn_partition_map.erase(it);
}
}
}
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 96736d0..75ad152 100755
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -64,15 +64,17 @@ struct TabletTxnInfo {
TabletTxnInfo() {}
};
+
// txn manager is used to manage mapping between tablet and txns
class TxnManager {
public:
- TxnManager();
+ TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size);
~TxnManager() {
- _txn_tablet_map.clear();
- _txn_partition_map.clear();
- _txn_locks.clear();
+ delete [] _txn_tablet_maps;
+ delete [] _txn_partition_maps;
+ delete [] _txn_map_locks;
+ delete [] _txn_mutex;
}
OLAPStatus prepare_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id,
@@ -140,9 +142,35 @@ public:
void get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids);
private:
- RWMutex* _get_txn_lock(TTransactionId txn_id) {
- return _txn_locks[txn_id % _txn_lock_num].get();
- }
+
+ using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
+
+ // implement TxnKey hash function to support TxnKey as a key for unordered_map
+ struct TxnKeyHash {
+ template<typename T, typename U>
+ size_t operator()(const std::pair<T, U> &e) const {
+ return std::hash<T>()(e.first) ^ std::hash<U>()(e.second);
+ }
+ };
+
+ // implement TxnKey equal function to support TxnKey as a key for unordered_map
+ struct TxnKeyEqual {
+ template <class T, typename U>
+ bool operator()(const std::pair<T, U> &l, const std::pair<T, U> &r) const{
+ return l.first == r.first && l.second == r.second;
+ }
+ };
+
+ typedef std::unordered_map<TxnKey, std::map<TabletInfo, TabletTxnInfo>, TxnKeyHash, TxnKeyEqual> txn_tablet_map_t;
+ typedef std::unordered_map<int64_t, std::unordered_set<int64_t>> txn_partition_map_t;
+
+ inline RWMutex& _get_txn_map_lock(TTransactionId transactionId);
+
+ inline txn_tablet_map_t& _get_txn_tablet_map(TTransactionId transactionId);
+
+ inline txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId);
+
+ inline Mutex& _get_txn_lock(TTransactionId transactionId);
// insert or remove (transaction_id, partition_id) from _txn_partition_map
// get _txn_map_lock before calling
@@ -150,20 +178,39 @@ private:
void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
private:
- RWMutex _txn_map_lock;
- using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
- std::map<TxnKey, std::map<TabletInfo, TabletTxnInfo>> _txn_tablet_map;
+ const int32_t _txn_map_shard_size;
+
+ const int32_t _txn_shard_size;
+
+ // _txn_map_locks[i] protects _txn_tablet_maps[i], for i = 0, 1, 2, ... and i < _txn_map_shard_size
+ txn_tablet_map_t *_txn_tablet_maps;
// transaction_id -> corresponding partition ids
// This is mainly for the clear txn task received from FE, which may only has transaction id,
// so we need this map to find out which partitions are corresponding to a transaction id.
- // This map should be constructed/deconstructed/modified alongside with '_txn_tablet_map'
- std::unordered_map<int64_t, std::unordered_set<int64_t>> _txn_partition_map;
+ // _txn_partition_maps[i] should be constructed/destructed/modified alongside '_txn_tablet_maps[i]'
+ txn_partition_map_t *_txn_partition_maps;
- const int32_t _txn_lock_num = 100;
- std::map<int32_t, std::shared_ptr<RWMutex>> _txn_locks;
+ RWMutex *_txn_map_locks;
+ Mutex *_txn_mutex;
DISALLOW_COPY_AND_ASSIGN(TxnManager);
}; // TxnManager
+inline RWMutex& TxnManager::_get_txn_map_lock(TTransactionId transactionId) {
+ return _txn_map_locks[transactionId & (_txn_map_shard_size - 1)];
+}
+
+inline TxnManager::txn_tablet_map_t& TxnManager::_get_txn_tablet_map(TTransactionId transactionId) {
+ return _txn_tablet_maps[transactionId & (_txn_map_shard_size - 1)];
+}
+
+inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map(TTransactionId transactionId) {
+ return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)];
+}
+
+inline Mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) {
+ return _txn_mutex[transactionId & (_txn_shard_size - 1)];
+}
+
}
#endif // DORIS_BE_SRC_OLAP_TXN_MANAGER_H
diff --git a/be/test/olap/olap_snapshot_converter_test.cpp b/be/test/olap/olap_snapshot_converter_test.cpp
index 3bcabf6..f554499 100644
--- a/be/test/olap/olap_snapshot_converter_test.cpp
+++ b/be/test/olap/olap_snapshot_converter_test.cpp
@@ -107,7 +107,6 @@ private:
DataDir* _data_dir;
OlapMeta* _meta;
std::string _json_rowset_meta;
- TxnManager _txn_mgr;
std::string _engine_data_path;
std::string _meta_path;
int64_t _tablet_id;
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 44e8593..d78fe60 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -78,7 +78,7 @@ public:
+ "/" + std::to_string(0)
+ "/" + std::to_string(_tablet_id)
+ "/" + std::to_string(_schema_hash);
- _tablet_mgr = new TabletManager(1);
+ _tablet_mgr.reset(new TabletManager(1));
}
virtual void TearDown() {
@@ -86,18 +86,16 @@ public:
if (boost::filesystem::exists(_engine_data_path)) {
ASSERT_TRUE(boost::filesystem::remove_all(_engine_data_path));
}
- delete _tablet_mgr;
}
private:
DataDir* _data_dir;
std::string _json_rowset_meta;
- TxnManager _txn_mgr;
std::string _engine_data_path;
int64_t _tablet_id;
int32_t _schema_hash;
string _tablet_data_path;
- TabletManager* _tablet_mgr;
+ std::unique_ptr<TabletManager> _tablet_mgr;
};
TEST_F(TabletMgrTest, CreateTablet) {
diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp
index 4f25d0b..7725810 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -93,8 +93,8 @@ public:
}
virtual void SetUp() {
- config::max_runnings_transactions = 1000;
-
+ config::max_runnings_transactions_per_txn_map = 500;
+ _txn_mgr.reset(new TxnManager(64, 1024));
std::vector<StorePath> paths;
paths.emplace_back("_engine_data_path", -1);
EngineOptions options;
@@ -162,7 +162,7 @@ public:
private:
OlapMeta* _meta;
std::string _json_rowset_meta;
- TxnManager _txn_mgr;
+ std::unique_ptr<TxnManager> _txn_mgr;
TPartitionId partition_id = 1123;
TTransactionId transaction_id = 111;
TTabletId tablet_id = 222;
@@ -176,7 +176,7 @@ private:
};
TEST_F(TxnManagerTest, PrepareNewTxn) {
- OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id);
ASSERT_TRUE(status == OLAP_SUCCESS);
}
@@ -185,9 +185,9 @@ TEST_F(TxnManagerTest, PrepareNewTxn) {
// 2. commit txn
// 3. should be success
TEST_F(TxnManagerTest, CommitTxnWithPrepare) {
- OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id);
- _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
ASSERT_TRUE(status == OLAP_SUCCESS);
RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
@@ -199,7 +199,7 @@ TEST_F(TxnManagerTest, CommitTxnWithPrepare) {
// 1. commit without prepare
// 2. should success
TEST_F(TxnManagerTest, CommitTxnWithNoPrepare) {
- OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
ASSERT_TRUE(status == OLAP_SUCCESS);
}
@@ -207,10 +207,10 @@ TEST_F(TxnManagerTest, CommitTxnWithNoPrepare) {
// 1. commit twice with different rowset id
// 2. should failed
TEST_F(TxnManagerTest, CommitTxnTwiceWithDiffRowsetId) {
- OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
ASSERT_TRUE(status == OLAP_SUCCESS);
- status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset_diff_id, false);
ASSERT_TRUE(status != OLAP_SUCCESS);
}
@@ -218,30 +218,30 @@ TEST_F(TxnManagerTest, CommitTxnTwiceWithDiffRowsetId) {
// 1. commit twice with same rowset id
// 2. should success
TEST_F(TxnManagerTest, CommitTxnTwiceWithSameRowsetId) {
- OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
ASSERT_TRUE(status == OLAP_SUCCESS);
- status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset_same_id, false);
ASSERT_TRUE(status == OLAP_SUCCESS);
}
// 1. prepare twice should be success
TEST_F(TxnManagerTest, PrepareNewTxnTwice) {
- OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id);
ASSERT_TRUE(status == OLAP_SUCCESS);
- status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+ status = _txn_mgr->prepare_txn(partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id);
ASSERT_TRUE(status == OLAP_SUCCESS);
}
// 1. txn could be rollbacked if it is not committed
TEST_F(TxnManagerTest, RollbackNotCommittedTxn) {
- OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id);
ASSERT_TRUE(status == OLAP_SUCCESS);
- status = _txn_mgr.rollback_txn(partition_id, transaction_id,
+ status = _txn_mgr->rollback_txn(partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid);
ASSERT_TRUE(status == OLAP_SUCCESS);
RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
@@ -251,10 +251,10 @@ TEST_F(TxnManagerTest, RollbackNotCommittedTxn) {
// 1. txn could not be rollbacked if it is committed
TEST_F(TxnManagerTest, RollbackCommittedTxn) {
- OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
ASSERT_TRUE(status == OLAP_SUCCESS);
- status = _txn_mgr.rollback_txn(partition_id, transaction_id,
+ status = _txn_mgr->rollback_txn(partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid);
ASSERT_FALSE(status == OLAP_SUCCESS);
RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
@@ -265,12 +265,12 @@ TEST_F(TxnManagerTest, RollbackCommittedTxn) {
// 1. publish version success
TEST_F(TxnManagerTest, PublishVersionSuccessful) {
- OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
ASSERT_TRUE(status == OLAP_SUCCESS);
Version new_version(10,11);
VersionHash new_versionhash = 123;
- status = _txn_mgr.publish_txn(_meta, partition_id, transaction_id,
+ status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, new_version, new_versionhash);
ASSERT_TRUE(status == OLAP_SUCCESS);
@@ -286,28 +286,28 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) {
TEST_F(TxnManagerTest, PublishNotExistedTxn) {
Version new_version(10,11);
VersionHash new_versionhash = 123;
- OLAPStatus status = _txn_mgr.publish_txn(_meta, partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, new_version, new_versionhash);
ASSERT_TRUE(status != OLAP_SUCCESS);
}
TEST_F(TxnManagerTest, DeletePreparedTxn) {
- OLAPStatus status = _txn_mgr.prepare_txn(partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->prepare_txn(partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id);
ASSERT_TRUE(status == OLAP_SUCCESS);
- status = _txn_mgr.delete_txn(_meta, partition_id, transaction_id,
+ status = _txn_mgr->delete_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid);
ASSERT_TRUE(status == OLAP_SUCCESS);
}
TEST_F(TxnManagerTest, DeleteCommittedTxn) {
- OLAPStatus status = _txn_mgr.commit_txn(_meta, partition_id, transaction_id,
+ OLAPStatus status = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid, load_id, _alpha_rowset, false);
ASSERT_TRUE(status == OLAP_SUCCESS);
RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
status = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _alpha_rowset->rowset_id(), rowset_meta);
ASSERT_TRUE(status == OLAP_SUCCESS);
- status = _txn_mgr.delete_txn(_meta, partition_id, transaction_id,
+ status = _txn_mgr->delete_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash, _tablet_uid);
ASSERT_TRUE(status == OLAP_SUCCESS);
RowsetMetaSharedPtr rowset_meta2(new AlphaRowsetMeta());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org