You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/11/09 00:50:49 UTC
(doris) branch master updated: [enhancement](brpc) add a auto release closure to ensure the closue safety (#26567)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 66e591f7f25 [enhancement](brpc) add a auto release closure to ensure the closue safety (#26567)
66e591f7f25 is described below
commit 66e591f7f25464b51f43ab4c57f9889b6afb44eb
Author: yiguolei <67...@qq.com>
AuthorDate: Thu Nov 9 08:50:42 2023 +0800
[enhancement](brpc) add a auto release closure to ensure the closue safety (#26567)
---
be/src/olap/delta_writer.cpp | 56 +++++++++++++---------------
be/src/util/ref_count_closure.h | 81 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 107 insertions(+), 30 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 23e1718cb7d..3e91e5efb06 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -243,21 +243,20 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
}
}
- PTabletWriteSlaveRequest request;
- RowsetMetaPB rowset_meta_pb = cur_rowset->rowset_meta()->get_rowset_pb();
- request.set_allocated_rowset_meta(&rowset_meta_pb);
- request.set_host(BackendOptions::get_localhost());
- request.set_http_port(config::webserver_port);
+ auto request = std::make_shared<PTabletWriteSlaveRequest>();
+ *(request->mutable_rowset_meta()) = cur_rowset->rowset_meta()->get_rowset_pb();
+ request->set_host(BackendOptions::get_localhost());
+ request->set_http_port(config::webserver_port);
string tablet_path = _rowset_builder.tablet()->tablet_path();
- request.set_rowset_path(tablet_path);
- request.set_token(ExecEnv::GetInstance()->token());
- request.set_brpc_port(config::brpc_port);
- request.set_node_id(node_info.id());
+ request->set_rowset_path(tablet_path);
+ request->set_token(ExecEnv::GetInstance()->token());
+ request->set_brpc_port(config::brpc_port);
+ request->set_node_id(node_info.id());
for (int segment_id = 0; segment_id < cur_rowset->rowset_meta()->num_segments(); segment_id++) {
std::stringstream segment_name;
segment_name << cur_rowset->rowset_id() << "_" << segment_id << ".dat";
int64_t segment_size = std::filesystem::file_size(tablet_path + "/" + segment_name.str());
- request.mutable_segments_size()->insert({segment_id, segment_size});
+ request->mutable_segments_size()->insert({segment_id, segment_size});
if (!indices_ids.empty()) {
for (auto index_id : indices_ids) {
@@ -269,41 +268,38 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
index_size.set_size(size);
// Fetch the map value for the current segment_id.
// If it doesn't exist, this will insert a new default-constructed IndexSizeMapValue
- auto& index_size_map_value = (*request.mutable_inverted_indices_size())[segment_id];
+ auto& index_size_map_value =
+ (*(request->mutable_inverted_indices_size()))[segment_id];
// Add the new index size to the map value.
*index_size_map_value.mutable_index_sizes()->Add() = std::move(index_size);
}
}
}
- RefCountClosure<PTabletWriteSlaveResult>* closure =
- new RefCountClosure<PTabletWriteSlaveResult>();
- closure->ref();
- closure->ref();
- closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
- closure->cntl.ignore_eovercrowded();
- stub->request_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure);
- static_cast<void>(request.release_rowset_meta());
-
- closure->join();
- if (closure->cntl.Failed()) {
+
+ auto pull_callback = DummyBrpcCallback<PTabletWriteSlaveResult>::create_shared();
+ auto closure = AutoReleaseClosure<
+ PTabletWriteSlaveRequest,
+ DummyBrpcCallback<PTabletWriteSlaveResult>>::create_unique(request, pull_callback);
+ closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
+ closure->cntl_->ignore_eovercrowded();
+ stub->request_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(),
+ closure->response_.get(), closure.release());
+
+ pull_callback->join();
+ if (pull_callback->cntl_->Failed()) {
if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
stub, node_info.host(), node_info.async_internal_port())) {
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
- closure->cntl.remote_side());
+ pull_callback->cntl_->remote_side());
}
LOG(WARNING) << "failed to send pull rowset request to slave replica, error="
- << berror(closure->cntl.ErrorCode())
- << ", error_text=" << closure->cntl.ErrorText()
+ << berror(pull_callback->cntl_->ErrorCode())
+ << ", error_text=" << pull_callback->cntl_->ErrorText()
<< ". slave host: " << node_info.host() << ", tablet_id=" << _req.tablet_id
<< ", txn_id=" << _req.txn_id;
std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
_unfinished_slave_node.erase(node_info.id());
}
-
- if (closure->unref()) {
- delete closure;
- }
- closure = nullptr;
}
void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed) {
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index d2fbd2fd14e..d844a7fc820 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -54,4 +54,85 @@ private:
std::atomic<int> _refs;
};
+template <typename Response>
+class DummyBrpcCallback {
+ ENABLE_FACTORY_CREATOR(DummyBrpcCallback);
+
+public:
+ using ResponseType = Response;
+ DummyBrpcCallback() {
+ cntl_ = std::make_shared<brpc::Controller>();
+ response_ = std::make_shared<Response>();
+ }
+
+ void call() {}
+
+ void join() { brpc::Join(cntl_->call_id()); }
+
+ // controller has to be the same lifecycle with the closure, because brpc may use
+ // it in any stage of the rpc.
+ std::shared_ptr<brpc::Controller> cntl_;
+ // We do not know if brpc will use request or response after brpc method returns.
+ // So that we need keep a shared ptr here to ensure that brpc could use req/rep
+ // at any stage.
+ std::shared_ptr<Response> response_;
+};
+
+// The closure will be deleted after callback.
+// It could only be created by using shared ptr or unique ptr.
+// It will hold a weak ptr of T and call run of T
+// Callback() {
+// xxxx;
+// public
+// void run() {
+// logxxx
+// }
+// }
+//
+// std::shared_ptr<Callback> b;
+//
+// std::unique_ptr<AutoReleaseClosure> a(b);
+// brpc_call(a.release());
+
+template <typename Request, typename Callback>
+class AutoReleaseClosure : public google::protobuf::Closure {
+ using Weak = typename std::shared_ptr<Callback>::weak_type;
+ using ResponseType = typename Callback::ResponseType;
+ ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
+
+public:
+ AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback)
+ : callback_(callback) {
+ this->cntl_ = callback->cntl_;
+ this->response_ = callback->response_;
+ }
+
+ ~AutoReleaseClosure() override = default;
+
+ // Will delete itself
+ void Run() override {
+ SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+ Defer defer {[&]() { delete this; }};
+ // If lock failed, it means the callback object is deconstructed, then no need
+ // to deal with the callback any more.
+ if (auto tmp = callback_.lock()) {
+ tmp->call();
+ }
+ }
+
+ // controller has to be the same lifecycle with the closure, because brpc may use
+ // it in any stage of the rpc.
+ std::shared_ptr<brpc::Controller> cntl_;
+ // We do not know if brpc will use request or response after brpc method returns.
+ // So that we need keep a shared ptr here to ensure that brpc could use req/rep
+ // at any stage.
+ std::shared_ptr<Request> request_;
+ std::shared_ptr<ResponseType> response_;
+
+private:
+ // Use a weak ptr to keep the callback, so that the callback can be deleted if the main
+ // thread is freed.
+ Weak callback_;
+};
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org