You are viewing a plain-text version of this content; the canonical, hyperlinked version is available in the mailing-list archive.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/10 06:19:40 UTC

[doris] branch master updated: [Improvement](runtimefilter) DO NOT allocate memory for bbf in prepare phase (#13207)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a8535e91af [Improvement](runtimefilter) DO NOT allocate memory for bbf in prepare phase (#13207)
a8535e91af is described below

commit a8535e91af5b7ee69e8a2f283b5d82852175e5d1
Author: Gabriel <ga...@gmail.com>
AuthorDate: Mon Oct 10 14:19:33 2022 +0800

    [Improvement](runtimefilter) DO NOT allocate memory for bbf in prepare phase (#13207)
---
 be/src/exprs/bloomfilter_predicate.h | 13 +++++++++++--
 be/src/exprs/runtime_filter.cpp      | 31 +++++++++++++++++++++++++------
 be/src/exprs/runtime_filter.h        |  5 ++++-
 be/src/runtime/fragment_mgr.cpp      | 33 +++++++++++++++++++++++++++++++++
 be/src/runtime/fragment_mgr.h        |  1 +
 be/src/runtime/runtime_filter_mgr.h  | 16 ++++++++++------
 6 files changed, 84 insertions(+), 15 deletions(-)

diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index 2540dc657d..ddc9d8a884 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -101,7 +101,13 @@ public:
     }
 
     Status init_with_fixed_length(int64_t bloom_filter_length) {
-        DCHECK(!_inited);
+        if (_inited) {
+            return Status::OK();
+        }
+        std::lock_guard<std::mutex> l(_lock);
+        if (_inited) {
+            return Status::OK();
+        }
         DCHECK(bloom_filter_length >= 0);
         DCHECK_EQ((bloom_filter_length & (bloom_filter_length - 1)), 0);
         _bloom_filter_alloced = bloom_filter_length;
@@ -117,7 +123,9 @@ public:
             _bloom_filter.reset(BloomFilterAdaptor::create());
         }
         if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
-            LOG(WARNING) << "bloom filter size not the same";
+            LOG(WARNING) << "bloom filter size not the same: already allocated bytes = "
+                         << _bloom_filter_alloced
+                         << ", expected allocated bytes = " << other_func->_bloom_filter_alloced;
             return Status::InvalidArgument("bloom filter size invalid");
         }
         return _bloom_filter->merge(other_func->_bloom_filter.get());
@@ -166,6 +174,7 @@ protected:
     int32_t _bloom_filter_alloced;
     std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
     bool _inited;
+    std::mutex _lock;
 };
 
 template <class T>
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1311f2b928..24e8e41a84 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -418,7 +418,7 @@ public:
               _filter_id(filter_id) {}
     // init runtime filter wrapper
     // alloc memory to init runtime filter function
-    Status init(const RuntimeFilterParams* params) {
+    Status init(const RuntimeFilterParams* params, bool init_bloom_filter) {
         _max_in_num = params->max_in_num;
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
@@ -432,12 +432,20 @@ public:
         case RuntimeFilterType::BLOOM_FILTER: {
             _is_bloomfilter = true;
             _bloomfilter_func.reset(create_bloom_filter(_column_return_type));
-            return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+            if (init_bloom_filter) {
+                return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+            } else {
+                return Status::OK();
+            }
         }
         case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
             _hybrid_set.reset(create_set(_column_return_type));
             _bloomfilter_func.reset(create_bloom_filter(_column_return_type));
-            return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+            if (init_bloom_filter) {
+                return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+            } else {
+                return Status::OK();
+            }
         }
         default:
             return Status::InvalidArgument("Unknown Filter type");
@@ -461,6 +469,12 @@ public:
         }
     }
 
+    BloomFilterFuncBase* get_bloomfilter() const {
+        DCHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER ||
+               _filter_type == RuntimeFilterType::BLOOM_FILTER);
+        return _bloomfilter_func.get();
+    }
+
     void insert(const void* data) {
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
@@ -1059,7 +1073,7 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt
     *res = pool->add(new IRuntimeFilter(state, pool));
     (*res)->set_role(role);
     UniqueId fragment_instance_id(state->fragment_instance_id());
-    return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id);
+    return (*res)->init_with_desc(desc, query_options, fragment_instance_id, true, node_id);
 }
 
 void IRuntimeFilter::insert(const void* data) {
@@ -1186,8 +1200,13 @@ void IRuntimeFilter::signal() {
     _effect_timer.reset();
 }
 
+BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
+    return _wrapper->get_bloomfilter();
+}
+
 Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
-                                      UniqueId fragment_instance_id, int node_id) {
+                                      UniqueId fragment_instance_id, bool init_bloom_filter,
+                                      int node_id) {
     // if node_id == -1 , it shouldn't be a consumer
     DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
 
@@ -1235,7 +1254,7 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
     }
 
     _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
-    return _wrapper->init(&params);
+    return _wrapper->init(&params, init_bloom_filter);
 }
 
 Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 60162b98f0..6756e2e70f 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -43,6 +43,7 @@ class PInFilter;
 class PMinMaxFilter;
 class HashJoinNode;
 class RuntimeProfile;
+class BloomFilterFuncBase;
 
 namespace vectorized {
 class VExpr;
@@ -190,7 +191,9 @@ public:
 
     // init filter with desc
     Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
-                          UniqueId fragment_id = UniqueId(0, 0), int node_id = -1);
+                          UniqueId fragment_id, bool init_bloom_filter = false, int node_id = -1);
+
+    BloomFilterFuncBase* get_bloomfilter() const;
 
     // serialize _wrapper to protobuf
     Status serialize(PMergeFilterRequest* request, void** data, int* len);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7b5c3af432..0471b63d71 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -27,6 +27,7 @@
 #include "common/object_pool.h"
 #include "common/resource_tls.h"
 #include "common/signal_handler.h"
+#include "exprs/bloomfilter_predicate.h"
 #include "gen_cpp/DataSinks_types.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/HeartbeatService.h"
@@ -425,6 +426,7 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         : _exec_env(exec_env),
           _fragment_map(),
           _fragments_ctx_map(),
+          _bf_size_map(),
           _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
     INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
@@ -467,6 +469,7 @@ FragmentMgr::~FragmentMgr() {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_map.clear();
         _fragments_ctx_map.clear();
+        _bf_size_map.clear();
     }
 }
 
@@ -508,6 +511,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
         _fragment_map.erase(exec_state->fragment_instance_id());
         if (all_done && fragments_ctx) {
             _fragments_ctx_map.erase(fragments_ctx->query_id);
+            _bf_size_map.erase(fragments_ctx->query_id);
         }
     }
 
@@ -666,6 +670,26 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     _runtimefilter_controller.add_entity(params, &handler);
     exec_state->set_merge_controller_handler(handler);
 
+    auto& runtime_filter_params = params.params.runtime_filter_params;
+    if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
+        _bf_size_map.insert({fragments_ctx->query_id, {}});
+    }
+    for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
+        int filter_id = filterid_to_desc.first;
+        const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
+        if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
+            continue;
+        }
+        const auto& build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+        if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
+            continue;
+        }
+        if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
+            _bf_size_map[fragments_ctx->query_id].insert(
+                    {filter_id, filterid_to_desc.second.bloom_filter_size_bytes});
+        }
+    }
+
     RETURN_IF_ERROR(exec_state->prepare(params));
     {
         std::lock_guard<std::mutex> lock(_lock);
@@ -683,6 +707,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             // Remove the exec state added
             std::lock_guard<std::mutex> lock(_lock);
             _fragment_map.erase(params.params.fragment_instance_id);
+            _bf_size_map.erase(fragments_ctx->query_id);
         }
         exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                            "push plan fragment to thread pool failed");
@@ -954,6 +979,14 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char*
     UniqueId queryid = request->query_id();
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
+    auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
+    if (bf_size_for_cur_query != _bf_size_map.end()) {
+        for (auto& iter : bf_size_for_cur_query->second) {
+            auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
+            DCHECK(bf != nullptr);
+            bf->init_with_fixed_length(iter.second);
+        }
+    }
     RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
     return Status::OK();
 }
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 66b06540d4..fd54afacd2 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -112,6 +112,7 @@ private:
     std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;
     // query id -> QueryFragmentsCtx
     std::unordered_map<TUniqueId, std::shared_ptr<QueryFragmentsCtx>> _fragments_ctx_map;
+    std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
 
     CountDownLatch _stop_background_threads_latch;
     scoped_refptr<Thread> _cancel_thread;
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 1dbb75471e..0db00f2331 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -117,12 +117,6 @@ public:
 
     UniqueId query_id() { return _query_id; }
 
-private:
-    Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
-                           const TQueryOptions* query_options,
-                           const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
-                           const int producer_size);
-
     struct RuntimeFilterCntlVal {
         int64_t create_time;
         int producer_size;
@@ -132,6 +126,16 @@ private:
         std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
         std::shared_ptr<ObjectPool> pool;
     };
+
+public:
+    RuntimeFilterCntlVal* get_filter(int id) { return _filter_map[std::to_string(id)].get(); }
+
+private:
+    Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
+                           const TQueryOptions* query_options,
+                           const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
+                           const int producer_size);
+
     UniqueId _query_id;
     UniqueId _fragment_instance_id;
     // protect _filter_map


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org