You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "zclllyybb (via GitHub)" <gi...@apache.org> on 2023/07/12 14:18:03 UTC

[GitHub] [doris] zclllyybb commented on a diff in pull request #21715: [imporve](bloomfilter) refactor runtime_filter_mgr with bloomfilter

zclllyybb commented on code in PR #21715:
URL: https://github.com/apache/doris/pull/21715#discussion_r1261218094


##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -208,7 +209,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options,
         const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
         const int producer_size) {
-    std::lock_guard<std::mutex> guard(_filter_map_mutex);
+    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);

Review Comment:
   这个可以只锁emplace



##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -312,34 +313,37 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
     std::shared_ptr<RuntimeFilterCntlVal> cntVal;
     int merged_size = 0;
     int64_t merge_time = 0;
+    int64_t start_merge = MonotonicMillis();
+    auto filter_id = request->filter_id();
+    std::map<int, CntlValwithLock>::iterator iter;
     {
-        int64_t start_merge = MonotonicMillis();
-        std::lock_guard<std::mutex> guard(_filter_map_mutex);
-        auto iter = _filter_map.find(std::to_string(request->filter_id()));
+        std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+        iter = _filter_map.find(filter_id);
         VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString();
         if (iter == _filter_map.end()) {
             return Status::InvalidArgument("unknown filter id {}",
                                            std::to_string(request->filter_id()));
         }
-        cntVal = iter->second;
-        if (auto bf = cntVal->filter->get_bloomfilter()) {
-            RETURN_IF_ERROR(bf->init_with_fixed_length());
-        }
+    }
+    // iter->second = pair{CntlVal,SpinLock}
+    cntVal = iter->second.first;
+    {
+        std::lock_guard<SpinLock> l(*iter->second.second);
         MergeRuntimeFilterParams params(request, attach_data);
-        ObjectPool* pool = iter->second->pool.get();
+        ObjectPool* pool = cntVal->pool.get();
         RuntimeFilterWrapperHolder holder;
         RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, &params, pool, holder.getHandle()));
         RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get()));

Review Comment:
   here the `holder` 



##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -208,7 +209,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options,
         const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
         const int producer_size) {
-    std::lock_guard<std::mutex> guard(_filter_map_mutex);
+    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);

Review Comment:
   这个可以只锁emplace



##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -219,18 +220,18 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     cntVal->filter =
             cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool));
 
-    std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
+    auto filter_id = runtime_filter_desc->filter_id;
     // LOG(INFO) << "entity filter id:" << filter_id;
     cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, -1, false);
-    _filter_map.emplace(filter_id, cntVal);
+    _filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<SpinLock>()});
     return Status::OK();
 }
 
 Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options,
         const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
         const int producer_size) {
-    std::lock_guard<std::mutex> guard(_filter_map_mutex);
+    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);

Review Comment:
   同上



##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -219,18 +220,18 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     cntVal->filter =
             cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool));
 
-    std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
+    auto filter_id = runtime_filter_desc->filter_id;
     // LOG(INFO) << "entity filter id:" << filter_id;
     cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, -1, false);
-    _filter_map.emplace(filter_id, cntVal);
+    _filter_map.emplace(filter_id, CntlValwithLock {cntVal, std::make_unique<SpinLock>()});
     return Status::OK();
 }
 
 Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options,
         const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
         const int producer_size) {
-    std::lock_guard<std::mutex> guard(_filter_map_mutex);
+    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);

Review Comment:
   同上



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org