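You are viewing a plain-text version of this content. The canonical link for it was a hyperlink labelled "here" on the original page; that link is not preserved in this plain-text version.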
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/21 09:37:32 UTC

[doris] branch master updated: [Improvement](bloomfilter) allocate memory for BF in open phase (#13494)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3006b258b0 [Improvement](bloomfilter) allocate memory for BF in open phase (#13494)
3006b258b0 is described below

commit 3006b258b058982626d57521948ed2ac98458af1
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Oct 21 17:37:26 2022 +0800

    [Improvement](bloomfilter) allocate memory for BF in open phase (#13494)
---
 be/src/exec/hash_join_node.cpp           | 36 ++++++++++++++---------------
 be/src/exec/hash_join_node.h             |  6 ++---
 be/src/exec/olap_scan_node.cpp           |  3 +++
 be/src/exprs/bloomfilter_predicate.h     | 37 ++++++++++++++++++++++--------
 be/src/exprs/runtime_filter.cpp          | 29 ++++++++----------------
 be/src/exprs/runtime_filter.h            |  2 +-
 be/src/runtime/fragment_mgr.cpp          | 39 --------------------------------
 be/src/runtime/runtime_filter_mgr.cpp    |  4 ++++
 be/src/vec/exec/join/vhash_join_node.cpp | 22 ++++++++++++------
 be/src/vec/exec/join/vhash_join_node.h   |  5 +++-
 be/src/vec/exec/scan/vscan_node.cpp      |  3 +++
 be/src/vec/exec/volap_scan_node.cpp      |  3 +++
 be/test/exprs/runtime_filter_test.cpp    |  6 +++++
 13 files changed, 96 insertions(+), 99 deletions(-)

diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 53efed1113..7e9a2bf989 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -24,6 +24,7 @@
 
 #include "common/utils.h"
 #include "exec/hash_table.h"
+#include "exprs/bloomfilter_predicate.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 #include "exprs/runtime_filter.h"
@@ -88,9 +89,13 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _build_unique = false;
     }
 
-    for (const auto& filter_desc : _runtime_filter_descs) {
+    _runtime_filters.resize(_runtime_filter_descs.size());
+
+    for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
         RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
-                RuntimeFilterRole::PRODUCER, filter_desc, state->query_options()));
+                RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
+                _runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
     }
 
     return Status::OK();
@@ -177,10 +182,10 @@ Status HashJoinNode::close(RuntimeState* state) {
     return ExecNode::close(state);
 }
 
-void HashJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
+void HashJoinNode::probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) {
     SCOPED_ATTACH_TASK(state);
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
-    status->set_value(construct_hash_table(state));
+    status->set_value(child(0)->open(state));
 }
 
 Status HashJoinNode::construct_hash_table(RuntimeState* state) {
@@ -218,6 +223,11 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
 }
 
 Status HashJoinNode::open(RuntimeState* state) {
+    for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
+        if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
+            RETURN_IF_ERROR(bf->init_with_fixed_length());
+        }
+    }
     RETURN_IF_ERROR(ExecNode::open(state));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
@@ -235,13 +245,13 @@ Status HashJoinNode::open(RuntimeState* state) {
     // main thread
     std::promise<Status> thread_status;
     add_runtime_exec_option("Hash Table Built Asynchronously");
-    std::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status)).detach();
+    std::thread(bind(&HashJoinNode::probe_side_open_thread, this, state, &thread_status)).detach();
 
     if (!_runtime_filter_descs.empty()) {
         RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
                                                 _runtime_filter_descs);
 
-        RETURN_IF_ERROR(thread_status.get_future().get());
+        RETURN_IF_ERROR(construct_hash_table(state));
         RETURN_IF_ERROR(runtime_filter_slots.init(state, _hash_tbl->size()));
         {
             SCOPED_TIMER(_push_compute_timer);
@@ -253,23 +263,13 @@ Status HashJoinNode::open(RuntimeState* state) {
             SCOPED_TIMER(_push_down_timer);
             runtime_filter_slots.publish();
         }
-        Status open_status = child(0)->open(state);
-        RETURN_IF_ERROR(open_status);
+        RETURN_IF_ERROR(thread_status.get_future().get());
     } else {
-        // Open the probe-side child so that it may perform any initialisation in parallel.
-        // Don't exit even if we see an error, we still need to wait for the build thread
-        // to finish.
-        Status open_status = child(0)->open(state);
-
         // Blocks until ConstructHashTable has returned, after which
         // the hash table is fully constructed and we can start the probe
         // phase.
         RETURN_IF_ERROR(thread_status.get_future().get());
-
-        // ISSUE-1247, check open_status after buildThread execute.
-        // If this return first, build thread will use 'thread_status'
-        // which is already destructor and then coredump.
-        RETURN_IF_ERROR(open_status);
+        RETURN_IF_ERROR(construct_hash_table(state));
     }
 
     // seed probe batch and _current_probe_row, etc.
diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h
index 506ac07b9e..afb898b991 100644
--- a/be/src/exec/hash_join_node.h
+++ b/be/src/exec/hash_join_node.h
@@ -34,6 +34,7 @@ namespace doris {
 class MemPool;
 class RowBatch;
 class TupleRow;
+class IRuntimeFilter;
 
 // Node for in-memory hash joins:
 // - builds up a hash table with the rows produced by our right input
@@ -139,9 +140,7 @@ private:
     RuntimeProfile::Counter* _hash_table_list_min_size;
     RuntimeProfile::Counter* _hash_table_list_max_size;
 
-    // Supervises ConstructHashTable in a separate thread, and
-    // returns its status in the promise parameter.
-    void build_side_thread(RuntimeState* state, std::promise<Status>* status);
+    void probe_side_open_thread(RuntimeState* state, std::promise<Status>* status);
 
     // We parallelise building the build-side with Open'ing the
     // probe-side. If, for example, the probe-side child is another
@@ -177,6 +176,7 @@ private:
     std::string get_probe_row_output_string(TupleRow* probe_row);
 
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    std::vector<IRuntimeFilter*> _runtime_filters;
 };
 
 } // namespace doris
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index d3b3a3aabd..ef4f2236c5 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -244,6 +244,9 @@ Status OlapScanNode::open(RuntimeState* state) {
         IRuntimeFilter* runtime_filter = nullptr;
         state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
         DCHECK(runtime_filter != nullptr);
+        if (auto bf = runtime_filter->get_bloomfilter()) {
+            RETURN_IF_ERROR(bf->init_with_fixed_length());
+        }
         if (runtime_filter == nullptr) {
             continue;
         }
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index cd1e89f465..ab07fd8e1b 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cmath>
 #include <cstdint>
+#include <future>
 #include <memory>
 #include <string>
 #include <type_traits>
@@ -100,6 +101,10 @@ public:
         return init_with_fixed_length(filter_size);
     }
 
+    void set_length(int64_t bloom_filter_length) { _bloom_filter_length = bloom_filter_length; }
+
+    Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); }
+
     Status init_with_fixed_length(int64_t bloom_filter_length) {
         if (_inited) {
             return Status::OK();
@@ -118,17 +123,28 @@ public:
     }
 
     Status merge(BloomFilterFuncBase* bloomfilter_func) {
-        auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
-        if (bloomfilter_func == nullptr) {
-            _bloom_filter.reset(BloomFilterAdaptor::create());
-        }
-        if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
-            LOG(WARNING) << "bloom filter size not the same: already allocated bytes = "
-                         << _bloom_filter_alloced
-                         << ", expected allocated bytes = " << other_func->_bloom_filter_alloced;
-            return Status::InvalidArgument("bloom filter size invalid");
+        // If `_inited` is false, there is no memory allocated in bloom filter and this is the first
+        // call for `merge` function. So we just reuse this bloom filter, and we don't need to
+        // allocate memory again.
+        if (!_inited) {
+            auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+            DCHECK(_bloom_filter == nullptr);
+            DCHECK(bloomfilter_func != nullptr);
+            _bloom_filter = bloomfilter_func->_bloom_filter;
+            _bloom_filter_alloced = other_func->_bloom_filter_alloced;
+            _inited = true;
+            return Status::OK();
+        } else {
+            DCHECK(bloomfilter_func != nullptr);
+            auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+            if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
+                LOG(WARNING) << "bloom filter size not the same: already allocated bytes = "
+                             << _bloom_filter_alloced << ", expected allocated bytes = "
+                             << other_func->_bloom_filter_alloced;
+                return Status::InvalidArgument("bloom filter size invalid");
+            }
+            return _bloom_filter->merge(other_func->_bloom_filter.get());
         }
-        return _bloom_filter->merge(other_func->_bloom_filter.get());
     }
 
     Status assign(const char* data, int len) {
@@ -175,6 +191,7 @@ protected:
     std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
     bool _inited;
     std::mutex _lock;
+    int64_t _bloom_filter_length;
 };
 
 template <class T>
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 540e814633..1b5ca935e5 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -418,7 +418,7 @@ public:
               _filter_id(filter_id) {}
     // init runtime filter wrapper
     // alloc memory to init runtime filter function
-    Status init(const RuntimeFilterParams* params, bool init_bloom_filter) {
+    Status init(const RuntimeFilterParams* params) {
         _max_in_num = params->max_in_num;
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
@@ -432,20 +432,14 @@ public:
         case RuntimeFilterType::BLOOM_FILTER: {
             _is_bloomfilter = true;
             _bloomfilter_func.reset(create_bloom_filter(_column_return_type));
-            if (init_bloom_filter) {
-                return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
-            } else {
-                return Status::OK();
-            }
+            _bloomfilter_func->set_length(params->bloom_filter_size);
+            return Status::OK();
         }
         case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
             _hybrid_set.reset(create_set(_column_return_type));
             _bloomfilter_func.reset(create_bloom_filter(_column_return_type));
-            if (init_bloom_filter) {
-                return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
-            } else {
-                return Status::OK();
-            }
+            _bloomfilter_func->set_length(params->bloom_filter_size);
+            return Status::OK();
         }
         default:
             return Status::InvalidArgument("Unknown Filter type");
@@ -469,11 +463,7 @@ public:
         }
     }
 
-    BloomFilterFuncBase* get_bloomfilter() const {
-        DCHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER ||
-               _filter_type == RuntimeFilterType::BLOOM_FILTER);
-        return _bloomfilter_func.get();
-    }
+    BloomFilterFuncBase* get_bloomfilter() const { return _bloomfilter_func.get(); }
 
     void insert(const void* data) {
         switch (_filter_type) {
@@ -1073,7 +1063,7 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt
     *res = pool->add(new IRuntimeFilter(state, pool));
     (*res)->set_role(role);
     UniqueId fragment_instance_id(state->fragment_instance_id());
-    return (*res)->init_with_desc(desc, query_options, fragment_instance_id, true, node_id);
+    return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id);
 }
 
 void IRuntimeFilter::insert(const void* data) {
@@ -1205,8 +1195,7 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
 }
 
 Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
-                                      UniqueId fragment_instance_id, bool init_bloom_filter,
-                                      int node_id) {
+                                      UniqueId fragment_instance_id, int node_id) {
     // if node_id == -1 , it shouldn't be a consumer
     DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
 
@@ -1254,7 +1243,7 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
     }
 
     _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
-    return _wrapper->init(&params, init_bloom_filter);
+    return _wrapper->init(&params);
 }
 
 Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 6756e2e70f..677321ea4d 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -191,7 +191,7 @@ public:
 
     // init filter with desc
     Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
-                          UniqueId fragment_id, bool init_bloom_filter = false, int node_id = -1);
+                          UniqueId fragment_id, int node_id = -1);
 
     BloomFilterFuncBase* get_bloomfilter() const;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b18e80df3e..7b5c3af432 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -27,7 +27,6 @@
 #include "common/object_pool.h"
 #include "common/resource_tls.h"
 #include "common/signal_handler.h"
-#include "exprs/bloomfilter_predicate.h"
 #include "gen_cpp/DataSinks_types.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/HeartbeatService.h"
@@ -426,7 +425,6 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         : _exec_env(exec_env),
           _fragment_map(),
           _fragments_ctx_map(),
-          _bf_size_map(),
           _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
     INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
@@ -469,7 +467,6 @@ FragmentMgr::~FragmentMgr() {
         std::lock_guard<std::mutex> lock(_lock);
         _fragment_map.clear();
         _fragments_ctx_map.clear();
-        _bf_size_map.clear();
     }
 }
 
@@ -511,7 +508,6 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
         _fragment_map.erase(exec_state->fragment_instance_id());
         if (all_done && fragments_ctx) {
             _fragments_ctx_map.erase(fragments_ctx->query_id);
-            _bf_size_map.erase(fragments_ctx->query_id);
         }
     }
 
@@ -673,29 +669,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     RETURN_IF_ERROR(exec_state->prepare(params));
     {
         std::lock_guard<std::mutex> lock(_lock);
-        auto& runtime_filter_params = params.params.runtime_filter_params;
-        if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
-            auto bf_size_for_cur_query = _bf_size_map.find(fragments_ctx->query_id);
-            if (bf_size_for_cur_query == _bf_size_map.end()) {
-                _bf_size_map.insert({fragments_ctx->query_id, {}});
-            }
-            for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
-                int filter_id = filterid_to_desc.first;
-                const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
-                if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
-                    continue;
-                }
-                const auto& build_iter =
-                        runtime_filter_params.runtime_filter_builder_num.find(filter_id);
-                if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
-                    continue;
-                }
-                if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
-                    _bf_size_map[fragments_ctx->query_id].insert(
-                            {filter_id, filterid_to_desc.second.bloom_filter_size_bytes});
-                }
-            }
-        }
         _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
         _cv.notify_all();
     }
@@ -710,7 +683,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             // Remove the exec state added
             std::lock_guard<std::mutex> lock(_lock);
             _fragment_map.erase(params.params.fragment_instance_id);
-            _bf_size_map.erase(fragments_ctx->query_id);
         }
         exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                            "push plan fragment to thread pool failed");
@@ -982,17 +954,6 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char*
     UniqueId queryid = request->query_id();
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
-        if (bf_size_for_cur_query != _bf_size_map.end()) {
-            for (auto& iter : bf_size_for_cur_query->second) {
-                auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
-                DCHECK(bf != nullptr);
-                bf->init_with_fixed_length(iter.second);
-            }
-        }
-    }
     RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
     return Status::OK();
 }
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 73a9115a23..5175ed580c 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -20,6 +20,7 @@
 #include <string>
 
 #include "client_cache.h"
+#include "exprs/bloomfilter_predicate.h"
 #include "exprs/runtime_filter.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "runtime/exec_env.h"
@@ -197,6 +198,9 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             return Status::InvalidArgument("unknown filter id");
         }
         cntVal = iter->second;
+        if (auto bf = cntVal->filter->get_bloomfilter()) {
+            RETURN_IF_ERROR(bf->init_with_fixed_length());
+        }
         MergeRuntimeFilterParams params;
         params.data = data;
         params.request = request;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index fad0eb68cf..760db91927 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -898,9 +898,12 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _output_expr_ctxs.push_back(ctx);
     }
 
-    for (const auto& filter_desc : _runtime_filter_descs) {
+    _runtime_filters.resize(_runtime_filter_descs.size());
+    for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
         RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
-                RuntimeFilterRole::PRODUCER, filter_desc, state->query_options()));
+                RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
+                _runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
     }
 
     // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need
@@ -1198,6 +1201,11 @@ void HashJoinNode::_construct_mutable_join_block() {
 Status HashJoinNode::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
+        if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
+            RETURN_IF_ERROR(bf->init_with_fixed_length());
+        }
+    }
     RETURN_IF_ERROR(ExecNode::open(state));
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
@@ -1213,7 +1221,7 @@ Status HashJoinNode::open(RuntimeState* state) {
     std::thread([this, state, thread_status_p = &thread_status,
                  parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
         OpentelemetryScope scope {parent_span};
-        this->_hash_table_build_thread(state, thread_status_p);
+        this->_probe_side_open_thread(state, thread_status_p);
     }).detach();
 
     // Open the probe-side child so that it may perform any initialisation in parallel.
@@ -1222,16 +1230,16 @@ Status HashJoinNode::open(RuntimeState* state) {
     // ISSUE-1247, check open_status after buildThread execute.
     // If this return first, build thread will use 'thread_status'
     // which is already destructor and then coredump.
-    Status open_status = child(0)->open(state);
+    Status status = _hash_table_build(state);
     RETURN_IF_ERROR(thread_status.get_future().get());
-    return open_status;
+    return status;
 }
 
-void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<Status>* status) {
+void HashJoinNode::_probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread");
     SCOPED_ATTACH_TASK(state);
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
-    status->set_value(_hash_table_build(state));
+    status->set_value(child(0)->open(state));
 }
 
 Status HashJoinNode::_hash_table_build(RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 5de6b0be9c..5f84211110 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -389,7 +389,8 @@ private:
     MutableColumnPtr _tuple_is_null_left_flag_column;
     MutableColumnPtr _tuple_is_null_right_flag_column;
 
-    void _hash_table_build_thread(RuntimeState* state, std::promise<Status>* status);
+private:
+    void _probe_side_open_thread(RuntimeState* state, std::promise<Status>* status);
 
     Status _hash_table_build(RuntimeState* state);
 
@@ -435,6 +436,8 @@ private:
 
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     std::unordered_map<const Block*, std::vector<int>> _inserted_rows;
+
+    std::vector<IRuntimeFilter*> _runtime_filters;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 81a1c80d54..af193ddcf3 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -202,6 +202,9 @@ Status VScanNode::_acquire_runtime_filter() {
     std::vector<VExpr*> vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+        if (auto bf = runtime_filter->get_bloomfilter()) {
+            RETURN_IF_ERROR(bf->init_with_fixed_length());
+        }
         bool ready = runtime_filter->is_ready();
         if (!ready) {
             ready = runtime_filter->await();
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 8551a872da..f2f276cc70 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -253,6 +253,9 @@ Status VOlapScanNode::open(RuntimeState* state) {
     std::vector<VExpr*> vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
+        if (auto bf = runtime_filter->get_bloomfilter()) {
+            RETURN_IF_ERROR(bf->init_with_fixed_length());
+        }
         bool ready = runtime_filter->is_ready();
         if (!ready) {
             ready = runtime_filter->await();
diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp
index 6e69eb1d37..d4b66f295c 100644
--- a/be/test/exprs/runtime_filter_test.cpp
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -20,6 +20,7 @@
 #include <array>
 #include <memory>
 
+#include "exprs/bloomfilter_predicate.h"
 #include "exprs/expr_context.h"
 #include "exprs/slot_ref.h"
 #include "gen_cpp/Planner_types.h"
@@ -109,6 +110,11 @@ IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, TQueryOptio
 
     EXPECT_TRUE(status.ok()) << status.to_string();
 
+    if (auto bf = runtime_filter->get_bloomfilter()) {
+        status = bf->init_with_fixed_length();
+        EXPECT_TRUE(status.ok()) << status.to_string();
+    }
+
     return status.ok() ? runtime_filter : nullptr;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org