You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/08 08:02:42 UTC

[doris] 01/03: (runtimefilter) shorter time prepare consumes (#13127)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 65247d03b081f1b2618efab18c6b262fc28850fe
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Thu Oct 6 10:12:29 2022 +0800

    (runtimefilter) shorter time prepare consumes (#13127)
    
    Now, every preare put a runtime filter controller, so it takes the
    mutex lock on the controller map. Init of bloom filter takes some
    time in allocate and memset. If we run p1 tests with -parallel=20
    -suiteParallel=20 -actionParallel=20, then we get error message like
    'send fragment timeout 5s'.
    
    The patch fixes the problem in the following 2 ways:
    1. Replace one mutex block with 128s.
    2. If a plan fragment does not have a runtime filter, it does not need to take
    the locks.
---
 be/src/runtime/runtime_filter_mgr.cpp | 32 ++++++++++++++++++++------------
 be/src/runtime/runtime_filter_mgr.h   | 15 +++++++++++----
 2 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index b5302aeace..ffea62a155 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -272,48 +272,56 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
 Status RuntimeFilterMergeController::add_entity(
         const TExecPlanFragmentParams& params,
         std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
+    if (!params.params.__isset.runtime_filter_params ||
+        params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
+        return Status::OK();
+    }
+
     runtime_filter_merge_entity_closer entity_closer =
             std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1);
 
-    std::lock_guard<std::mutex> guard(_controller_mutex);
     UniqueId query_id(params.params.query_id);
     std::string query_id_str = query_id.to_string();
-    auto iter = _filter_controller_map.find(query_id_str);
     UniqueId fragment_instance_id = UniqueId(params.params.fragment_instance_id);
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+    auto iter = _filter_controller_map[shard].find(query_id_str);
 
-    if (iter == _filter_controller_map.end()) {
+    if (iter == _filter_controller_map[shard].end()) {
         *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
                 new RuntimeFilterMergeControllerEntity(), entity_closer);
-        _filter_controller_map[query_id_str] = *handle;
+        _filter_controller_map[shard][query_id_str] = *handle;
         const TRuntimeFilterParams& filter_params = params.params.runtime_filter_params;
         if (params.params.__isset.runtime_filter_params) {
             RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, params.query_options));
         }
     } else {
-        *handle = _filter_controller_map[query_id_str].lock();
+        *handle = _filter_controller_map[shard][query_id_str].lock();
     }
     return Status::OK();
 }
 
 Status RuntimeFilterMergeController::acquire(
         UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
-    std::lock_guard<std::mutex> guard(_controller_mutex);
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
     std::string query_id_str = query_id.to_string();
-    auto iter = _filter_controller_map.find(query_id_str);
-    if (iter == _filter_controller_map.end()) {
+    auto iter = _filter_controller_map[shard].find(query_id_str);
+    if (iter == _filter_controller_map[shard].end()) {
         LOG(WARNING) << "not found entity, query-id:" << query_id_str;
         return Status::InvalidArgument("not found entity");
     }
-    *handle = _filter_controller_map[query_id_str].lock();
+    *handle = _filter_controller_map[shard][query_id_str].lock();
     if (*handle == nullptr) {
         return Status::InvalidArgument("entity is closed");
     }
     return Status::OK();
 }
 
-Status RuntimeFilterMergeController::remove_entity(UniqueId queryId) {
-    std::lock_guard<std::mutex> guard(_controller_mutex);
-    _filter_controller_map.erase(queryId.to_string());
+Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+    _filter_controller_map[shard].erase(query_id.to_string());
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 653ce675b2..346a3cbab8 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <mutex>
 #include <thread>
+#include <unordered_map>
 
 #include "common/object_pool.h"
 #include "common/status.h"
@@ -163,16 +164,22 @@ public:
     // thread safe
     // remove a entity by query-id
     // remove_entity will be called automatically by entity when entity is destroyed
-    Status remove_entity(UniqueId queryId);
+    Status remove_entity(UniqueId query_id);
+
+    static const int kShardNum = 128;
 
 private:
-    std::mutex _controller_mutex;
+    uint32_t _get_controller_shard_idx(UniqueId& query_id) {
+        return (uint32_t)query_id.hi % kShardNum;
+    }
+
+    std::mutex _controller_mutex[kShardNum];
     // We store the weak pointer here.
     // When the external object is destroyed, we need to clear this record
     using FilterControllerMap =
-            std::map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
+            std::unordered_map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
     // str(query-id) -> entity
-    FilterControllerMap _filter_controller_map;
+    FilterControllerMap _filter_controller_map[kShardNum];
 };
 
 using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMergeControllerEntity*)>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org