You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/29 14:21:10 UTC

[doris] 03/05: [fix](merge-on-write) fix dead lock when publish (#21339)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 31c4f2c961d1cf27f7450695c41b9df3bcf6b14c
Author: Xin Liao <li...@126.com>
AuthorDate: Thu Jun 29 20:58:47 2023 +0800

    [fix](merge-on-write) fix dead lock when publish (#21339)
---
 be/src/olap/task/engine_publish_version_task.cpp | 37 +++++-------------------
 be/src/olap/task/engine_publish_version_task.h   |  7 -----
 2 files changed, 7 insertions(+), 37 deletions(-)

diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 02af6bf674..249a7a424c 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -72,8 +72,7 @@ EnginePublishVersionTask::EnginePublishVersionTask(
         const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids,
         std::vector<TTabletId>* succ_tablet_ids,
         std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
-        : _total_task_num(0),
-          _publish_version_req(publish_version_req),
+        : _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
           _succ_tablet_ids(succ_tablet_ids),
           _discontinuous_version_tablets(discontinuous_version_tablets) {}
@@ -88,25 +87,14 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
     _succ_tablet_ids->push_back(tablet_id);
 }
 
-void EnginePublishVersionTask::wait() {
-    std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
-    _tablet_finish_cond.wait(lock);
-}
-
-void EnginePublishVersionTask::notify() {
-    std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
-    _tablet_finish_cond.notify_one();
-}
-
-int64_t EnginePublishVersionTask::finish_task() {
-    return _total_task_num.fetch_sub(1);
-}
-
 Status EnginePublishVersionTask::finish() {
     Status res = Status::OK();
     int64_t transaction_id = _publish_version_req.transaction_id;
     OlapStopWatch watch;
     VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
+    std::unique_ptr<ThreadPoolToken> token =
+            StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
+                    ThreadPool::ExecutionMode::CONCURRENT);
 
     // each partition
     for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -187,19 +175,13 @@ Status EnginePublishVersionTask::finish() {
                     continue;
                 }
             }
-            _total_task_num.fetch_add(1);
             auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
                     this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
-            auto submit_st =
-                    StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
-                            [=]() { tablet_publish_txn_ptr->handle(); });
+            auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
             CHECK(submit_st.ok()) << submit_st;
         }
     }
-    // wait for all publish txn finished
-    while (_total_task_num.load() != 0) {
-        wait();
-    }
+    token->wait();
 
     // check if the related tablet remained all have the version
     for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -260,12 +242,7 @@ void TabletPublishTxnTask::handle() {
         _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
         return;
     }
-    Defer defer {[&] {
-        _rowset->finish_publish();
-        if (_engine_publish_version_task->finish_task() == 1) {
-            _engine_publish_version_task->notify();
-        }
-    }};
+    Defer defer {[&] { _rowset->finish_publish(); }};
     auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
             _partition_id, _tablet, _transaction_id, _version, &_stats);
     if (publish_status != Status::OK()) {
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index c8a68dedea..8acf8099ca 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -93,21 +93,14 @@ public:
     void add_error_tablet_id(int64_t tablet_id);
     void add_succ_tablet_id(int64_t tablet_id);
 
-    void notify();
-    void wait();
-
     int64_t finish_task();
 
 private:
-    std::atomic<int64_t> _total_task_num;
     const TPublishVersionRequest& _publish_version_req;
     std::mutex _tablet_ids_mutex;
     vector<TTabletId>* _error_tablet_ids;
     vector<TTabletId>* _succ_tablet_ids;
     std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
-
-    std::mutex _tablet_finish_mutex;
-    std::condition_variable _tablet_finish_cond;
 };
 
 class AsyncTabletPublishTask {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org