You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/10 06:19:40 UTC
[doris] branch master updated: [Improvement](runtimefilter) DO NOT allocate memory for bbf in prepare phase (#13207)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a8535e91af [Improvement](runtimefilter) DO NOT allocate memory for bbf in prepare phase (#13207)
a8535e91af is described below
commit a8535e91af5b7ee69e8a2f283b5d82852175e5d1
Author: Gabriel <ga...@gmail.com>
AuthorDate: Mon Oct 10 14:19:33 2022 +0800
[Improvement](runtimefilter) DO NOT allocate memory for bbf in prepare phase (#13207)
---
be/src/exprs/bloomfilter_predicate.h | 13 +++++++++++--
be/src/exprs/runtime_filter.cpp | 31 +++++++++++++++++++++++++------
be/src/exprs/runtime_filter.h | 5 ++++-
be/src/runtime/fragment_mgr.cpp | 33 +++++++++++++++++++++++++++++++++
be/src/runtime/fragment_mgr.h | 1 +
be/src/runtime/runtime_filter_mgr.h | 16 ++++++++++------
6 files changed, 84 insertions(+), 15 deletions(-)
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index 2540dc657d..ddc9d8a884 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -101,7 +101,13 @@ public:
}
Status init_with_fixed_length(int64_t bloom_filter_length) {
- DCHECK(!_inited);
+ if (_inited) {
+ return Status::OK();
+ }
+ std::lock_guard<std::mutex> l(_lock);
+ if (_inited) {
+ return Status::OK();
+ }
DCHECK(bloom_filter_length >= 0);
DCHECK_EQ((bloom_filter_length & (bloom_filter_length - 1)), 0);
_bloom_filter_alloced = bloom_filter_length;
@@ -117,7 +123,9 @@ public:
_bloom_filter.reset(BloomFilterAdaptor::create());
}
if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
- LOG(WARNING) << "bloom filter size not the same";
+ LOG(WARNING) << "bloom filter size not the same: already allocated bytes = "
+ << _bloom_filter_alloced
+ << ", expected allocated bytes = " << other_func->_bloom_filter_alloced;
return Status::InvalidArgument("bloom filter size invalid");
}
return _bloom_filter->merge(other_func->_bloom_filter.get());
@@ -166,6 +174,7 @@ protected:
int32_t _bloom_filter_alloced;
std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
bool _inited;
+ std::mutex _lock;
};
template <class T>
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1311f2b928..24e8e41a84 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -418,7 +418,7 @@ public:
_filter_id(filter_id) {}
// init runtime filter wrapper
// alloc memory to init runtime filter function
- Status init(const RuntimeFilterParams* params) {
+ Status init(const RuntimeFilterParams* params, bool init_bloom_filter) {
_max_in_num = params->max_in_num;
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
@@ -432,12 +432,20 @@ public:
case RuntimeFilterType::BLOOM_FILTER: {
_is_bloomfilter = true;
_bloomfilter_func.reset(create_bloom_filter(_column_return_type));
- return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+ if (init_bloom_filter) {
+ return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+ } else {
+ return Status::OK();
+ }
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
_hybrid_set.reset(create_set(_column_return_type));
_bloomfilter_func.reset(create_bloom_filter(_column_return_type));
- return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+ if (init_bloom_filter) {
+ return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+ } else {
+ return Status::OK();
+ }
}
default:
return Status::InvalidArgument("Unknown Filter type");
@@ -461,6 +469,12 @@ public:
}
}
+ BloomFilterFuncBase* get_bloomfilter() const {
+ DCHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER ||
+ _filter_type == RuntimeFilterType::BLOOM_FILTER);
+ return _bloomfilter_func.get();
+ }
+
void insert(const void* data) {
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
@@ -1059,7 +1073,7 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt
*res = pool->add(new IRuntimeFilter(state, pool));
(*res)->set_role(role);
UniqueId fragment_instance_id(state->fragment_instance_id());
- return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id);
+ return (*res)->init_with_desc(desc, query_options, fragment_instance_id, true, node_id);
}
void IRuntimeFilter::insert(const void* data) {
@@ -1186,8 +1200,13 @@ void IRuntimeFilter::signal() {
_effect_timer.reset();
}
+BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
+ return _wrapper->get_bloomfilter();
+}
+
Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
- UniqueId fragment_instance_id, int node_id) {
+ UniqueId fragment_instance_id, bool init_bloom_filter,
+ int node_id) {
// if node_id == -1 , it shouldn't be a consumer
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
@@ -1235,7 +1254,7 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
}
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
- return _wrapper->init(&params);
+ return _wrapper->init(&params, init_bloom_filter);
}
Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 60162b98f0..6756e2e70f 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -43,6 +43,7 @@ class PInFilter;
class PMinMaxFilter;
class HashJoinNode;
class RuntimeProfile;
+class BloomFilterFuncBase;
namespace vectorized {
class VExpr;
@@ -190,7 +191,9 @@ public:
// init filter with desc
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
- UniqueId fragment_id = UniqueId(0, 0), int node_id = -1);
+ UniqueId fragment_id, bool init_bloom_filter = false, int node_id = -1);
+
+ BloomFilterFuncBase* get_bloomfilter() const;
// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7b5c3af432..0471b63d71 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -27,6 +27,7 @@
#include "common/object_pool.h"
#include "common/resource_tls.h"
#include "common/signal_handler.h"
+#include "exprs/bloomfilter_predicate.h"
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService.h"
@@ -425,6 +426,7 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env),
_fragment_map(),
_fragments_ctx_map(),
+ _bf_size_map(),
_stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
@@ -467,6 +469,7 @@ FragmentMgr::~FragmentMgr() {
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.clear();
_fragments_ctx_map.clear();
+ _bf_size_map.clear();
}
}
@@ -508,6 +511,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
_fragment_map.erase(exec_state->fragment_instance_id());
if (all_done && fragments_ctx) {
_fragments_ctx_map.erase(fragments_ctx->query_id);
+ _bf_size_map.erase(fragments_ctx->query_id);
}
}
@@ -666,6 +670,26 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
_runtimefilter_controller.add_entity(params, &handler);
exec_state->set_merge_controller_handler(handler);
+ auto& runtime_filter_params = params.params.runtime_filter_params;
+ if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
+ _bf_size_map.insert({fragments_ctx->query_id, {}});
+ }
+ for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
+ int filter_id = filterid_to_desc.first;
+ const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
+ if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
+ continue;
+ }
+ const auto& build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+ if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
+ continue;
+ }
+ if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
+ _bf_size_map[fragments_ctx->query_id].insert(
+ {filter_id, filterid_to_desc.second.bloom_filter_size_bytes});
+ }
+ }
+
RETURN_IF_ERROR(exec_state->prepare(params));
{
std::lock_guard<std::mutex> lock(_lock);
@@ -683,6 +707,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(params.params.fragment_instance_id);
+ _bf_size_map.erase(fragments_ctx->query_id);
}
exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"push plan fragment to thread pool failed");
@@ -954,6 +979,14 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char*
UniqueId queryid = request->query_id();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
+ auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
+ if (bf_size_for_cur_query != _bf_size_map.end()) {
+ for (auto& iter : bf_size_for_cur_query->second) {
+ auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
+ DCHECK(bf != nullptr);
+ bf->init_with_fixed_length(iter.second);
+ }
+ }
RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
return Status::OK();
}
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 66b06540d4..fd54afacd2 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -112,6 +112,7 @@ private:
std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;
// query id -> QueryFragmentsCtx
std::unordered_map<TUniqueId, std::shared_ptr<QueryFragmentsCtx>> _fragments_ctx_map;
+ std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _cancel_thread;
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 1dbb75471e..0db00f2331 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -117,12 +117,6 @@ public:
UniqueId query_id() { return _query_id; }
-private:
- Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
- const TQueryOptions* query_options,
- const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
- const int producer_size);
-
struct RuntimeFilterCntlVal {
int64_t create_time;
int producer_size;
@@ -132,6 +126,16 @@ private:
std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
std::shared_ptr<ObjectPool> pool;
};
+
+public:
+ RuntimeFilterCntlVal* get_filter(int id) { return _filter_map[std::to_string(id)].get(); }
+
+private:
+ Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
+ const TQueryOptions* query_options,
+ const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
+ const int producer_size);
+
UniqueId _query_id;
UniqueId _fragment_instance_id;
// protect _filter_map
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org