You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/13 01:10:09 UTC

[doris] branch master updated: [Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d430aec3ae [Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306)
d430aec3ae is described below

commit d430aec3aeded447823b155756282b258852777b
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Oct 13 09:10:02 2022 +0800

    [Bug](bloomfilter) fix concurrency bug caused by bloom filter (#13306)
---
 be/src/runtime/fragment_mgr.cpp | 58 +++++++++++++++++++++++------------------
 1 file changed, 32 insertions(+), 26 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0471b63d71..b18e80df3e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -670,29 +670,32 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     _runtimefilter_controller.add_entity(params, &handler);
     exec_state->set_merge_controller_handler(handler);
 
-    auto& runtime_filter_params = params.params.runtime_filter_params;
-    if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
-        _bf_size_map.insert({fragments_ctx->query_id, {}});
-    }
-    for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
-        int filter_id = filterid_to_desc.first;
-        const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
-        if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
-            continue;
-        }
-        const auto& build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id);
-        if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
-            continue;
-        }
-        if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
-            _bf_size_map[fragments_ctx->query_id].insert(
-                    {filter_id, filterid_to_desc.second.bloom_filter_size_bytes});
-        }
-    }
-
     RETURN_IF_ERROR(exec_state->prepare(params));
     {
         std::lock_guard<std::mutex> lock(_lock);
+        auto& runtime_filter_params = params.params.runtime_filter_params;
+        if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
+            auto bf_size_for_cur_query = _bf_size_map.find(fragments_ctx->query_id);
+            if (bf_size_for_cur_query == _bf_size_map.end()) {
+                _bf_size_map.insert({fragments_ctx->query_id, {}});
+            }
+            for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
+                int filter_id = filterid_to_desc.first;
+                const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
+                if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
+                    continue;
+                }
+                const auto& build_iter =
+                        runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+                if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
+                    continue;
+                }
+                if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
+                    _bf_size_map[fragments_ctx->query_id].insert(
+                            {filter_id, filterid_to_desc.second.bloom_filter_size_bytes});
+                }
+            }
+        }
         _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
         _cv.notify_all();
     }
@@ -979,12 +982,15 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char*
     UniqueId queryid = request->query_id();
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
-    auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
-    if (bf_size_for_cur_query != _bf_size_map.end()) {
-        for (auto& iter : bf_size_for_cur_query->second) {
-            auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
-            DCHECK(bf != nullptr);
-            bf->init_with_fixed_length(iter.second);
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
+        if (bf_size_for_cur_query != _bf_size_map.end()) {
+            for (auto& iter : bf_size_for_cur_query->second) {
+                auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
+                DCHECK(bf != nullptr);
+                bf->init_with_fixed_length(iter.second);
+            }
         }
     }
     RETURN_IF_ERROR(filter_controller->merge(request, attach_data));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org