You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/04/28 00:46:49 UTC
[doris] branch master updated: [profile](rf) refactor profile of runtime filters (#19134)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 28016c53f0 [profile](rf) refactor profile of runtime filters (#19134)
28016c53f0 is described below
commit 28016c53f0c635e74ca7ae081a65b170587ed055
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Apr 28 08:46:42 2023 +0800
[profile](rf) refactor profile of runtime filters (#19134)
* [profile](rf) refactor profile of runtime filters
---------
Co-authored-by: github-actions[bot] <41...@users.noreply.github.com>
---
be/src/exprs/runtime_filter.cpp | 19 ++++++++-----------
be/src/exprs/runtime_filter.h | 9 +++++----
be/src/runtime/fragment_mgr.cpp | 3 ++-
be/src/vec/exec/scan/vscan_node.cpp | 3 +++
4 files changed, 18 insertions(+), 16 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2a9d7cc24f..e18b49b765 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1210,7 +1210,6 @@ bool IRuntimeFilter::await() {
return false;
}
} else {
- SCOPED_TIMER(_await_time_cost);
std::unique_lock lock(_inner_mutex);
if (_rf_state != RuntimeFilterState::READY) {
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
@@ -1485,23 +1484,21 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, Obje
void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
if (_profile_init) {
+ parent_profile->add_child(_profile.get(), true, nullptr);
return;
}
{
- std::lock_guard guard(_inner_mutex);
+ std::lock_guard guard(_profile_mutex);
if (_profile_init) {
return;
}
DCHECK(parent_profile != nullptr);
- _profile.reset(
- new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
- ::doris::to_string(_runtime_filter_type))));
+ _name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
+ ::doris::to_string(_runtime_filter_type));
+ _profile.reset(new RuntimeProfile(_name));
_profile_init = true;
}
parent_profile->add_child(_profile.get(), true, nullptr);
- if (!_enable_pipeline_exec) {
- _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
- }
_profile->add_info_string("Info", _format_status());
if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
update_runtime_filter_type_to_profile();
@@ -1838,8 +1835,8 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
return Status::OK();
}
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) {
- int64_t start_update = MonotonicMillis();
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
+ int64_t start_apply) {
if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) {
set_ignored();
const PInFilter in_filter = param->request->in_filter();
@@ -1858,7 +1855,7 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) {
_profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms");
_profile->add_info_string("UpdateTime",
- std::to_string(MonotonicMillis() - start_update) + " ms");
+ std::to_string(MonotonicMillis() - start_apply) + " ms");
return Status::OK();
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 52191b5182..b1dbbfa4b2 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -281,7 +281,7 @@ public:
void change_to_bloom_filter();
Status init_bloom_filter(const size_t build_bf_cardinality);
Status update_filter(const UpdateRuntimeFilterParams* param);
- Status update_filter(const UpdateRuntimeFilterParamsV2* param);
+ Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply);
void set_ignored() { _is_ignored = true; }
@@ -299,6 +299,8 @@ public:
void init_profile(RuntimeProfile* parent_profile);
+ std::string& get_name() { return _name; }
+
void update_runtime_filter_type_to_profile();
void ready_for_publish();
@@ -394,8 +396,6 @@ protected:
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
- // unix millis
- RuntimeProfile::Counter* _await_time_cost = nullptr;
/// Time in ms (from MonotonicMillis()), that the filter was registered.
const int64_t registration_time_;
@@ -403,7 +403,8 @@ protected:
const bool _enable_pipeline_exec;
bool _profile_init = false;
-
+ doris::Mutex _profile_mutex;
+ std::string _name;
bool _opt_remote_rf;
};
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e2a8802fd9..6fe7079d7f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1160,6 +1160,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+ int64_t start_apply = MonotonicMillis();
const auto& fragment_instance_ids = request->fragment_instance_ids();
if (fragment_instance_ids.size() > 0) {
@@ -1205,7 +1206,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
int filter_id = request->filter_id();
IRuntimeFilter* real_filter = nullptr;
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id, &real_filter));
- RETURN_IF_ERROR(real_filter->update_filter(&params));
+ RETURN_IF_ERROR(real_filter->update_filter(&params, start_apply));
}
return Status::OK();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 5482df2e20..f926ab15c6 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -119,9 +119,12 @@ Status VScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
// init profile for runtime filter
+ std::stringstream ss;
for (auto& rf_ctx : _runtime_filter_ctxs) {
rf_ctx.runtime_filter->init_profile(_runtime_profile.get());
+ ss << rf_ctx.runtime_filter->get_name() << ", ";
}
+ _runtime_profile->add_info_string("RuntimeFilters: ", ss.str());
if (_is_pipeline_scan) {
if (_shared_scan_opt) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org