You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by en...@apache.org on 2023/06/16 12:26:09 UTC
[doris] branch master updated: [feature](nereids) Support multi target rf #20714
This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab32299ba4 [feature](nereids) Support multi target rf #20714
ab32299ba4 is described below
commit ab32299ba40bef55d4ba626a43df8b313c7718dc
Author: xzj7019 <13...@users.noreply.github.com>
AuthorDate: Fri Jun 16 20:26:00 2023 +0800
[feature](nereids) Support multi target rf #20714
Support multi-target runtime filters, mainly for set operations such as union/intersect/except.
---
be/src/exprs/runtime_filter.cpp | 19 ++-
be/src/exprs/runtime_filter.h | 2 +
be/src/exprs/runtime_filter_slots.h | 12 +-
be/src/runtime/fragment_mgr.cpp | 16 +-
be/src/runtime/runtime_filter_mgr.cpp | 149 ++++++++++++-------
be/src/runtime/runtime_filter_mgr.h | 29 ++--
be/src/vec/columns/column_nullable.h | 6 +-
be/src/vec/exec/join/vhash_join_node.cpp | 5 +-
be/src/vec/exec/join/vnested_loop_join_node.cpp | 4 +-
be/src/vec/exec/runtime_filter_consumer_node.cpp | 164 +++++++++++++++++++++
be/src/vec/exec/runtime_filter_consumer_node.h | 73 +++++++++
be/src/vec/exec/scan/vscan_node.cpp | 138 +----------------
be/src/vec/exec/scan/vscan_node.h | 37 +----
be/src/vec/exec/vdata_gen_scan_node.cpp | 12 +-
be/src/vec/exec/vselect_node.cpp | 20 ++-
be/src/vec/exec/vselect_node.h | 7 +-
.../glue/translator/PhysicalPlanTranslator.java | 2 +-
.../glue/translator/RuntimeFilterTranslator.java | 101 +++++++------
.../processor/post/RuntimeFilterContext.java | 10 +-
.../processor/post/RuntimeFilterGenerator.java | 108 ++++++++++----
.../trees/plans/physical/RuntimeFilter.java | 28 ++--
.../org/apache/doris/planner/RuntimeFilter.java | 26 ++--
.../doris/planner/RuntimeFilterGenerator.java | 41 ++++--
.../nereids/postprocess/RuntimeFilterTest.java | 2 +-
24 files changed, 631 insertions(+), 380 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c11ab8715c..2c6f3662bb 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1105,6 +1105,12 @@ Status IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterC
return Status::OK();
}
+void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) {
+ _wrapper->_filter_type = other->_wrapper->_filter_type;
+ _wrapper->_is_bloomfilter = other->is_bloomfilter();
+ _wrapper->_context = other->_wrapper->_context;
+}
+
void IRuntimeFilter::insert(const void* data) {
DCHECK(is_producer());
if (!_is_ignored) {
@@ -1126,13 +1132,14 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column,
Status IRuntimeFilter::publish() {
DCHECK(is_producer());
if (_has_local_target) {
- IRuntimeFilter* consumer_filter = nullptr;
- RETURN_IF_ERROR(
- _state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter));
+ std::vector<IRuntimeFilter*> filters;
+ RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filters(_filter_id, filters));
// push down
- consumer_filter->_wrapper = _wrapper;
- consumer_filter->update_runtime_filter_type_to_profile();
- consumer_filter->signal();
+ for (auto filter : filters) {
+ filter->_wrapper = _wrapper;
+ filter->update_runtime_filter_type_to_profile();
+ filter->signal();
+ }
return Status::OK();
} else {
TNetworkAddress addr;
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 7042dccc1c..a4fd241a28 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -207,6 +207,8 @@ public:
void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);
+ void copy_from_other(IRuntimeFilter* other);
+
// insert data to build filter
// only used for producer
void insert(const void* data);
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 08532251bc..816e62c731 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -51,11 +51,13 @@ public:
std::map<int, bool> has_in_filter;
auto ignore_local_filter = [state](int filter_id) {
- IRuntimeFilter* consumer_filter = nullptr;
- state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter);
- DCHECK(consumer_filter != nullptr);
- consumer_filter->set_ignored();
- consumer_filter->signal();
+ std::vector<IRuntimeFilter*> filters;
+ state->runtime_filter_mgr()->get_consume_filters(filter_id, filters);
+ DCHECK(!filters.empty());
+ for (auto filter : filters) {
+ filter->set_ignored();
+ filter->signal();
+ }
};
auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c98bebafa1..4581768199 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1276,9 +1276,19 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
UpdateRuntimeFilterParamsV2 params(request, attach_data, pool);
int filter_id = request->filter_id();
- IRuntimeFilter* real_filter = nullptr;
- RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id, &real_filter));
- RETURN_IF_ERROR(real_filter->update_filter(&params, start_apply));
+ std::vector<IRuntimeFilter*> filters;
+ RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters));
+
+ IRuntimeFilter* first_filter = nullptr;
+ for (auto filter : filters) {
+ if (!first_filter) {
+ RETURN_IF_ERROR(filter->update_filter(&params, start_apply));
+ first_filter = filter;
+ } else {
+ filter->copy_from_other(first_filter);
+ filter->signal();
+ }
+ }
}
return Status::OK();
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index e00149a7fa..e2b4c525ac 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -55,91 +55,124 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state
RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
: _query_ctx(query_ctx) {}
-RuntimeFilterMgr::~RuntimeFilterMgr() = default;
-
Status RuntimeFilterMgr::init() {
_tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
ExecEnv::GetInstance()->experimental_mem_tracker());
return Status::OK();
}
-Status RuntimeFilterMgr::get_filter_by_role(const int filter_id, const RuntimeFilterRole role,
- IRuntimeFilter** target) {
+Status RuntimeFilterMgr::get_producer_filter(const int filter_id, IRuntimeFilter** target) {
int32_t key = filter_id;
- std::map<int32_t, RuntimeFilterMgrVal>* filter_map = nullptr;
- if (role == RuntimeFilterRole::CONSUMER) {
- filter_map = &_consumer_map;
- } else {
- filter_map = &_producer_map;
+ auto iter = _producer_map.find(key);
+ if (iter == _producer_map.end()) {
+ LOG(WARNING) << "unknown runtime filter: " << key << ", role: PRODUCER";
+ return Status::InvalidArgument("unknown filter");
}
- auto iter = filter_map->find(key);
- if (iter == filter_map->end()) {
- return Status::InternalError("get filter failed, key={}, role={}", key, (int)role);
- }
- *target = iter->second.filter;
+ *target = iter->second;
return Status::OK();
}
-Status RuntimeFilterMgr::get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter) {
- return get_filter_by_role(filter_id, RuntimeFilterRole::CONSUMER, consumer_filter);
+Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int node_id,
+ IRuntimeFilter** consumer_filter) {
+ auto iter = _consumer_map.find(filter_id);
+ if (iter == _consumer_map.cend()) {
+ LOG(WARNING) << "unknown runtime filter: " << filter_id << ", role: consumer";
+ return Status::InvalidArgument("unknown filter");
+ }
+
+ for (auto& item : iter->second) {
+ if (item.node_id == node_id) {
+ *consumer_filter = item.filter;
+ return Status::OK();
+ }
+ }
+
+ return Status::InvalidArgument(
+ fmt::format("unknown filter, filter_id: {}, node_id: {}", filter_id, node_id));
}
-Status RuntimeFilterMgr::get_producer_filter(const int filter_id,
- IRuntimeFilter** producer_filter) {
- return get_filter_by_role(filter_id, RuntimeFilterRole::PRODUCER, producer_filter);
+Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
+ std::vector<IRuntimeFilter*>& consumer_filters) {
+ int32_t key = filter_id;
+ auto iter = _consumer_map.find(key);
+ if (iter == _consumer_map.end()) {
+ LOG(WARNING) << "unknown runtime filter: " << key << ", role: consumer";
+ return Status::InvalidArgument("unknown filter");
+ }
+ for (auto& holder : iter->second) {
+ consumer_filters.emplace_back(holder.filter);
+ }
+ return Status::OK();
}
-Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role,
- const TRuntimeFilterDesc& desc,
- const TQueryOptions& options, int node_id,
- bool build_bf_exactly) {
- DCHECK(role != RuntimeFilterRole::CONSUMER || node_id >= 0);
+Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc,
+ const TQueryOptions& options, int node_id,
+ bool build_bf_exactly) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
- std::map<int32_t, RuntimeFilterMgrVal>* filter_map = nullptr;
- if (role == RuntimeFilterRole::CONSUMER) {
- filter_map = &_consumer_map;
- } else {
- filter_map = &_producer_map;
- }
- VLOG_NOTICE << "regist filter...:" << key << ",role:" << (int)role;
-
- auto iter = filter_map->find(key);
-
- RuntimeFilterMgrVal filter_mgr_val;
- filter_mgr_val.role = role;
-
- if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && role == RuntimeFilterRole::CONSUMER &&
- desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) {
+ auto iter = _consumer_map.find(key);
+ if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets &&
+ desc.type == TRuntimeFilterType::BLOOM) {
// if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances
DCHECK(_query_ctx != nullptr);
- if (iter != filter_map->end()) {
- return Status::OK();
- }
+
{
std::lock_guard<std::mutex> l(_lock);
- iter = filter_map->find(key);
- if (iter != filter_map->end()) {
- return Status::OK();
+
+ iter = _consumer_map.find(key);
+ if (iter != _consumer_map.end()) {
+ for (auto holder : iter->second) {
+ if (holder.node_id == node_id) {
+ return Status::OK();
+ }
+ }
}
+ IRuntimeFilter* filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc,
- &options, role, node_id, &filter_mgr_val.filter,
- build_bf_exactly));
- filter_map->emplace(key, filter_mgr_val);
+
+ &options, RuntimeFilterRole::CONSUMER, node_id,
+ &filter, build_bf_exactly));
+ _consumer_map[key].emplace_back(node_id, filter);
}
} else {
DCHECK(_state != nullptr);
- if (iter != filter_map->end()) {
- return Status::InvalidArgument("filter has registed");
+
+ if (iter != _consumer_map.end()) {
+ for (auto holder : iter->second) {
+ if (holder.node_id == node_id) {
+ return Status::InvalidArgument("filter has registered");
+ }
+ }
}
- RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id,
- &filter_mgr_val.filter, build_bf_exactly));
- filter_map->emplace(key, filter_mgr_val);
+
+ IRuntimeFilter* filter;
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+ RuntimeFilterRole::CONSUMER, node_id, &filter,
+ build_bf_exactly));
+ _consumer_map[key].emplace_back(node_id, filter);
}
+ return Status::OK();
+}
+
+Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc,
+ const TQueryOptions& options,
+ bool build_bf_exactly) {
+ SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
+ int32_t key = desc.filter_id;
+ auto iter = _producer_map.find(key);
+ DCHECK(_state != nullptr);
+ if (iter != _producer_map.end()) {
+ return Status::InvalidArgument("filter has registed");
+ }
+ IRuntimeFilter* filter;
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+ RuntimeFilterRole::PRODUCER, -1, &filter,
+ build_bf_exactly));
+ _producer_map.emplace(key, filter);
return Status::OK();
}
@@ -148,9 +181,13 @@ Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
UpdateRuntimeFilterParams params(request, data, &_pool);
int filter_id = request->filter_id();
- IRuntimeFilter* real_filter = nullptr;
- RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter));
- return real_filter->update_filter(&params);
+ std::vector<IRuntimeFilter*> filters;
+ RETURN_IF_ERROR(get_consume_filters(filter_id, filters));
+ for (auto filter : filters) {
+ RETURN_IF_ERROR(filter->update_filter(&params));
+ }
+
+ return Status::OK();
}
void RuntimeFilterMgr::set_runtime_filter_params(
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 0a0a70ec7a..f3cb32813f 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -68,18 +68,22 @@ public:
RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);
- ~RuntimeFilterMgr();
+ ~RuntimeFilterMgr() = default;
Status init();
- // get a consumer filter by filter-id
- Status get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter);
+ Status get_consume_filter(const int filter_id, const int node_id,
+ IRuntimeFilter** consumer_filter);
+
+ Status get_consume_filters(const int filter_id, std::vector<IRuntimeFilter*>& consumer_filters);
Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter);
- // regist filter
- Status register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
- const TQueryOptions& options, int node_id = -1,
- bool build_bf_exactly = false);
+
+ // register filter
+ Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
+ int node_id, bool build_bf_exactly = false);
+ Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
+ bool build_bf_exactly = false);
// update filter by remote
Status update_filter(const PPublishFilterRequest* request,
@@ -90,19 +94,16 @@ public:
Status get_merge_addr(TNetworkAddress* addr);
private:
- Status get_filter_by_role(const int filter_id, const RuntimeFilterRole role,
- IRuntimeFilter** target);
-
- struct RuntimeFilterMgrVal {
- RuntimeFilterRole role; // consumer or producer
+ struct ConsumerFilterHolder {
+ int node_id;
IRuntimeFilter* filter;
};
// RuntimeFilterMgr is owned by RuntimeState, so we only
// use filter_id as key
// key: "filter-id"
/// TODO: should it need protected by a mutex?
- std::map<int32_t, RuntimeFilterMgrVal> _consumer_map;
- std::map<int32_t, RuntimeFilterMgrVal> _producer_map;
+ std::map<int32_t, std::vector<ConsumerFilterHolder>> _consumer_map;
+ std::map<int32_t, IRuntimeFilter*> _producer_map;
RuntimeState* _state;
QueryContext* _query_ctx;
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index 7d4001f6fe..d5ca7f844b 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -98,7 +98,11 @@ public:
const char* get_family_name() const override { return "Nullable"; }
std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; }
MutableColumnPtr clone_resized(size_t size) const override;
- size_t size() const override { return nested_column->size(); }
+ size_t size() const override {
+ return nested_column->size(
+
+ );
+ }
bool is_null_at(size_t n) const override {
return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0;
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 0770d26e74..2a239c48b0 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -392,9 +392,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
_runtime_filters.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
- RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
- RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options(), -1,
- _probe_expr_ctxs.size() == 1));
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
+ _runtime_filter_descs[i], state->query_options(), _probe_expr_ctxs.size() == 1));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
_runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
}
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 2a603e7680..0b50860874 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -124,8 +124,8 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
std::vector<TExpr> filter_src_exprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
- RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
- RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
+ _runtime_filter_descs[i], state->query_options()));
}
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, _filter_src_expr_ctxs));
return Status::OK();
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.cpp b/be/src/vec/exec/runtime_filter_consumer_node.cpp
new file mode 100644
index 0000000000..dd631ce66e
--- /dev/null
+++ b/be/src/vec/exec/runtime_filter_consumer_node.cpp
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/runtime_filter_consumer_node.h"
+
+namespace doris::vectorized {
+
+RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode,
+ const DescriptorTbl& descs)
+ : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
+
+Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* state) {
+ RETURN_IF_ERROR(ExecNode::init(tnode, state));
+ _state = state;
+ RETURN_IF_ERROR(_register_runtime_filter());
+ return Status::OK();
+}
+
+Status RuntimeFilterConsumerNode::_register_runtime_filter() {
+ int filter_size = _runtime_filter_descs.size();
+ _runtime_filter_ctxs.reserve(filter_size);
+ _runtime_filter_ready_flag.reserve(filter_size);
+ for (int i = 0; i < filter_size; ++i) {
+ IRuntimeFilter* runtime_filter = nullptr;
+ const auto& filter_desc = _runtime_filter_descs[i];
+ if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
+ DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets);
+ // Optimize merging phase iff:
+ // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
+ // 2. This filter is bloom filter (only bloom filter should be used for merging)
+ RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
+ filter_desc, _state->query_options(), id(), false));
+ RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
+ filter_desc.filter_id, id(), &runtime_filter));
+ } else {
+ RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
+ filter_desc, _state->query_options(), id(), false));
+ RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(
+ filter_desc.filter_id, id(), &runtime_filter));
+ }
+ _runtime_filter_ctxs.emplace_back(runtime_filter);
+ _runtime_filter_ready_flag.emplace_back(false);
+ }
+ return Status::OK();
+}
+
+bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
+ if (!_blocked_by_rf) {
+ return true;
+ }
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+ if (!runtime_filter->is_ready_or_timeout()) {
+ return false;
+ }
+ }
+ _blocked_by_rf = false;
+ return true;
+}
+
+Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
+ SCOPED_TIMER(_acquire_runtime_filter_timer);
+ VExprSPtrs vexprs;
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+ bool ready = runtime_filter->is_ready();
+ if (!ready && wait) {
+ ready = runtime_filter->await();
+ }
+ if (ready && !_runtime_filter_ctxs[i].apply_mark) {
+ RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
+ _runtime_filter_ctxs[i].apply_mark = true;
+ } else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
+ runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
+ !_runtime_filter_ctxs[i].apply_mark) {
+ _blocked_by_rf = true;
+ } else if (!_runtime_filter_ctxs[i].apply_mark) {
+ DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
+ _is_all_rf_applied = false;
+ }
+ }
+ RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
+ if (_blocked_by_rf) {
+ return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
+ }
+
+ return Status::OK();
+}
+
+Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
+ if (vexprs.empty()) {
+ return Status::OK();
+ }
+
+ for (auto& expr : vexprs) {
+ VExprContextSPtr conjunct = VExprContext::create_shared(expr);
+ RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
+ RETURN_IF_ERROR(conjunct->open(_state));
+ _rf_vexpr_set.insert(expr);
+ _conjuncts.emplace_back(conjunct);
+ }
+
+ return Status::OK();
+}
+
+Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
+ if (_is_all_rf_applied) {
+ *arrived_rf_num = _runtime_filter_descs.size();
+ return Status::OK();
+ }
+
+ // This method will be called in scanner thread.
+ // So need to add lock
+ std::unique_lock l(_rf_locks);
+ if (_is_all_rf_applied) {
+ *arrived_rf_num = _runtime_filter_descs.size();
+ return Status::OK();
+ }
+
+ // 1. Check if are runtime filter ready but not applied.
+ VExprSPtrs exprs;
+ int current_arrived_rf_num = 0;
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ if (_runtime_filter_ctxs[i].apply_mark) {
+ ++current_arrived_rf_num;
+ continue;
+ } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
+ RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
+ &exprs, _row_descriptor, _state));
+ ++current_arrived_rf_num;
+ _runtime_filter_ctxs[i].apply_mark = true;
+ }
+ }
+ // 2. Append unapplied runtime filters to vconjunct_ctx_ptr
+ if (!exprs.empty()) {
+ RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
+ }
+ if (current_arrived_rf_num == _runtime_filter_descs.size()) {
+ _is_all_rf_applied = true;
+ }
+
+ *arrived_rf_num = current_arrived_rf_num;
+ return Status::OK();
+}
+
+void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) {
+ _acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime");
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.h b/be/src/vec/exec/runtime_filter_consumer_node.h
new file mode 100644
index 0000000000..518e0e865c
--- /dev/null
+++ b/be/src/vec/exec/runtime_filter_consumer_node.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/exec_node.h"
+#include "exprs/runtime_filter.h"
+
+namespace doris::vectorized {
+
+class RuntimeFilterConsumerNode : public ExecNode {
+public:
+ RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+ ~RuntimeFilterConsumerNode() override = default;
+
+ Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+
+ // Try append late arrived runtime filters.
+ // Return num of filters which are applied already.
+ Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
+
+ bool runtime_filters_are_ready_or_timeout();
+
+protected:
+ // Register and get all runtime filters at Init phase.
+ Status _register_runtime_filter();
+ // Get all arrived runtime filters at Open phase.
+ Status _acquire_runtime_filter(bool wait = true);
+ // Append late-arrival runtime filters to the vconjunct_ctx.
+ Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);
+
+ void _prepare_rf_timer(RuntimeProfile* profile);
+
+ // For runtime filters
+ struct RuntimeFilterContext {
+ RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {}
+ RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {}
+ // set to true if this runtime filter is already applied to vconjunct_ctx_ptr
+ bool apply_mark;
+ IRuntimeFilter* runtime_filter;
+ };
+
+ RuntimeState* _state;
+
+ std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+
+ std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+ // Set to true if the runtime filter is ready.
+ std::vector<bool> _runtime_filter_ready_flag;
+ doris::Mutex _rf_locks;
+ phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
+ // True means all runtime filters are applied to scanners
+ bool _is_all_rf_applied = true;
+ bool _blocked_by_rf = false;
+
+ RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 4c55150e53..852998c25c 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -94,8 +94,7 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
}
Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::init(tnode, state));
- _state = state;
+ RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state));
_is_pipeline_scan = state->enable_pipeline_exec();
const TQueryOptions& query_options = state->query_options();
@@ -109,9 +108,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
} else {
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
}
-
- RETURN_IF_ERROR(_register_runtime_filter());
-
return Status::OK();
}
@@ -149,7 +145,8 @@ Status VScanNode::prepare(RuntimeState* state) {
// if you want to add some profile in scan node, even it have not new VScanner object
// could add here, not in the _init_profile() function
_get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime");
- _acquire_runtime_filter_timer = ADD_TIMER(_runtime_profile, "AcuireRuntimeFilterTime");
+
+ _prepare_rf_timer(_runtime_profile.get());
_open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
_alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime");
@@ -311,95 +308,6 @@ Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners) {
return Status::OK();
}
-Status VScanNode::_register_runtime_filter() {
- int filter_size = _runtime_filter_descs.size();
- _runtime_filter_ctxs.reserve(filter_size);
- _runtime_filter_ready_flag.reserve(filter_size);
- for (int i = 0; i < filter_size; ++i) {
- IRuntimeFilter* runtime_filter = nullptr;
- const auto& filter_desc = _runtime_filter_descs[i];
- if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
- DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets);
- // Optimize merging phase iff:
- // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
- // 2. This filter is bloom filter (only bloom filter should be used for merging)
- RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_filter(
- RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(),
- false));
- RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
- filter_desc.filter_id, &runtime_filter));
- } else {
- RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
- RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(),
- false));
- RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
- &runtime_filter));
- }
- _runtime_filter_ctxs.emplace_back(runtime_filter);
- _runtime_filter_ready_flag.emplace_back(false);
- }
- return Status::OK();
-}
-
-bool VScanNode::runtime_filters_are_ready_or_timeout() {
- if (!_blocked_by_rf) {
- return true;
- }
- for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
- IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- if (!runtime_filter->is_ready_or_timeout()) {
- return false;
- }
- }
- _blocked_by_rf = false;
- return true;
-}
-
-Status VScanNode::_acquire_runtime_filter(bool wait) {
- SCOPED_TIMER(_acquire_runtime_filter_timer);
- VExprSPtrs vexprs;
- for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
- IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- bool ready = runtime_filter->is_ready();
- if (!ready && wait) {
- ready = runtime_filter->await();
- }
- if (ready && !_runtime_filter_ctxs[i].apply_mark) {
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
- _runtime_filter_ctxs[i].apply_mark = true;
- } else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
- runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
- !_runtime_filter_ctxs[i].apply_mark) {
- _blocked_by_rf = true;
- } else if (!_runtime_filter_ctxs[i].apply_mark) {
- DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
- _is_all_rf_applied = false;
- }
- }
- RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
- if (_blocked_by_rf) {
- return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
- }
-
- return Status::OK();
-}
-
-Status VScanNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
- if (vexprs.empty()) {
- return Status::OK();
- }
-
- for (auto& expr : vexprs) {
- VExprContextSPtr conjunct = VExprContext::create_shared(expr);
- RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
- RETURN_IF_ERROR(conjunct->open(_state));
- _rf_vexpr_set.insert(expr);
- _conjuncts.emplace_back(conjunct);
- }
-
- return Status::OK();
-}
-
Status VScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
@@ -1280,46 +1188,6 @@ Status VScanNode::_change_value_range(ColumnValueRange<PrimitiveType>& temp_rang
return Status::OK();
}
-Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
- if (_is_all_rf_applied) {
- *arrived_rf_num = _runtime_filter_descs.size();
- return Status::OK();
- }
-
- // This method will be called in scanner thread.
- // So need to add lock
- std::unique_lock l(_rf_locks);
- if (_is_all_rf_applied) {
- *arrived_rf_num = _runtime_filter_descs.size();
- return Status::OK();
- }
-
- // 1. Check if are runtime filter ready but not applied.
- VExprSPtrs exprs;
- int current_arrived_rf_num = 0;
- for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
- if (_runtime_filter_ctxs[i].apply_mark) {
- ++current_arrived_rf_num;
- continue;
- } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
- RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
- &exprs, _row_descriptor, _state));
- ++current_arrived_rf_num;
- _runtime_filter_ctxs[i].apply_mark = true;
- }
- }
- // 2. Append unapplied runtime filters to vconjunct_ctx_ptr
- if (!exprs.empty()) {
- RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
- }
- if (current_arrived_rf_num == _runtime_filter_descs.size()) {
- _is_all_rf_applied = true;
- }
-
- *arrived_rf_num = current_arrived_rf_num;
- return Status::OK();
-}
-
Status VScanNode::clone_conjunct_ctxs(VExprContextSPtrs& conjuncts) {
if (!_conjuncts.empty()) {
std::unique_lock l(_rf_locks);
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 8d235a8365..bb1089d349 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -43,6 +43,7 @@
#include "runtime/runtime_state.h"
#include "util/lock.h"
#include "util/runtime_profile.h"
+#include "vec/exec/runtime_filter_consumer_node.h"
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscanner.h"
#include "vec/runtime/shared_scanner_controller.h"
@@ -87,10 +88,10 @@ struct FilterPredicates {
std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters;
};
-class VScanNode : public ExecNode {
+class VScanNode : public RuntimeFilterConsumerNode {
public:
VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {
+ : RuntimeFilterConsumerNode(pool, tnode, descs) {
if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
// Which means the request could be fullfilled in a single segment iterator request.
if (tnode.limit > 0 && tnode.limit < 1024) {
@@ -140,10 +141,6 @@ public:
void set_no_agg_finalize() { _need_agg_finalize = false; }
- // Try append late arrived runtime filters.
- // Return num of filters which are applied already.
- Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
-
// Clone current _conjuncts to conjuncts, if exists.
Status clone_conjunct_ctxs(VExprContextSPtrs& conjuncts);
@@ -156,7 +153,6 @@ public:
Status alloc_resource(RuntimeState* state) override;
void release_resource(RuntimeState* state) override;
- bool runtime_filters_are_ready_or_timeout();
Status try_close();
@@ -242,7 +238,6 @@ protected:
Status _prepare_scanners();
- RuntimeState* _state;
bool _is_pipeline_scan = false;
bool _shared_scan_opt = false;
// For load scan node, there should be both input and output tuple descriptor.
@@ -256,24 +251,6 @@ protected:
int _max_scan_key_num;
int _max_pushdown_conditions_per_column;
- // For runtime filters
- struct RuntimeFilterContext {
- RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {}
- RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {}
- // set to true if this runtime filter is already applied to vconjunct_ctx_ptr
- bool apply_mark;
- IRuntimeFilter* runtime_filter;
- };
- std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
-
- std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
- // Set to true if the runtime filter is ready.
- std::vector<bool> _runtime_filter_ready_flag;
- doris::Mutex _rf_locks;
- phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
- // True means all runtime filters are applied to scanners
- bool _is_all_rf_applied = true;
-
// Each scan node will generates a ScannerContext to manage all Scanners.
// See comments of ScannerContext for more details
std::shared_ptr<ScannerContext> _scanner_ctx;
@@ -315,7 +292,6 @@ protected:
std::vector<ColumnValueRangeType> _not_in_value_ranges;
bool _need_agg_finalize = true;
- bool _blocked_by_rf = false;
// If the query like select * from table limit 10; then the query should run in
// single scanner to avoid too many scanners which will cause lots of useless read.
bool _should_run_serial = false;
@@ -371,13 +347,6 @@ protected:
std::vector<int> _col_distribute_ids;
private:
- // Register and get all runtime filters at Init phase.
- Status _register_runtime_filter();
- // Get all arrived runtime filters at Open phase.
- Status _acquire_runtime_filter(bool wait = true);
- // Append late-arrival runtime filters to the vconjunct_ctx.
- Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);
-
Status _normalize_conjuncts();
Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context,
VExprSPtr& output_expr);
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp
index 35494e93d0..95ce6cbc8a 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -82,15 +82,15 @@ Status VDataGenFunctionScanNode::prepare(RuntimeState* state) {
for (const auto& filter_desc : _runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
- RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_filter(
- RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id(), false));
+ RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
+ filter_desc, state->query_options(), id(), false));
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
- filter_desc.filter_id, &runtime_filter));
+ filter_desc.filter_id, id(), &runtime_filter));
} else {
- RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
- RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id(), false));
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
+ filter_desc, state->query_options(), id(), false));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
- &runtime_filter));
+ id(), &runtime_filter));
}
runtime_filter->init_profile(_runtime_profile.get());
}
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index ee1628cd19..626fd5ce96 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -37,22 +37,34 @@ class TPlanNode;
namespace vectorized {
VSelectNode::VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : ExecNode(pool, tnode, descs), _child_eos(false) {}
+ : RuntimeFilterConsumerNode(pool, tnode, descs), _child_eos(false) {}
Status VSelectNode::init(const TPlanNode& tnode, RuntimeState* state) {
- return ExecNode::init(tnode, state);
+ return RuntimeFilterConsumerNode::init(tnode, state);
}
Status VSelectNode::prepare(RuntimeState* state) {
- return ExecNode::prepare(state);
+ return RuntimeFilterConsumerNode::prepare(state);
}
Status VSelectNode::open(RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::open(state));
+ RETURN_IF_ERROR(RuntimeFilterConsumerNode::open(state));
RETURN_IF_ERROR(child(0)->open(state));
return Status::OK();
}
+Status VSelectNode::alloc_resource(RuntimeState* state) {
+ if (_opened) {
+ return Status::OK();
+ }
+
+ RETURN_IF_ERROR(RuntimeFilterConsumerNode::alloc_resource(state));
+ RETURN_IF_ERROR(_acquire_runtime_filter());
+ RETURN_IF_CANCELLED(state);
+ _opened = true;
+ return Status::OK();
+}
+
Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h
index d6783d7237..140009e4b3 100644
--- a/be/src/vec/exec/vselect_node.h
+++ b/be/src/vec/exec/vselect_node.h
@@ -17,7 +17,7 @@
#pragma once
#include "common/status.h"
-#include "exec/exec_node.h"
+#include "vec/exec/runtime_filter_consumer_node.h"
namespace doris {
class DescriptorTbl;
@@ -28,7 +28,7 @@ class TPlanNode;
namespace vectorized {
class Block;
-class VSelectNode final : public ExecNode {
+class VSelectNode final : public RuntimeFilterConsumerNode {
public:
VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
@@ -38,9 +38,12 @@ public:
Status close(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
+ Status alloc_resource(RuntimeState* state) override;
+
private:
// true if last get_next() call on child signalled eos
bool _child_eos;
+ bool _opened = false;
};
} // namespace vectorized
} // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 8593b750ba..10cb05acba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1441,7 +1441,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
// translate runtime filter
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> {
- List<RuntimeFilter> filters = runtimeFilterTranslator
+ Set<RuntimeFilter> filters = runtimeFilterTranslator
.getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
filters.forEach(filter -> runtimeFilterTranslator
.createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 9df4e74ea4..da4bb2e131 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleId;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.ObjectId;
@@ -41,9 +42,11 @@ import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* translate runtime filter
@@ -57,7 +60,7 @@ public class RuntimeFilterTranslator {
context.generatePhysicalHashJoinToRuntimeFilter();
}
- public List<RuntimeFilter> getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) {
+ public Set<RuntimeFilter> getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) {
return context.getRuntimeFilterOnHashJoinNode(join);
}
@@ -101,51 +104,65 @@ public class RuntimeFilterTranslator {
* @param ctx plan translator context
*/
public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, PlanTranslatorContext ctx) {
- Expr target = context.getExprIdToOlapScanNodeSlotRef().get(filter.getTargetExpr().getExprId());
- if (target == null) {
- context.setTargetNullCount();
- return;
- }
- Expr targetExpr;
- if (filter.getType() == TRuntimeFilterType.BITMAP) {
- if (filter.getTargetExpression().equals(filter.getTargetExpr())) {
- targetExpr = target;
+ Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx);
+ List<Expr> targetExprList = new ArrayList<>();
+ List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new ArrayList<>();
+ List<ScanNode> scanNodeList = new ArrayList<>();
+ boolean hasInvalidTarget = false;
+ for (int i = 0; i < filter.getTargetExprs().size(); i++) {
+ Slot curTargetExpr = filter.getTargetExprs().get(i);
+ Expression curTargetExpression = filter.getTargetExpressions().get(i);
+ Expr target = context.getExprIdToOlapScanNodeSlotRef().get(curTargetExpr.getExprId());
+ if (target == null) {
+ context.setTargetNullCount();
+ hasInvalidTarget = true;
+ break;
+ }
+ Expr targetExpr;
+ if (filter.getType() == TRuntimeFilterType.BITMAP) {
+ if (curTargetExpression.equals(curTargetExpr)) {
+ targetExpr = target;
+ } else {
+ RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator(
+ context.getExprIdToOlapScanNodeSlotRef());
+ targetExpr = curTargetExpression.accept(translator, ctx);
+ }
} else {
- RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator(
- context.getExprIdToOlapScanNodeSlotRef());
- targetExpr = filter.getTargetExpression().accept(translator, ctx);
+ targetExpr = target;
}
- } else {
- targetExpr = target;
- }
-
- Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx);
- SlotRef targetSlot = target.getSrcSlotRef();
- TupleId targetTupleId = targetSlot.getDesc().getParent().getId();
- SlotId targetSlotId = targetSlot.getSlotId();
- // adjust data type
- if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) {
- targetExpr = new CastExpr(src.getType(), targetExpr);
+ // adjust data type
+ if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) {
+ targetExpr = new CastExpr(src.getType(), targetExpr);
+ }
+ SlotRef targetSlot = target.getSrcSlotRef();
+ TupleId targetTupleId = targetSlot.getDesc().getParent().getId();
+ SlotId targetSlotId = targetSlot.getSlotId();
+ ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetExpr);
+ scanNodeList.add(scanNode);
+ targetExprList.add(targetExpr);
+ targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)));
}
- org.apache.doris.planner.RuntimeFilter origFilter
- = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
- filter.getId(), node, src, filter.getExprOrder(), targetExpr,
- ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)),
- filter.getType(), context.getLimits(), filter.getBuildSideNdv());
- if (node instanceof HashJoinNode) {
- origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
- } else {
- //bitmap rf requires isBroadCast=false, it always requires merge filter
- origFilter.setIsBroadcast(false);
+ if (!hasInvalidTarget) {
+ org.apache.doris.planner.RuntimeFilter origFilter
+ = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+ filter.getId(), node, src, filter.getExprOrder(), targetExprList,
+ targetTupleIdMapList, filter.getType(), context.getLimits(), filter.getBuildSideNdv());
+ if (node instanceof HashJoinNode) {
+ origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
+ } else {
+ //bitmap rf requires isBroadCast=false, it always requires merge filter
+ origFilter.setIsBroadcast(false);
+ }
+ boolean isLocalTarget = scanNodeList.stream().allMatch(e -> e.getFragmentId().equals(node.getFragmentId()));
+ for (int i = 0; i < targetExprList.size(); i++) {
+ ScanNode scanNode = scanNodeList.get(i);
+ Expr targetExpr = targetExprList.get(i);
+ origFilter.addTarget(new RuntimeFilterTarget(
+ scanNode, targetExpr, true, isLocalTarget));
+ }
+ origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
+ context.getLegacyFilters().add(finalize(origFilter));
}
- ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr());
- origFilter.addTarget(new RuntimeFilterTarget(
- scanNode,
- targetExpr,
- true,
- scanNode.getFragmentId().equals(node.getFragmentId())));
- origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
- context.getLegacyFilters().add(finalize(origFilter));
}
private org.apache.doris.planner.RuntimeFilter finalize(org.apache.doris.planner.RuntimeFilter origFilter) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index 69986cde67..b9d2ac301e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -67,7 +67,7 @@ public class RuntimeFilterContext {
// exprId to olap scan node slotRef because the slotRef will be changed when translating.
private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = Maps.newHashMap();
- private final Map<AbstractPhysicalJoin, List<RuntimeFilter>> runtimeFilterOnHashJoinNode = Maps.newHashMap();
+ private final Map<AbstractPhysicalJoin, Set<RuntimeFilter>> runtimeFilterOnHashJoinNode = Maps.newHashMap();
// alias -> alias's child, if there's a key that is alias's child, the key-value will change by this way
// Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv will be C -> A.
@@ -97,7 +97,7 @@ public class RuntimeFilterContext {
}
public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
- Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id);
+ Preconditions.checkArgument(filter.getTargetExprs().stream().anyMatch(expr -> expr.getExprId() == id));
this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()).add(filter);
}
@@ -135,13 +135,13 @@ public class RuntimeFilterContext {
return scanNodeOfLegacyRuntimeFilterTarget;
}
- public List<RuntimeFilter> getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) {
- return runtimeFilterOnHashJoinNode.getOrDefault(join, Collections.emptyList());
+ public Set<RuntimeFilter> getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) {
+ return runtimeFilterOnHashJoinNode.getOrDefault(join, Collections.emptySet());
}
public void generatePhysicalHashJoinToRuntimeFilter() {
targetExprIdToFilter.values().forEach(filters -> filters.forEach(filter -> runtimeFilterOnHashJoinNode
- .computeIfAbsent(filter.getBuilderNode(), k -> Lists.newArrayList()).add(filter)));
+ .computeIfAbsent(filter.getBuilderNode(), k -> Sets.newHashSet()).add(filter)));
}
public Map<ExprId, List<RuntimeFilter>> getTargetExprIdToFilter() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index bb0272c3a7..9ec1a34c9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -27,15 +27,19 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
@@ -44,8 +48,10 @@ import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -105,29 +111,81 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
if (type == TRuntimeFilterType.BITMAP) {
continue;
}
- // currently, we can ensure children in the two side are corresponding to the equal_to's.
- // so right maybe an expression and left is a slot
- Slot unwrappedSlot = checkTargetChild(equalTo.left());
- // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
- // contains join with denied join type. for example: a left join b on a.id = b.id
- if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
- continue;
- }
- Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
- PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
- // in-filter is not friendly to pipeline
- if (type == TRuntimeFilterType.IN_OR_BLOOM
- && ctx.getSessionVariable().enablePipelineEngine()
- && hasRemoteTarget(join, scan)) {
- type = TRuntimeFilterType.BLOOM;
- }
+ if (join.left() instanceof PhysicalUnion
+ || join.left() instanceof PhysicalIntersect
+ || join.left() instanceof PhysicalExcept) {
+ List<Slot> targetList = new ArrayList<>();
+ int projIndex = -1;
+ for (int j = 0; j < join.left().children().size(); j++) {
+ PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+ if (child instanceof PhysicalProject) {
+ PhysicalProject project = (PhysicalProject) child;
+ Slot leftSlot = checkTargetChild(equalTo.left());
+ if (leftSlot == null) {
+ break;
+ }
+ for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+ NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+ if (expr.getName().equals(leftSlot.getName())) {
+ projIndex = k;
+ break;
+ }
+ }
+ Preconditions.checkState(projIndex >= 0
+ && projIndex < project.getProjects().size());
- long buildSideNdv = getBuildSideNdv(join, equalTo);
- RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
- equalTo.right(), olapScanSlot, type, i, join, buildSideNdv);
- ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
- ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
- ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+ NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+ SlotReference origSlot = null;
+ if (targetExpr instanceof Alias) {
+ origSlot = (SlotReference) targetExpr.child(0);
+ } else {
+ origSlot = (SlotReference) targetExpr;
+ }
+ Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+ PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+ if (type == TRuntimeFilterType.IN_OR_BLOOM
+ && ctx.getSessionVariable().enablePipelineEngine()
+ && hasRemoteTarget(join, scan)) {
+ type = TRuntimeFilterType.BLOOM;
+ }
+ targetList.add(olapScanSlot);
+ ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+ ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+ }
+ }
+ if (!targetList.isEmpty()) {
+ long buildSideNdv = getBuildSideNdv(join, equalTo);
+ RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+ equalTo.right(), targetList, type, i, join, buildSideNdv);
+ for (int j = 0; j < targetList.size(); j++) {
+ ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+ }
+ }
+ } else {
+ // currently, we can ensure children in the two side are corresponding to the equal_to's.
+ // so right maybe an expression and left is a slot
+ Slot unwrappedSlot = checkTargetChild(equalTo.left());
+ // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+ // contains join with denied join type. for example: a left join b on a.id = b.id
+ if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+ continue;
+ }
+ Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+ PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+ // in-filter is not friendly to pipeline
+ if (type == TRuntimeFilterType.IN_OR_BLOOM
+ && ctx.getSessionVariable().enablePipelineEngine()
+ && hasRemoteTarget(join, scan)) {
+ type = TRuntimeFilterType.BLOOM;
+ }
+ long buildSideNdv = getBuildSideNdv(join, equalTo);
+ RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+ equalTo.right(), ImmutableList.of(olapScanSlot), type, i, join, buildSideNdv);
+ ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+ ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+ ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+ }
}
}
}
@@ -194,8 +252,8 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
if (targetSlot != null && aliasTransferMap.containsKey(targetSlot)) {
Slot olapScanSlot = aliasTransferMap.get(targetSlot).second;
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
- bitmapContains.child(0), olapScanSlot,
- bitmapContains.child(1), type, i, join, isNot, -1L);
+ bitmapContains.child(0), ImmutableList.of(olapScanSlot),
+ ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getId(),
@@ -221,7 +279,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
if (expr instanceof NamedExpression && aliasTransferMap.containsKey((NamedExpression) expr)) {
if (expression instanceof Alias) {
Alias alias = ((Alias) expression);
- aliasTransferMap.put(alias.toSlot(), aliasTransferMap.remove(expr));
+ aliasTransferMap.put(alias.toSlot(), aliasTransferMap.get(expr));
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index 703a043b48..ff907f62aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -22,6 +22,10 @@ import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.thrift.TRuntimeFilterType;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
/**
* runtime filter
*/
@@ -32,8 +36,8 @@ public class RuntimeFilter {
private final Expression srcSlot;
//bitmap filter support target expression like k1+1, abs(k1)
//targetExpression is an expression on targetSlot, in which there is only one non-const slot
- private final Expression targetExpression;
- private final Slot targetSlot;
+ private final List<Expression> targetExpressions;
+ private final List<Slot> targetSlots;
private final int exprOrder;
private final AbstractPhysicalJoin builderNode;
@@ -44,21 +48,21 @@ public class RuntimeFilter {
/**
* constructor
*/
- public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, TRuntimeFilterType type,
+ public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, TRuntimeFilterType type,
int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) {
- this(id, src, target, target, type, exprOrder, builderNode, false, buildSideNdv);
+ this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, builderNode, false, buildSideNdv);
}
/**
* constructor
*/
- public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, Expression targetExpression,
+ public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions,
TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, boolean bitmapFilterNotIn,
long buildSideNdv) {
this.id = id;
this.srcSlot = src;
- this.targetSlot = target;
- this.targetExpression = targetExpression;
+ this.targetSlots = ImmutableList.copyOf(targets);
+ this.targetExpressions = ImmutableList.copyOf(targetExpressions);
this.type = type;
this.exprOrder = exprOrder;
this.builderNode = builderNode;
@@ -71,8 +75,8 @@ public class RuntimeFilter {
return srcSlot;
}
- public Slot getTargetExpr() {
- return targetSlot;
+ public List<Slot> getTargetExprs() {
+ return targetSlots;
}
public RuntimeFilterId getId() {
@@ -95,8 +99,8 @@ public class RuntimeFilter {
return bitmapFilterNotIn;
}
- public Expression getTargetExpression() {
- return targetExpression;
+ public List<Expression> getTargetExpressions() {
+ return targetExpressions;
}
public long getBuildSideNdv() {
@@ -107,7 +111,7 @@ public class RuntimeFilter {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("RF").append(id.asInt())
- .append("[").append(getSrcExpr()).append("->").append(targetSlot)
+ .append("[").append(getSrcExpr()).append("->").append(targetSlots)
.append("(ndv/size = ").append(buildSideNdv).append("/")
.append(org.apache.doris.planner.RuntimeFilter.expectRuntimeFilterSize(buildSideNdv))
.append(")");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 484a666851..d45f4f2611 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -71,12 +72,12 @@ public final class RuntimeFilter {
// The position of expr in the join condition
private final int exprOrder;
// Expr (lhs of join predicate) from which the targetExprs_ are generated.
- private final Expr origTargetExpr;
+ private final List<Expr> origTargetExprs;
// Runtime filter targets
private final List<RuntimeFilterTarget> targets = new ArrayList<>();
// Slots from base table tuples that have value transfer from the slots
// of 'origTargetExpr'. The slots are grouped by tuple id.
- private final Map<TupleId, List<SlotId>> targetSlotsByTid;
+ private final List<Map<TupleId, List<SlotId>>> targetSlotsByTid;
// If true, the join node building this filter is executed using a broadcast join;
// set in the DistributedPlanner.createHashJoinFragment()
private boolean isBroadcastJoin;
@@ -139,14 +140,14 @@ public final class RuntimeFilter {
}
private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder,
- Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots, TRuntimeFilterType type,
+ List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots, TRuntimeFilterType type,
RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
this.id = filterId;
this.builderNode = filterSrcNode;
this.srcExpr = srcExpr;
this.exprOrder = exprOrder;
- this.origTargetExpr = origTargetExpr;
- this.targetSlotsByTid = targetSlots;
+ this.origTargetExprs = ImmutableList.copyOf(origTargetExprs);
+ this.targetSlotsByTid = ImmutableList.copyOf(targetSlots);
this.runtimeFilterType = type;
this.ndvEstimate = buildSizeNdv;
computeNdvEstimate();
@@ -155,9 +156,9 @@ public final class RuntimeFilter {
// only for nereids planner
public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, JoinNodeBase node, Expr srcExpr,
- int exprOrder, Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots,
+ int exprOrder, List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots,
TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
- return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExpr,
+ return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExprs,
targetSlots, type, filterSizeLimits, buildSizeNdv);
}
@@ -237,11 +238,11 @@ public final class RuntimeFilter {
return srcExpr;
}
- public Expr getOrigTargetExpr() {
- return origTargetExpr;
+ public List<Expr> getOrigTargetExprs() {
+ return origTargetExprs;
}
- public Map<TupleId, List<SlotId>> getTargetSlots() {
+ public List<Map<TupleId, List<SlotId>>> getTargetSlots() {
return targetSlotsByTid;
}
@@ -329,7 +330,7 @@ public final class RuntimeFilter {
}
return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder,
- targetExpr, targetSlots, type, filterSizeLimits, -1L);
+ ImmutableList.of(targetExpr), ImmutableList.of(targetSlots), type, filterSizeLimits, -1L);
}
public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer analyzer, Expr joinPredicate,
@@ -365,7 +366,8 @@ public final class RuntimeFilter {
}
RuntimeFilter runtimeFilter =
- new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder, targetExpr, targetSlots,
+ new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder,
+ ImmutableList.of(targetExpr), ImmutableList.of(targetSlots),
type, filterSizeLimits, -1L);
runtimeFilter.setBitmapFilterNotIn(((BitmapFilterPredicate) joinPredicate).isNotIn());
return runtimeFilter;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
index 7de864f50a..fb1b4da92b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
@@ -317,10 +317,12 @@ public final class RuntimeFilterGenerator {
* destination node for that filter.
*/
private void registerRuntimeFilter(RuntimeFilter filter) {
- Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots();
- Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty());
- for (TupleId tupleId : targetSlotsByTid.keySet()) {
- registerRuntimeFilter(filter, tupleId);
+ List<Map<TupleId, List<SlotId>>> targetSlotsByTids = filter.getTargetSlots();
+ for (Map<TupleId, List<SlotId>> targetSlotsByTid : targetSlotsByTids) {
+ Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty());
+ for (TupleId tupleId : targetSlotsByTid.keySet()) {
+ registerRuntimeFilter(filter, tupleId);
+ }
}
}
@@ -328,7 +330,7 @@ public final class RuntimeFilterGenerator {
* Registers a runtime filter with a specific target tuple id.
*/
private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) {
- Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+ Preconditions.checkState(filter.getTargetSlots().stream().anyMatch(e -> e.containsKey(targetTid)));
List<RuntimeFilter> filters = runtimeFiltersByTid.computeIfAbsent(targetTid, k -> new ArrayList<>());
Preconditions.checkState(!filter.isFinalized());
filters.add(filter);
@@ -344,9 +346,11 @@ public final class RuntimeFilterGenerator {
for (RuntimeFilter.RuntimeFilterTarget target : runtimeFilter.getTargets()) {
targetTupleIds.addAll(target.node.getTupleIds());
}
- for (TupleId tupleId : runtimeFilter.getTargetSlots().keySet()) {
- if (!targetTupleIds.contains(tupleId)) {
- runtimeFiltersByTid.get(tupleId).remove(runtimeFilter);
+ for (Map<TupleId, List<SlotId>> slots : runtimeFilter.getTargetSlots()) {
+ for (TupleId tupleId : slots.keySet()) {
+ if (!targetTupleIds.contains(tupleId)) {
+ runtimeFiltersByTid.get(tupleId).remove(runtimeFilter);
+ }
}
}
runtimeFilter.markFinalized();
@@ -424,14 +428,22 @@ public final class RuntimeFilterGenerator {
* the scan node with target tuple descriptor 'targetTid'.
*/
private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid) {
- Expr targetExpr = filter.getOrigTargetExpr();
+ Preconditions.checkState(filter.getTargetSlots().size() == filter.getOrigTargetExprs().size());
+ Expr targetExpr = null;
+ for (int i = 0; i < filter.getOrigTargetExprs().size(); i++) {
+ if (filter.getTargetSlots().get(i).containsKey(targetTid)) {
+ targetExpr = filter.getOrigTargetExprs().get(i);
+ break;
+ }
+ }
+ Preconditions.checkState(targetExpr != null);
// if there is a subquery on the left side of join, in order to push to scan in the subquery,
// targetExpr will return false as long as there is a slotref parent node that is not targetTid.
// But when this slotref can be transferred to the targetTid slot, such as Aa + Bb = Cc,
// targetTid is B, if Aa can be transferred to Ba, that is, Aa and Ba are equivalent columns,
// then replace Aa with Ba, and then calculate for targetTid targetExpr
if (!targetExpr.isBound(targetTid)) {
- Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+ Preconditions.checkState(filter.getTargetSlots().stream().anyMatch(e -> e.containsKey(targetTid)));
// Modify the filter target expr using the equivalent slots from the scan node
// on which the filter will be applied.
ExprSubstitutionMap smap = new ExprSubstitutionMap();
@@ -440,7 +452,14 @@ public final class RuntimeFilterGenerator {
// all slots of targetSlotsByTid.
targetExpr.collect(SlotRef.class, exprSlots);
// targetExpr specifies the id of the slotRef node in the `tupleID`
- List<SlotId> sids = filter.getTargetSlots().get(targetTid);
+ List<SlotId> sids = new ArrayList<>();
+ for (Map<TupleId, List<SlotId>> map : filter.getTargetSlots()) {
+ if (map.containsKey(targetTid)) {
+ sids = map.get(targetTid);
+ break;
+ }
+ }
+ Preconditions.checkState(!sids.isEmpty());
for (SlotRef slotRef : exprSlots) {
for (SlotId sid : sids) {
if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index e4dd754be9..699df19ba4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -252,7 +252,7 @@ public class RuntimeFilterTest extends SSBTestBase {
for (RuntimeFilter filter : filters) {
Assertions.assertTrue(colNames.contains(Pair.of(
filter.getSrcExpr().toSql(),
- filter.getTargetExpr().getName())));
+ filter.getTargetExprs().get(0).getName())));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org