You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/08 08:02:42 UTC
[doris] 01/03: (runtimefilter) shorter time prepare consumes (#13127)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 65247d03b081f1b2618efab18c6b262fc28850fe
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Thu Oct 6 10:12:29 2022 +0800
(runtimefilter) shorter time prepare consumes (#13127)
Now, every prepare puts a runtime filter controller, so it takes the
mutex lock on the controller map. Initializing a bloom filter takes some
time for allocation and memset. If we run p1 tests with -parallel=20
-suiteParallel=20 -actionParallel=20, then we get error messages like
'send fragment timeout 5s'.
The patch fixes the problem in the following 2 ways:
1. Replace the single mutex with 128 sharded mutexes, selected by query id.
2. If a plan fragment does not have a runtime filter, it does not need to take
the locks.
---
be/src/runtime/runtime_filter_mgr.cpp | 32 ++++++++++++++++++++------------
be/src/runtime/runtime_filter_mgr.h | 15 +++++++++++----
2 files changed, 31 insertions(+), 16 deletions(-)
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index b5302aeace..ffea62a155 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -272,48 +272,56 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
Status RuntimeFilterMergeController::add_entity(
const TExecPlanFragmentParams& params,
std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
+ if (!params.params.__isset.runtime_filter_params ||
+ params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
+ return Status::OK();
+ }
+
runtime_filter_merge_entity_closer entity_closer =
std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1);
- std::lock_guard<std::mutex> guard(_controller_mutex);
UniqueId query_id(params.params.query_id);
std::string query_id_str = query_id.to_string();
- auto iter = _filter_controller_map.find(query_id_str);
UniqueId fragment_instance_id = UniqueId(params.params.fragment_instance_id);
+ uint32_t shard = _get_controller_shard_idx(query_id);
+ std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+ auto iter = _filter_controller_map[shard].find(query_id_str);
- if (iter == _filter_controller_map.end()) {
+ if (iter == _filter_controller_map[shard].end()) {
*handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
new RuntimeFilterMergeControllerEntity(), entity_closer);
- _filter_controller_map[query_id_str] = *handle;
+ _filter_controller_map[shard][query_id_str] = *handle;
const TRuntimeFilterParams& filter_params = params.params.runtime_filter_params;
if (params.params.__isset.runtime_filter_params) {
RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, params.query_options));
}
} else {
- *handle = _filter_controller_map[query_id_str].lock();
+ *handle = _filter_controller_map[shard][query_id_str].lock();
}
return Status::OK();
}
Status RuntimeFilterMergeController::acquire(
UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
- std::lock_guard<std::mutex> guard(_controller_mutex);
+ uint32_t shard = _get_controller_shard_idx(query_id);
+ std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
std::string query_id_str = query_id.to_string();
- auto iter = _filter_controller_map.find(query_id_str);
- if (iter == _filter_controller_map.end()) {
+ auto iter = _filter_controller_map[shard].find(query_id_str);
+ if (iter == _filter_controller_map[shard].end()) {
LOG(WARNING) << "not found entity, query-id:" << query_id_str;
return Status::InvalidArgument("not found entity");
}
- *handle = _filter_controller_map[query_id_str].lock();
+ *handle = _filter_controller_map[shard][query_id_str].lock();
if (*handle == nullptr) {
return Status::InvalidArgument("entity is closed");
}
return Status::OK();
}
-Status RuntimeFilterMergeController::remove_entity(UniqueId queryId) {
- std::lock_guard<std::mutex> guard(_controller_mutex);
- _filter_controller_map.erase(queryId.to_string());
+Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
+ uint32_t shard = _get_controller_shard_idx(query_id);
+ std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+ _filter_controller_map[shard].erase(query_id.to_string());
return Status::OK();
}
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 653ce675b2..346a3cbab8 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -22,6 +22,7 @@
#include <memory>
#include <mutex>
#include <thread>
+#include <unordered_map>
#include "common/object_pool.h"
#include "common/status.h"
@@ -163,16 +164,22 @@ public:
// thread safe
// remove a entity by query-id
// remove_entity will be called automatically by entity when entity is destroyed
- Status remove_entity(UniqueId queryId);
+ Status remove_entity(UniqueId query_id);
+
+ static const int kShardNum = 128;
private:
- std::mutex _controller_mutex;
+ uint32_t _get_controller_shard_idx(UniqueId& query_id) {
+ return (uint32_t)query_id.hi % kShardNum;
+ }
+
+ std::mutex _controller_mutex[kShardNum];
// We store the weak pointer here.
// When the external object is destroyed, we need to clear this record
using FilterControllerMap =
- std::map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
+ std::unordered_map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
// str(query-id) -> entity
- FilterControllerMap _filter_controller_map;
+ FilterControllerMap _filter_controller_map[kShardNum];
};
using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMergeControllerEntity*)>;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org