You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/04/28 00:46:49 UTC

[doris] branch master updated: [profile](rf) refactor profile of runtime filters (#19134)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 28016c53f0 [profile](rf) refactor profile of runtime filters (#19134)
28016c53f0 is described below

commit 28016c53f0c635e74ca7ae081a65b170587ed055
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Apr 28 08:46:42 2023 +0800

    [profile](rf) refactor profile of runtime filters (#19134)
    
    * [profile](rf) refactor profile of runtime filters
    
    
    ---------
    
    Co-authored-by: github-actions[bot] <41...@users.noreply.github.com>
---
 be/src/exprs/runtime_filter.cpp     | 19 ++++++++-----------
 be/src/exprs/runtime_filter.h       |  9 +++++----
 be/src/runtime/fragment_mgr.cpp     |  3 ++-
 be/src/vec/exec/scan/vscan_node.cpp |  3 +++
 4 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2a9d7cc24f..e18b49b765 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1210,7 +1210,6 @@ bool IRuntimeFilter::await() {
             return false;
         }
     } else {
-        SCOPED_TIMER(_await_time_cost);
         std::unique_lock lock(_inner_mutex);
         if (_rf_state != RuntimeFilterState::READY) {
             int64_t ms_since_registration = MonotonicMillis() - registration_time_;
@@ -1485,23 +1484,21 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, Obje
 
 void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
     if (_profile_init) {
+        parent_profile->add_child(_profile.get(), true, nullptr);
         return;
     }
     {
-        std::lock_guard guard(_inner_mutex);
+        std::lock_guard guard(_profile_mutex);
         if (_profile_init) {
             return;
         }
         DCHECK(parent_profile != nullptr);
-        _profile.reset(
-                new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
-                                               ::doris::to_string(_runtime_filter_type))));
+        _name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
+                            ::doris::to_string(_runtime_filter_type));
+        _profile.reset(new RuntimeProfile(_name));
         _profile_init = true;
     }
     parent_profile->add_child(_profile.get(), true, nullptr);
-    if (!_enable_pipeline_exec) {
-        _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
-    }
     _profile->add_info_string("Info", _format_status());
     if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
         update_runtime_filter_type_to_profile();
@@ -1838,8 +1835,8 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
     return Status::OK();
 }
 
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) {
-    int64_t start_update = MonotonicMillis();
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
+                                     int64_t start_apply) {
     if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) {
         set_ignored();
         const PInFilter in_filter = param->request->in_filter();
@@ -1858,7 +1855,7 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) {
 
     _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms");
     _profile->add_info_string("UpdateTime",
-                              std::to_string(MonotonicMillis() - start_update) + " ms");
+                              std::to_string(MonotonicMillis() - start_apply) + " ms");
     return Status::OK();
 }
 
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 52191b5182..b1dbbfa4b2 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -281,7 +281,7 @@ public:
     void change_to_bloom_filter();
     Status init_bloom_filter(const size_t build_bf_cardinality);
     Status update_filter(const UpdateRuntimeFilterParams* param);
-    Status update_filter(const UpdateRuntimeFilterParamsV2* param);
+    Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply);
 
     void set_ignored() { _is_ignored = true; }
 
@@ -299,6 +299,8 @@ public:
 
     void init_profile(RuntimeProfile* parent_profile);
 
+    std::string& get_name() { return _name; }
+
     void update_runtime_filter_type_to_profile();
 
     void ready_for_publish();
@@ -394,8 +396,6 @@ protected:
     // parent profile
     // only effect on consumer
     std::unique_ptr<RuntimeProfile> _profile;
-    // unix millis
-    RuntimeProfile::Counter* _await_time_cost = nullptr;
 
     /// Time in ms (from MonotonicMillis()), that the filter was registered.
     const int64_t registration_time_;
@@ -403,7 +403,8 @@ protected:
     const bool _enable_pipeline_exec;
 
     bool _profile_init = false;
-
+    doris::Mutex _profile_mutex;
+    std::string _name;
     bool _opt_remote_rf;
 };
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e2a8802fd9..6fe7079d7f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1160,6 +1160,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
 Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
                                    butil::IOBufAsZeroCopyInputStream* attach_data) {
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+    int64_t start_apply = MonotonicMillis();
 
     const auto& fragment_instance_ids = request->fragment_instance_ids();
     if (fragment_instance_ids.size() > 0) {
@@ -1205,7 +1206,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
         int filter_id = request->filter_id();
         IRuntimeFilter* real_filter = nullptr;
         RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id, &real_filter));
-        RETURN_IF_ERROR(real_filter->update_filter(&params));
+        RETURN_IF_ERROR(real_filter->update_filter(&params, start_apply));
     }
 
     return Status::OK();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 5482df2e20..f926ab15c6 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -119,9 +119,12 @@ Status VScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
 
     // init profile for runtime filter
+    std::stringstream ss;
     for (auto& rf_ctx : _runtime_filter_ctxs) {
         rf_ctx.runtime_filter->init_profile(_runtime_profile.get());
+        ss << rf_ctx.runtime_filter->get_name() << ", ";
     }
+    _runtime_profile->add_info_string("RuntimeFilters: ", ss.str());
 
     if (_is_pipeline_scan) {
         if (_shared_scan_opt) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org