You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/11/09 00:50:49 UTC

(doris) branch master updated: [enhancement](brpc) add a auto release closure to ensure the closue safety (#26567)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 66e591f7f25 [enhancement](brpc) add a auto release closure to ensure the closue safety (#26567)
66e591f7f25 is described below

commit 66e591f7f25464b51f43ab4c57f9889b6afb44eb
Author: yiguolei <67...@qq.com>
AuthorDate: Thu Nov 9 08:50:42 2023 +0800

    [enhancement](brpc) add a auto release closure to ensure the closue safety (#26567)
---
 be/src/olap/delta_writer.cpp    | 56 +++++++++++++---------------
 be/src/util/ref_count_closure.h | 81 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 107 insertions(+), 30 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 23e1718cb7d..3e91e5efb06 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -243,21 +243,20 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
         }
     }
 
-    PTabletWriteSlaveRequest request;
-    RowsetMetaPB rowset_meta_pb = cur_rowset->rowset_meta()->get_rowset_pb();
-    request.set_allocated_rowset_meta(&rowset_meta_pb);
-    request.set_host(BackendOptions::get_localhost());
-    request.set_http_port(config::webserver_port);
+    auto request = std::make_shared<PTabletWriteSlaveRequest>();
+    *(request->mutable_rowset_meta()) = cur_rowset->rowset_meta()->get_rowset_pb();
+    request->set_host(BackendOptions::get_localhost());
+    request->set_http_port(config::webserver_port);
     string tablet_path = _rowset_builder.tablet()->tablet_path();
-    request.set_rowset_path(tablet_path);
-    request.set_token(ExecEnv::GetInstance()->token());
-    request.set_brpc_port(config::brpc_port);
-    request.set_node_id(node_info.id());
+    request->set_rowset_path(tablet_path);
+    request->set_token(ExecEnv::GetInstance()->token());
+    request->set_brpc_port(config::brpc_port);
+    request->set_node_id(node_info.id());
     for (int segment_id = 0; segment_id < cur_rowset->rowset_meta()->num_segments(); segment_id++) {
         std::stringstream segment_name;
         segment_name << cur_rowset->rowset_id() << "_" << segment_id << ".dat";
         int64_t segment_size = std::filesystem::file_size(tablet_path + "/" + segment_name.str());
-        request.mutable_segments_size()->insert({segment_id, segment_size});
+        request->mutable_segments_size()->insert({segment_id, segment_size});
 
         if (!indices_ids.empty()) {
             for (auto index_id : indices_ids) {
@@ -269,41 +268,38 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
                 index_size.set_size(size);
                 // Fetch the map value for the current segment_id.
                 // If it doesn't exist, this will insert a new default-constructed IndexSizeMapValue
-                auto& index_size_map_value = (*request.mutable_inverted_indices_size())[segment_id];
+                auto& index_size_map_value =
+                        (*(request->mutable_inverted_indices_size()))[segment_id];
                 // Add the new index size to the map value.
                 *index_size_map_value.mutable_index_sizes()->Add() = std::move(index_size);
             }
         }
     }
-    RefCountClosure<PTabletWriteSlaveResult>* closure =
-            new RefCountClosure<PTabletWriteSlaveResult>();
-    closure->ref();
-    closure->ref();
-    closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
-    closure->cntl.ignore_eovercrowded();
-    stub->request_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure);
-    static_cast<void>(request.release_rowset_meta());
-
-    closure->join();
-    if (closure->cntl.Failed()) {
+
+    auto pull_callback = DummyBrpcCallback<PTabletWriteSlaveResult>::create_shared();
+    auto closure = AutoReleaseClosure<
+            PTabletWriteSlaveRequest,
+            DummyBrpcCallback<PTabletWriteSlaveResult>>::create_unique(request, pull_callback);
+    closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
+    closure->cntl_->ignore_eovercrowded();
+    stub->request_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(),
+                                           closure->response_.get(), closure.release());
+
+    pull_callback->join();
+    if (pull_callback->cntl_->Failed()) {
         if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
                     stub, node_info.host(), node_info.async_internal_port())) {
             ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
-                    closure->cntl.remote_side());
+                    pull_callback->cntl_->remote_side());
         }
         LOG(WARNING) << "failed to send pull rowset request to slave replica, error="
-                     << berror(closure->cntl.ErrorCode())
-                     << ", error_text=" << closure->cntl.ErrorText()
+                     << berror(pull_callback->cntl_->ErrorCode())
+                     << ", error_text=" << pull_callback->cntl_->ErrorText()
                      << ". slave host: " << node_info.host() << ", tablet_id=" << _req.tablet_id
                      << ", txn_id=" << _req.txn_id;
         std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
         _unfinished_slave_node.erase(node_info.id());
     }
-
-    if (closure->unref()) {
-        delete closure;
-    }
-    closure = nullptr;
 }
 
 void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed) {
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index d2fbd2fd14e..d844a7fc820 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -54,4 +54,85 @@ private:
     std::atomic<int> _refs;
 };
 
+template <typename Response>
+class DummyBrpcCallback {
+    ENABLE_FACTORY_CREATOR(DummyBrpcCallback);
+
+public:
+    using ResponseType = Response;
+
+    // Both members are allocated by their default member initializers below.
+    DummyBrpcCallback() = default;
+
+    // No-op completion hook; AutoReleaseClosure::Run() invokes it when the rpc is done.
+    void call() {}
+
+    // Joins on the call id of this callback's controller.
+    void join() { brpc::Join(cntl_->call_id()); }
+
+    // The controller has to live as long as the closure, because brpc may use it
+    // at any stage of the rpc.
+    std::shared_ptr<brpc::Controller> cntl_ {std::make_shared<brpc::Controller>()};
+    // We do not know whether brpc still uses the response after the stub method
+    // returns, so it is held by a shared ptr that the closure can co-own.
+    std::shared_ptr<Response> response_ {std::make_shared<Response>()};
+};
+
+// A brpc done-closure that deletes itself at the end of Run().
+// It has to be heap allocated and handed to brpc as a raw pointer, e.g. created
+// with create_unique() and then release()d.
+// It co-owns the request, response and controller for the whole rpc, but holds
+// only a weak ptr to the callback and invokes Callback::call() from Run() if the
+// callback is still alive.
+// Callback must provide ResponseType, call(), cntl_ and response_; see
+// DummyBrpcCallback for the minimal shape.
+//
+// Usage:
+//  auto* done = AutoReleaseClosure<Req, Callback>::create_unique(req, cb).release();
+//  stub->rpc(done->cntl_.get(), done->request_.get(), done->response_.get(), done);
+//  // Do not touch `done` after this call: Run() may already have deleted it.
+
+template <typename Request, typename Callback>
+class AutoReleaseClosure : public google::protobuf::Closure {
+    using Weak = typename std::shared_ptr<Callback>::weak_type;
+    using ResponseType = typename Callback::ResponseType;
+    ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
+
+public:
+    // Keeps `req` alive and shares the controller and response owned by `callback`.
+    // request_ must be populated here because callers pass request_.get() to the stub.
+    AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback)
+            : cntl_(callback->cntl_),
+              request_(req),
+              response_(callback->response_),
+              callback_(callback) {}
+
+    ~AutoReleaseClosure() override = default;
+
+    // Runs the callback (if it still exists) and then deletes this closure.
+    void Run() override {
+        SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+        Defer defer {[&]() { delete this; }};
+        // If lock fails, the callback object has already been destroyed, so there
+        // is nothing left to notify.
+        if (auto tmp = callback_.lock()) {
+            tmp->call();
+        }
+    }
+
+    // The controller has to live as long as the closure, because brpc may use it
+    // at any stage of the rpc.
+    std::shared_ptr<brpc::Controller> cntl_;
+    // We do not know whether brpc still uses the request or response after the
+    // stub method returns, so shared ptrs are kept here to make sure both stay
+    // valid at any stage of the rpc.
+    std::shared_ptr<Request> request_;
+    std::shared_ptr<ResponseType> response_;
+
+private:
+    // A weak ptr is used so that the closure does not keep the callback alive once
+    // its owner has dropped it.
+    Weak callback_;
+};
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org