You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/13 01:10:09 UTC
[doris] branch master updated: [Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d430aec3ae [Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306)
d430aec3ae is described below
commit d430aec3aeded447823b155756282b258852777b
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Oct 13 09:10:02 2022 +0800
[Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306)
---
be/src/runtime/fragment_mgr.cpp | 58 +++++++++++++++++++++++------------------
1 file changed, 32 insertions(+), 26 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0471b63d71..b18e80df3e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -670,29 +670,32 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
_runtimefilter_controller.add_entity(params, &handler);
exec_state->set_merge_controller_handler(handler);
- auto& runtime_filter_params = params.params.runtime_filter_params;
- if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
- _bf_size_map.insert({fragments_ctx->query_id, {}});
- }
- for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
- int filter_id = filterid_to_desc.first;
- const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
- if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
- continue;
- }
- const auto& build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id);
- if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
- continue;
- }
- if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
- _bf_size_map[fragments_ctx->query_id].insert(
- {filter_id, filterid_to_desc.second.bloom_filter_size_bytes});
- }
- }
-
RETURN_IF_ERROR(exec_state->prepare(params));
{
std::lock_guard<std::mutex> lock(_lock);
+ auto& runtime_filter_params = params.params.runtime_filter_params;
+ if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
+ auto bf_size_for_cur_query = _bf_size_map.find(fragments_ctx->query_id);
+ if (bf_size_for_cur_query == _bf_size_map.end()) {
+ _bf_size_map.insert({fragments_ctx->query_id, {}});
+ }
+ for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
+ int filter_id = filterid_to_desc.first;
+ const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
+ if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
+ continue;
+ }
+ const auto& build_iter =
+ runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+ if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
+ continue;
+ }
+ if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
+ _bf_size_map[fragments_ctx->query_id].insert(
+ {filter_id, filterid_to_desc.second.bloom_filter_size_bytes});
+ }
+ }
+ }
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
_cv.notify_all();
}
@@ -979,12 +982,15 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char*
UniqueId queryid = request->query_id();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
- auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
- if (bf_size_for_cur_query != _bf_size_map.end()) {
- for (auto& iter : bf_size_for_cur_query->second) {
- auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
- DCHECK(bf != nullptr);
- bf->init_with_fixed_length(iter.second);
+ {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
+ if (bf_size_for_cur_query != _bf_size_map.end()) {
+ for (auto& iter : bf_size_for_cur_query->second) {
+ auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
+ DCHECK(bf != nullptr);
+ bf->init_with_fixed_length(iter.second);
+ }
}
}
RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org