You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/29 14:21:10 UTC
[doris] 03/05: [fix](merge-on-write) fix dead lock when publish (#21339)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 31c4f2c961d1cf27f7450695c41b9df3bcf6b14c
Author: Xin Liao <li...@126.com>
AuthorDate: Thu Jun 29 20:58:47 2023 +0800
[fix](merge-on-write) fix dead lock when publish (#21339)
---
be/src/olap/task/engine_publish_version_task.cpp | 37 +++++-------------------
be/src/olap/task/engine_publish_version_task.h | 7 -----
2 files changed, 7 insertions(+), 37 deletions(-)
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 02af6bf674..249a7a424c 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -72,8 +72,7 @@ EnginePublishVersionTask::EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
- : _total_task_num(0),
- _publish_version_req(publish_version_req),
+ : _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids),
_discontinuous_version_tablets(discontinuous_version_tablets) {}
@@ -88,25 +87,14 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
_succ_tablet_ids->push_back(tablet_id);
}
-void EnginePublishVersionTask::wait() {
- std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
- _tablet_finish_cond.wait(lock);
-}
-
-void EnginePublishVersionTask::notify() {
- std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
- _tablet_finish_cond.notify_one();
-}
-
-int64_t EnginePublishVersionTask::finish_task() {
- return _total_task_num.fetch_sub(1);
-}
-
Status EnginePublishVersionTask::finish() {
Status res = Status::OK();
int64_t transaction_id = _publish_version_req.transaction_id;
OlapStopWatch watch;
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
+ std::unique_ptr<ThreadPoolToken> token =
+ StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
+ ThreadPool::ExecutionMode::CONCURRENT);
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -187,19 +175,13 @@ Status EnginePublishVersionTask::finish() {
continue;
}
}
- _total_task_num.fetch_add(1);
auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
- auto submit_st =
- StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
- [=]() { tablet_publish_txn_ptr->handle(); });
+ auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
CHECK(submit_st.ok()) << submit_st;
}
}
- // wait for all publish txn finished
- while (_total_task_num.load() != 0) {
- wait();
- }
+ token->wait();
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -260,12 +242,7 @@ void TabletPublishTxnTask::handle() {
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
- Defer defer {[&] {
- _rowset->finish_publish();
- if (_engine_publish_version_task->finish_task() == 1) {
- _engine_publish_version_task->notify();
- }
- }};
+ Defer defer {[&] { _rowset->finish_publish(); }};
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version, &_stats);
if (publish_status != Status::OK()) {
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index c8a68dedea..8acf8099ca 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -93,21 +93,14 @@ public:
void add_error_tablet_id(int64_t tablet_id);
void add_succ_tablet_id(int64_t tablet_id);
- void notify();
- void wait();
-
int64_t finish_task();
private:
- std::atomic<int64_t> _total_task_num;
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;
std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
-
- std::mutex _tablet_finish_mutex;
- std::condition_variable _tablet_finish_cond;
};
class AsyncTabletPublishTask {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org