You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/21 09:37:32 UTC
[doris] branch master updated: [Improvement](bloomfilter) allocate memory for BF in open phase (#13494)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3006b258b0 [Improvement](bloomfilter) allocate memory for BF in open phase (#13494)
3006b258b0 is described below
commit 3006b258b058982626d57521948ed2ac98458af1
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Oct 21 17:37:26 2022 +0800
[Improvement](bloomfilter) allocate memory for BF in open phase (#13494)
---
be/src/exec/hash_join_node.cpp | 36 ++++++++++++++---------------
be/src/exec/hash_join_node.h | 6 ++---
be/src/exec/olap_scan_node.cpp | 3 +++
be/src/exprs/bloomfilter_predicate.h | 37 ++++++++++++++++++++++--------
be/src/exprs/runtime_filter.cpp | 29 ++++++++----------------
be/src/exprs/runtime_filter.h | 2 +-
be/src/runtime/fragment_mgr.cpp | 39 --------------------------------
be/src/runtime/runtime_filter_mgr.cpp | 4 ++++
be/src/vec/exec/join/vhash_join_node.cpp | 22 ++++++++++++------
be/src/vec/exec/join/vhash_join_node.h | 5 +++-
be/src/vec/exec/scan/vscan_node.cpp | 3 +++
be/src/vec/exec/volap_scan_node.cpp | 3 +++
be/test/exprs/runtime_filter_test.cpp | 6 +++++
13 files changed, 96 insertions(+), 99 deletions(-)
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 53efed1113..7e9a2bf989 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -24,6 +24,7 @@
#include "common/utils.h"
#include "exec/hash_table.h"
+#include "exprs/bloomfilter_predicate.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "exprs/runtime_filter.h"
@@ -88,9 +89,13 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
_build_unique = false;
}
- for (const auto& filter_desc : _runtime_filter_descs) {
+ _runtime_filters.resize(_runtime_filter_descs.size());
+
+ for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
- RuntimeFilterRole::PRODUCER, filter_desc, state->query_options()));
+ RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
+ _runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
}
return Status::OK();
@@ -177,10 +182,10 @@ Status HashJoinNode::close(RuntimeState* state) {
return ExecNode::close(state);
}
-void HashJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
+void HashJoinNode::probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- status->set_value(construct_hash_table(state));
+ status->set_value(child(0)->open(state));
}
Status HashJoinNode::construct_hash_table(RuntimeState* state) {
@@ -218,6 +223,11 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
}
Status HashJoinNode::open(RuntimeState* state) {
+ for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
+ if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
+ RETURN_IF_ERROR(bf->init_with_fixed_length());
+ }
+ }
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
@@ -235,13 +245,13 @@ Status HashJoinNode::open(RuntimeState* state) {
// main thread
std::promise<Status> thread_status;
add_runtime_exec_option("Hash Table Built Asynchronously");
- std::thread(bind(&HashJoinNode::build_side_thread, this, state, &thread_status)).detach();
+ std::thread(bind(&HashJoinNode::probe_side_open_thread, this, state, &thread_status)).detach();
if (!_runtime_filter_descs.empty()) {
RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
_runtime_filter_descs);
- RETURN_IF_ERROR(thread_status.get_future().get());
+ RETURN_IF_ERROR(construct_hash_table(state));
RETURN_IF_ERROR(runtime_filter_slots.init(state, _hash_tbl->size()));
{
SCOPED_TIMER(_push_compute_timer);
@@ -253,23 +263,13 @@ Status HashJoinNode::open(RuntimeState* state) {
SCOPED_TIMER(_push_down_timer);
runtime_filter_slots.publish();
}
- Status open_status = child(0)->open(state);
- RETURN_IF_ERROR(open_status);
+ RETURN_IF_ERROR(thread_status.get_future().get());
} else {
- // Open the probe-side child so that it may perform any initialisation in parallel.
- // Don't exit even if we see an error, we still need to wait for the build thread
- // to finish.
- Status open_status = child(0)->open(state);
-
// Blocks until ConstructHashTable has returned, after which
// the hash table is fully constructed and we can start the probe
// phase.
RETURN_IF_ERROR(thread_status.get_future().get());
-
- // ISSUE-1247, check open_status after buildThread execute.
- // If this return first, build thread will use 'thread_status'
- // which is already destructor and then coredump.
- RETURN_IF_ERROR(open_status);
+ RETURN_IF_ERROR(construct_hash_table(state));
}
// seed probe batch and _current_probe_row, etc.
diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h
index 506ac07b9e..afb898b991 100644
--- a/be/src/exec/hash_join_node.h
+++ b/be/src/exec/hash_join_node.h
@@ -34,6 +34,7 @@ namespace doris {
class MemPool;
class RowBatch;
class TupleRow;
+class IRuntimeFilter;
// Node for in-memory hash joins:
// - builds up a hash table with the rows produced by our right input
@@ -139,9 +140,7 @@ private:
RuntimeProfile::Counter* _hash_table_list_min_size;
RuntimeProfile::Counter* _hash_table_list_max_size;
- // Supervises ConstructHashTable in a separate thread, and
- // returns its status in the promise parameter.
- void build_side_thread(RuntimeState* state, std::promise<Status>* status);
+ void probe_side_open_thread(RuntimeState* state, std::promise<Status>* status);
// We parallelise building the build-side with Open'ing the
// probe-side. If, for example, the probe-side child is another
@@ -177,6 +176,7 @@ private:
std::string get_probe_row_output_string(TupleRow* probe_row);
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+ std::vector<IRuntimeFilter*> _runtime_filters;
};
} // namespace doris
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index d3b3a3aabd..ef4f2236c5 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -244,6 +244,9 @@ Status OlapScanNode::open(RuntimeState* state) {
IRuntimeFilter* runtime_filter = nullptr;
state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
DCHECK(runtime_filter != nullptr);
+ if (auto bf = runtime_filter->get_bloomfilter()) {
+ RETURN_IF_ERROR(bf->init_with_fixed_length());
+ }
if (runtime_filter == nullptr) {
continue;
}
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index cd1e89f465..ab07fd8e1b 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -20,6 +20,7 @@
#include <algorithm>
#include <cmath>
#include <cstdint>
+#include <future>
#include <memory>
#include <string>
#include <type_traits>
@@ -100,6 +101,10 @@ public:
return init_with_fixed_length(filter_size);
}
+ void set_length(int64_t bloom_filter_length) { _bloom_filter_length = bloom_filter_length; }
+
+ Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); }
+
Status init_with_fixed_length(int64_t bloom_filter_length) {
if (_inited) {
return Status::OK();
@@ -118,17 +123,28 @@ public:
}
Status merge(BloomFilterFuncBase* bloomfilter_func) {
- auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
- if (bloomfilter_func == nullptr) {
- _bloom_filter.reset(BloomFilterAdaptor::create());
- }
- if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
- LOG(WARNING) << "bloom filter size not the same: already allocated bytes = "
- << _bloom_filter_alloced
- << ", expected allocated bytes = " << other_func->_bloom_filter_alloced;
- return Status::InvalidArgument("bloom filter size invalid");
+ // If `_inited` is false, there is no memory allocated in bloom filter and this is the first
+ // call for `merge` function. So we just reuse this bloom filter, and we don't need to
+ // allocate memory again.
+ if (!_inited) {
+ auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+ DCHECK(_bloom_filter == nullptr);
+ DCHECK(bloomfilter_func != nullptr);
+ _bloom_filter = bloomfilter_func->_bloom_filter;
+ _bloom_filter_alloced = other_func->_bloom_filter_alloced;
+ _inited = true;
+ return Status::OK();
+ } else {
+ DCHECK(bloomfilter_func != nullptr);
+ auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+ if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
+ LOG(WARNING) << "bloom filter size not the same: already allocated bytes = "
+ << _bloom_filter_alloced << ", expected allocated bytes = "
+ << other_func->_bloom_filter_alloced;
+ return Status::InvalidArgument("bloom filter size invalid");
+ }
+ return _bloom_filter->merge(other_func->_bloom_filter.get());
}
- return _bloom_filter->merge(other_func->_bloom_filter.get());
}
Status assign(const char* data, int len) {
@@ -175,6 +191,7 @@ protected:
std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
bool _inited;
std::mutex _lock;
+ int64_t _bloom_filter_length;
};
template <class T>
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 540e814633..1b5ca935e5 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -418,7 +418,7 @@ public:
_filter_id(filter_id) {}
// init runtime filter wrapper
// alloc memory to init runtime filter function
- Status init(const RuntimeFilterParams* params, bool init_bloom_filter) {
+ Status init(const RuntimeFilterParams* params) {
_max_in_num = params->max_in_num;
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
@@ -432,20 +432,14 @@ public:
case RuntimeFilterType::BLOOM_FILTER: {
_is_bloomfilter = true;
_bloomfilter_func.reset(create_bloom_filter(_column_return_type));
- if (init_bloom_filter) {
- return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
- } else {
- return Status::OK();
- }
+ _bloomfilter_func->set_length(params->bloom_filter_size);
+ return Status::OK();
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
_hybrid_set.reset(create_set(_column_return_type));
_bloomfilter_func.reset(create_bloom_filter(_column_return_type));
- if (init_bloom_filter) {
- return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
- } else {
- return Status::OK();
- }
+ _bloomfilter_func->set_length(params->bloom_filter_size);
+ return Status::OK();
}
default:
return Status::InvalidArgument("Unknown Filter type");
@@ -469,11 +463,7 @@ public:
}
}
- BloomFilterFuncBase* get_bloomfilter() const {
- DCHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER ||
- _filter_type == RuntimeFilterType::BLOOM_FILTER);
- return _bloomfilter_func.get();
- }
+ BloomFilterFuncBase* get_bloomfilter() const { return _bloomfilter_func.get(); }
void insert(const void* data) {
switch (_filter_type) {
@@ -1073,7 +1063,7 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt
*res = pool->add(new IRuntimeFilter(state, pool));
(*res)->set_role(role);
UniqueId fragment_instance_id(state->fragment_instance_id());
- return (*res)->init_with_desc(desc, query_options, fragment_instance_id, true, node_id);
+ return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id);
}
void IRuntimeFilter::insert(const void* data) {
@@ -1205,8 +1195,7 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
}
Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
- UniqueId fragment_instance_id, bool init_bloom_filter,
- int node_id) {
+ UniqueId fragment_instance_id, int node_id) {
// if node_id == -1 , it shouldn't be a consumer
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
@@ -1254,7 +1243,7 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
}
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms));
- return _wrapper->init(¶ms, init_bloom_filter);
+ return _wrapper->init(¶ms);
}
Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 6756e2e70f..677321ea4d 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -191,7 +191,7 @@ public:
// init filter with desc
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
- UniqueId fragment_id, bool init_bloom_filter = false, int node_id = -1);
+ UniqueId fragment_id, int node_id = -1);
BloomFilterFuncBase* get_bloomfilter() const;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b18e80df3e..7b5c3af432 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -27,7 +27,6 @@
#include "common/object_pool.h"
#include "common/resource_tls.h"
#include "common/signal_handler.h"
-#include "exprs/bloomfilter_predicate.h"
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService.h"
@@ -426,7 +425,6 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env),
_fragment_map(),
_fragments_ctx_map(),
- _bf_size_map(),
_stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
@@ -469,7 +467,6 @@ FragmentMgr::~FragmentMgr() {
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.clear();
_fragments_ctx_map.clear();
- _bf_size_map.clear();
}
}
@@ -511,7 +508,6 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
_fragment_map.erase(exec_state->fragment_instance_id());
if (all_done && fragments_ctx) {
_fragments_ctx_map.erase(fragments_ctx->query_id);
- _bf_size_map.erase(fragments_ctx->query_id);
}
}
@@ -673,29 +669,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
RETURN_IF_ERROR(exec_state->prepare(params));
{
std::lock_guard<std::mutex> lock(_lock);
- auto& runtime_filter_params = params.params.runtime_filter_params;
- if (!runtime_filter_params.rid_to_runtime_filter.empty()) {
- auto bf_size_for_cur_query = _bf_size_map.find(fragments_ctx->query_id);
- if (bf_size_for_cur_query == _bf_size_map.end()) {
- _bf_size_map.insert({fragments_ctx->query_id, {}});
- }
- for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
- int filter_id = filterid_to_desc.first;
- const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
- if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
- continue;
- }
- const auto& build_iter =
- runtime_filter_params.runtime_filter_builder_num.find(filter_id);
- if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
- continue;
- }
- if (filterid_to_desc.second.__isset.bloom_filter_size_bytes) {
- _bf_size_map[fragments_ctx->query_id].insert(
- {filter_id, filterid_to_desc.second.bloom_filter_size_bytes});
- }
- }
- }
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
_cv.notify_all();
}
@@ -710,7 +683,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(params.params.fragment_instance_id);
- _bf_size_map.erase(fragments_ctx->query_id);
}
exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"push plan fragment to thread pool failed");
@@ -982,17 +954,6 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char*
UniqueId queryid = request->query_id();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
- {
- std::lock_guard<std::mutex> lock(_lock);
- auto bf_size_for_cur_query = _bf_size_map.find(queryid.to_thrift());
- if (bf_size_for_cur_query != _bf_size_map.end()) {
- for (auto& iter : bf_size_for_cur_query->second) {
- auto bf = filter_controller->get_filter(iter.first)->filter->get_bloomfilter();
- DCHECK(bf != nullptr);
- bf->init_with_fixed_length(iter.second);
- }
- }
- }
RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
return Status::OK();
}
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 73a9115a23..5175ed580c 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -20,6 +20,7 @@
#include <string>
#include "client_cache.h"
+#include "exprs/bloomfilter_predicate.h"
#include "exprs/runtime_filter.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/exec_env.h"
@@ -197,6 +198,9 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
return Status::InvalidArgument("unknown filter id");
}
cntVal = iter->second;
+ if (auto bf = cntVal->filter->get_bloomfilter()) {
+ RETURN_IF_ERROR(bf->init_with_fixed_length());
+ }
MergeRuntimeFilterParams params;
params.data = data;
params.request = request;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index fad0eb68cf..760db91927 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -898,9 +898,12 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
_output_expr_ctxs.push_back(ctx);
}
- for (const auto& filter_desc : _runtime_filter_descs) {
+ _runtime_filters.resize(_runtime_filter_descs.size());
+ for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
- RuntimeFilterRole::PRODUCER, filter_desc, state->query_options()));
+ RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
+ _runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
}
// init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need
@@ -1198,6 +1201,11 @@ void HashJoinNode::_construct_mutable_join_block() {
Status HashJoinNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
+ if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
+ RETURN_IF_ERROR(bf->init_with_fixed_length());
+ }
+ }
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
@@ -1213,7 +1221,7 @@ Status HashJoinNode::open(RuntimeState* state) {
std::thread([this, state, thread_status_p = &thread_status,
parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
OpentelemetryScope scope {parent_span};
- this->_hash_table_build_thread(state, thread_status_p);
+ this->_probe_side_open_thread(state, thread_status_p);
}).detach();
// Open the probe-side child so that it may perform any initialisation in parallel.
@@ -1222,16 +1230,16 @@ Status HashJoinNode::open(RuntimeState* state) {
// ISSUE-1247, check open_status after buildThread execute.
// If this return first, build thread will use 'thread_status'
// which is already destructor and then coredump.
- Status open_status = child(0)->open(state);
+ Status status = _hash_table_build(state);
RETURN_IF_ERROR(thread_status.get_future().get());
- return open_status;
+ return status;
}
-void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<Status>* status) {
+void HashJoinNode::_probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread");
SCOPED_ATTACH_TASK(state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- status->set_value(_hash_table_build(state));
+ status->set_value(child(0)->open(state));
}
Status HashJoinNode::_hash_table_build(RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 5de6b0be9c..5f84211110 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -389,7 +389,8 @@ private:
MutableColumnPtr _tuple_is_null_left_flag_column;
MutableColumnPtr _tuple_is_null_right_flag_column;
- void _hash_table_build_thread(RuntimeState* state, std::promise<Status>* status);
+private:
+ void _probe_side_open_thread(RuntimeState* state, std::promise<Status>* status);
Status _hash_table_build(RuntimeState* state);
@@ -435,6 +436,8 @@ private:
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::unordered_map<const Block*, std::vector<int>> _inserted_rows;
+
+ std::vector<IRuntimeFilter*> _runtime_filters;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 81a1c80d54..af193ddcf3 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -202,6 +202,9 @@ Status VScanNode::_acquire_runtime_filter() {
std::vector<VExpr*> vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+ if (auto bf = runtime_filter->get_bloomfilter()) {
+ RETURN_IF_ERROR(bf->init_with_fixed_length());
+ }
bool ready = runtime_filter->is_ready();
if (!ready) {
ready = runtime_filter->await();
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 8551a872da..f2f276cc70 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -253,6 +253,9 @@ Status VOlapScanNode::open(RuntimeState* state) {
std::vector<VExpr*> vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
+ if (auto bf = runtime_filter->get_bloomfilter()) {
+ RETURN_IF_ERROR(bf->init_with_fixed_length());
+ }
bool ready = runtime_filter->is_ready();
if (!ready) {
ready = runtime_filter->await();
diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp
index 6e69eb1d37..d4b66f295c 100644
--- a/be/test/exprs/runtime_filter_test.cpp
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -20,6 +20,7 @@
#include <array>
#include <memory>
+#include "exprs/bloomfilter_predicate.h"
#include "exprs/expr_context.h"
#include "exprs/slot_ref.h"
#include "gen_cpp/Planner_types.h"
@@ -109,6 +110,11 @@ IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, TQueryOptio
EXPECT_TRUE(status.ok()) << status.to_string();
+ if (auto bf = runtime_filter->get_bloomfilter()) {
+ status = bf->init_with_fixed_length();
+ EXPECT_TRUE(status.ok()) << status.to_string();
+ }
+
return status.ok() ? runtime_filter : nullptr;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org