You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by en...@apache.org on 2023/06/16 12:26:09 UTC

[doris] branch master updated: [feature](nereids) Support multi target rf #20714

This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ab32299ba4 [feature](nereids) Support multi target rf #20714
ab32299ba4 is described below

commit ab32299ba40bef55d4ba626a43df8b313c7718dc
Author: xzj7019 <13...@users.noreply.github.com>
AuthorDate: Fri Jun 16 20:26:00 2023 +0800

    [feature](nereids) Support multi target rf #20714
    
    Support multi-target runtime filters, mainly for set operations such as union/intersect/except.
---
 be/src/exprs/runtime_filter.cpp                    |  19 ++-
 be/src/exprs/runtime_filter.h                      |   2 +
 be/src/exprs/runtime_filter_slots.h                |  12 +-
 be/src/runtime/fragment_mgr.cpp                    |  16 +-
 be/src/runtime/runtime_filter_mgr.cpp              | 149 ++++++++++++-------
 be/src/runtime/runtime_filter_mgr.h                |  29 ++--
 be/src/vec/columns/column_nullable.h               |   6 +-
 be/src/vec/exec/join/vhash_join_node.cpp           |   5 +-
 be/src/vec/exec/join/vnested_loop_join_node.cpp    |   4 +-
 be/src/vec/exec/runtime_filter_consumer_node.cpp   | 164 +++++++++++++++++++++
 be/src/vec/exec/runtime_filter_consumer_node.h     |  73 +++++++++
 be/src/vec/exec/scan/vscan_node.cpp                | 138 +----------------
 be/src/vec/exec/scan/vscan_node.h                  |  37 +----
 be/src/vec/exec/vdata_gen_scan_node.cpp            |  12 +-
 be/src/vec/exec/vselect_node.cpp                   |  20 ++-
 be/src/vec/exec/vselect_node.h                     |   7 +-
 .../glue/translator/PhysicalPlanTranslator.java    |   2 +-
 .../glue/translator/RuntimeFilterTranslator.java   | 101 +++++++------
 .../processor/post/RuntimeFilterContext.java       |  10 +-
 .../processor/post/RuntimeFilterGenerator.java     | 108 ++++++++++----
 .../trees/plans/physical/RuntimeFilter.java        |  28 ++--
 .../org/apache/doris/planner/RuntimeFilter.java    |  26 ++--
 .../doris/planner/RuntimeFilterGenerator.java      |  41 ++++--
 .../nereids/postprocess/RuntimeFilterTest.java     |   2 +-
 24 files changed, 631 insertions(+), 380 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c11ab8715c..2c6f3662bb 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1105,6 +1105,12 @@ Status IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterC
     return Status::OK();
 }
 
+void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) {
+    _wrapper->_filter_type = other->_wrapper->_filter_type;
+    _wrapper->_is_bloomfilter = other->is_bloomfilter();
+    _wrapper->_context = other->_wrapper->_context;
+}
+
 void IRuntimeFilter::insert(const void* data) {
     DCHECK(is_producer());
     if (!_is_ignored) {
@@ -1126,13 +1132,14 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column,
 Status IRuntimeFilter::publish() {
     DCHECK(is_producer());
     if (_has_local_target) {
-        IRuntimeFilter* consumer_filter = nullptr;
-        RETURN_IF_ERROR(
-                _state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter));
+        std::vector<IRuntimeFilter*> filters;
+        RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filters(_filter_id, filters));
         // push down
-        consumer_filter->_wrapper = _wrapper;
-        consumer_filter->update_runtime_filter_type_to_profile();
-        consumer_filter->signal();
+        for (auto filter : filters) {
+            filter->_wrapper = _wrapper;
+            filter->update_runtime_filter_type_to_profile();
+            filter->signal();
+        }
         return Status::OK();
     } else {
         TNetworkAddress addr;
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 7042dccc1c..a4fd241a28 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -207,6 +207,8 @@ public:
     void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
     Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);
 
+    void copy_from_other(IRuntimeFilter* other);
+
     // insert data to build filter
     // only used for producer
     void insert(const void* data);
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 08532251bc..816e62c731 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -51,11 +51,13 @@ public:
         std::map<int, bool> has_in_filter;
 
         auto ignore_local_filter = [state](int filter_id) {
-            IRuntimeFilter* consumer_filter = nullptr;
-            state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter);
-            DCHECK(consumer_filter != nullptr);
-            consumer_filter->set_ignored();
-            consumer_filter->signal();
+            std::vector<IRuntimeFilter*> filters;
+            state->runtime_filter_mgr()->get_consume_filters(filter_id, filters);
+            DCHECK(!filters.empty());
+            for (auto filter : filters) {
+                filter->set_ignored();
+                filter->signal();
+            }
         };
 
         auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c98bebafa1..4581768199 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1276,9 +1276,19 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
 
         UpdateRuntimeFilterParamsV2 params(request, attach_data, pool);
         int filter_id = request->filter_id();
-        IRuntimeFilter* real_filter = nullptr;
-        RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id, &real_filter));
-        RETURN_IF_ERROR(real_filter->update_filter(&params, start_apply));
+        std::vector<IRuntimeFilter*> filters;
+        RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters));
+
+        IRuntimeFilter* first_filter = nullptr;
+        for (auto filter : filters) {
+            if (!first_filter) {
+                RETURN_IF_ERROR(filter->update_filter(&params, start_apply));
+                first_filter = filter;
+            } else {
+                filter->copy_from_other(first_filter);
+                filter->signal();
+            }
+        }
     }
 
     return Status::OK();
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index e00149a7fa..e2b4c525ac 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -55,91 +55,124 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state
 RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
         : _query_ctx(query_ctx) {}
 
-RuntimeFilterMgr::~RuntimeFilterMgr() = default;
-
 Status RuntimeFilterMgr::init() {
     _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
                                             ExecEnv::GetInstance()->experimental_mem_tracker());
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::get_filter_by_role(const int filter_id, const RuntimeFilterRole role,
-                                            IRuntimeFilter** target) {
+Status RuntimeFilterMgr::get_producer_filter(const int filter_id, IRuntimeFilter** target) {
     int32_t key = filter_id;
-    std::map<int32_t, RuntimeFilterMgrVal>* filter_map = nullptr;
 
-    if (role == RuntimeFilterRole::CONSUMER) {
-        filter_map = &_consumer_map;
-    } else {
-        filter_map = &_producer_map;
+    auto iter = _producer_map.find(key);
+    if (iter == _producer_map.end()) {
+        LOG(WARNING) << "unknown runtime filter: " << key << ", role: PRODUCER";
+        return Status::InvalidArgument("unknown filter");
     }
 
-    auto iter = filter_map->find(key);
-    if (iter == filter_map->end()) {
-        return Status::InternalError("get filter failed, key={}, role={}", key, (int)role);
-    }
-    *target = iter->second.filter;
+    *target = iter->second;
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter) {
-    return get_filter_by_role(filter_id, RuntimeFilterRole::CONSUMER, consumer_filter);
+Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int node_id,
+                                            IRuntimeFilter** consumer_filter) {
+    auto iter = _consumer_map.find(filter_id);
+    if (iter == _consumer_map.cend()) {
+        LOG(WARNING) << "unknown runtime filter: " << filter_id << ", role: consumer";
+        return Status::InvalidArgument("unknown filter");
+    }
+
+    for (auto& item : iter->second) {
+        if (item.node_id == node_id) {
+            *consumer_filter = item.filter;
+            return Status::OK();
+        }
+    }
+
+    return Status::InvalidArgument(
+            fmt::format("unknown filter, filter_id: {}, node_id: {}", filter_id, node_id));
 }
 
-Status RuntimeFilterMgr::get_producer_filter(const int filter_id,
-                                             IRuntimeFilter** producer_filter) {
-    return get_filter_by_role(filter_id, RuntimeFilterRole::PRODUCER, producer_filter);
+Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
+                                             std::vector<IRuntimeFilter*>& consumer_filters) {
+    int32_t key = filter_id;
+    auto iter = _consumer_map.find(key);
+    if (iter == _consumer_map.end()) {
+        LOG(WARNING) << "unknown runtime filter: " << key << ", role: consumer";
+        return Status::InvalidArgument("unknown filter");
+    }
+    for (auto& holder : iter->second) {
+        consumer_filters.emplace_back(holder.filter);
+    }
+    return Status::OK();
 }
 
-Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role,
-                                         const TRuntimeFilterDesc& desc,
-                                         const TQueryOptions& options, int node_id,
-                                         bool build_bf_exactly) {
-    DCHECK(role != RuntimeFilterRole::CONSUMER || node_id >= 0);
+Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc,
+                                                  const TQueryOptions& options, int node_id,
+                                                  bool build_bf_exactly) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
-    std::map<int32_t, RuntimeFilterMgrVal>* filter_map = nullptr;
-    if (role == RuntimeFilterRole::CONSUMER) {
-        filter_map = &_consumer_map;
-    } else {
-        filter_map = &_producer_map;
-    }
-    VLOG_NOTICE << "regist filter...:" << key << ",role:" << (int)role;
-
-    auto iter = filter_map->find(key);
-
-    RuntimeFilterMgrVal filter_mgr_val;
-    filter_mgr_val.role = role;
-
-    if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && role == RuntimeFilterRole::CONSUMER &&
-        desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) {
+    auto iter = _consumer_map.find(key);
+    if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets &&
+        desc.type == TRuntimeFilterType::BLOOM) {
         // if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances
         DCHECK(_query_ctx != nullptr);
-        if (iter != filter_map->end()) {
-            return Status::OK();
-        }
+
         {
             std::lock_guard<std::mutex> l(_lock);
-            iter = filter_map->find(key);
-            if (iter != filter_map->end()) {
-                return Status::OK();
+
+            iter = _consumer_map.find(key);
+            if (iter != _consumer_map.end()) {
+                for (auto holder : iter->second) {
+                    if (holder.node_id == node_id) {
+                        return Status::OK();
+                    }
+                }
             }
+            IRuntimeFilter* filter;
             RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc,
-                                                   &options, role, node_id, &filter_mgr_val.filter,
-                                                   build_bf_exactly));
-            filter_map->emplace(key, filter_mgr_val);
+
+                                                   &options, RuntimeFilterRole::CONSUMER, node_id,
+                                                   &filter, build_bf_exactly));
+            _consumer_map[key].emplace_back(node_id, filter);
         }
     } else {
         DCHECK(_state != nullptr);
-        if (iter != filter_map->end()) {
-            return Status::InvalidArgument("filter has registed");
+
+        if (iter != _consumer_map.end()) {
+            for (auto holder : iter->second) {
+                if (holder.node_id == node_id) {
+                    return Status::InvalidArgument("filter has registered");
+                }
+            }
         }
-        RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id,
-                                               &filter_mgr_val.filter, build_bf_exactly));
-        filter_map->emplace(key, filter_mgr_val);
+
+        IRuntimeFilter* filter;
+        RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+                                               RuntimeFilterRole::CONSUMER, node_id, &filter,
+                                               build_bf_exactly));
+        _consumer_map[key].emplace_back(node_id, filter);
     }
+    return Status::OK();
+}
+
+Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc,
+                                                  const TQueryOptions& options,
+                                                  bool build_bf_exactly) {
+    SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
+    int32_t key = desc.filter_id;
+    auto iter = _producer_map.find(key);
 
+    DCHECK(_state != nullptr);
+    if (iter != _producer_map.end()) {
+        return Status::InvalidArgument("filter has registed");
+    }
+    IRuntimeFilter* filter;
+    RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+                                           RuntimeFilterRole::PRODUCER, -1, &filter,
+                                           build_bf_exactly));
+    _producer_map.emplace(key, filter);
     return Status::OK();
 }
 
@@ -148,9 +181,13 @@ Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     UpdateRuntimeFilterParams params(request, data, &_pool);
     int filter_id = request->filter_id();
-    IRuntimeFilter* real_filter = nullptr;
-    RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter));
-    return real_filter->update_filter(&params);
+    std::vector<IRuntimeFilter*> filters;
+    RETURN_IF_ERROR(get_consume_filters(filter_id, filters));
+    for (auto filter : filters) {
+        RETURN_IF_ERROR(filter->update_filter(&params));
+    }
+
+    return Status::OK();
 }
 
 void RuntimeFilterMgr::set_runtime_filter_params(
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 0a0a70ec7a..f3cb32813f 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -68,18 +68,22 @@ public:
 
     RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);
 
-    ~RuntimeFilterMgr();
+    ~RuntimeFilterMgr() = default;
 
     Status init();
 
-    // get a consumer filter by filter-id
-    Status get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter);
+    Status get_consume_filter(const int filter_id, const int node_id,
+                              IRuntimeFilter** consumer_filter);
+
+    Status get_consume_filters(const int filter_id, std::vector<IRuntimeFilter*>& consumer_filters);
 
     Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter);
-    // regist filter
-    Status register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
-                           const TQueryOptions& options, int node_id = -1,
-                           bool build_bf_exactly = false);
+
+    // register filter
+    Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
+                                    int node_id, bool build_bf_exactly = false);
+    Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
+                                    bool build_bf_exactly = false);
 
     // update filter by remote
     Status update_filter(const PPublishFilterRequest* request,
@@ -90,19 +94,16 @@ public:
     Status get_merge_addr(TNetworkAddress* addr);
 
 private:
-    Status get_filter_by_role(const int filter_id, const RuntimeFilterRole role,
-                              IRuntimeFilter** target);
-
-    struct RuntimeFilterMgrVal {
-        RuntimeFilterRole role; // consumer or producer
+    struct ConsumerFilterHolder {
+        int node_id;
         IRuntimeFilter* filter;
     };
     // RuntimeFilterMgr is owned by RuntimeState, so we only
     // use filter_id as key
     // key: "filter-id"
     /// TODO: should it need protected by a mutex?
-    std::map<int32_t, RuntimeFilterMgrVal> _consumer_map;
-    std::map<int32_t, RuntimeFilterMgrVal> _producer_map;
+    std::map<int32_t, std::vector<ConsumerFilterHolder>> _consumer_map;
+    std::map<int32_t, IRuntimeFilter*> _producer_map;
 
     RuntimeState* _state;
     QueryContext* _query_ctx;
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index 7d4001f6fe..d5ca7f844b 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -98,7 +98,11 @@ public:
     const char* get_family_name() const override { return "Nullable"; }
     std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; }
     MutableColumnPtr clone_resized(size_t size) const override;
-    size_t size() const override { return nested_column->size(); }
+    size_t size() const override {
+        return nested_column->size(
+
+        );
+    }
     bool is_null_at(size_t n) const override {
         return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0;
     }
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 0770d26e74..2a239c48b0 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -392,9 +392,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
     _runtime_filters.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
-                RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options(), -1,
-                _probe_expr_ctxs.size() == 1));
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
+                _runtime_filter_descs[i], state->query_options(), _probe_expr_ctxs.size() == 1));
         RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
                 _runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
     }
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 2a603e7680..0b50860874 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -124,8 +124,8 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
     std::vector<TExpr> filter_src_exprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
         filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
-                RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
+                _runtime_filter_descs[i], state->query_options()));
     }
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, _filter_src_expr_ctxs));
     return Status::OK();
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.cpp b/be/src/vec/exec/runtime_filter_consumer_node.cpp
new file mode 100644
index 0000000000..dd631ce66e
--- /dev/null
+++ b/be/src/vec/exec/runtime_filter_consumer_node.cpp
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/runtime_filter_consumer_node.h"
+
+namespace doris::vectorized {
+
+RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode,
+                                                     const DescriptorTbl& descs)
+        : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
+
+Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+    _state = state;
+    RETURN_IF_ERROR(_register_runtime_filter());
+    return Status::OK();
+}
+
+Status RuntimeFilterConsumerNode::_register_runtime_filter() {
+    int filter_size = _runtime_filter_descs.size();
+    _runtime_filter_ctxs.reserve(filter_size);
+    _runtime_filter_ready_flag.reserve(filter_size);
+    for (int i = 0; i < filter_size; ++i) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        const auto& filter_desc = _runtime_filter_descs[i];
+        if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
+            DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets);
+            // Optimize merging phase iff:
+            // 1. All BE and FE nodes have been upgraded (e.g. opt_remote_rf)
+            // 2. This filter is bloom filter (only bloom filter should be used for merging)
+            RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
+                    filter_desc, _state->query_options(), id(), false));
+            RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
+                    filter_desc.filter_id, id(), &runtime_filter));
+        } else {
+            RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
+                    filter_desc, _state->query_options(), id(), false));
+            RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(
+                    filter_desc.filter_id, id(), &runtime_filter));
+        }
+        _runtime_filter_ctxs.emplace_back(runtime_filter);
+        _runtime_filter_ready_flag.emplace_back(false);
+    }
+    return Status::OK();
+}
+
+bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
+    if (!_blocked_by_rf) {
+        return true;
+    }
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+        if (!runtime_filter->is_ready_or_timeout()) {
+            return false;
+        }
+    }
+    _blocked_by_rf = false;
+    return true;
+}
+
+Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
+    SCOPED_TIMER(_acquire_runtime_filter_timer);
+    VExprSPtrs vexprs;
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+        bool ready = runtime_filter->is_ready();
+        if (!ready && wait) {
+            ready = runtime_filter->await();
+        }
+        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
+            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
+            _runtime_filter_ctxs[i].apply_mark = true;
+        } else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
+                   runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
+                   !_runtime_filter_ctxs[i].apply_mark) {
+            _blocked_by_rf = true;
+        } else if (!_runtime_filter_ctxs[i].apply_mark) {
+            DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
+            _is_all_rf_applied = false;
+        }
+    }
+    RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
+    if (_blocked_by_rf) {
+        return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
+    }
+
+    return Status::OK();
+}
+
+Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
+    if (vexprs.empty()) {
+        return Status::OK();
+    }
+
+    for (auto& expr : vexprs) {
+        VExprContextSPtr conjunct = VExprContext::create_shared(expr);
+        RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
+        RETURN_IF_ERROR(conjunct->open(_state));
+        _rf_vexpr_set.insert(expr);
+        _conjuncts.emplace_back(conjunct);
+    }
+
+    return Status::OK();
+}
+
+Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
+    if (_is_all_rf_applied) {
+        *arrived_rf_num = _runtime_filter_descs.size();
+        return Status::OK();
+    }
+
+    // This method is called from scanner threads,
+    // so the lock must be taken before touching the filter state.
+    std::unique_lock l(_rf_locks);
+    if (_is_all_rf_applied) {
+        *arrived_rf_num = _runtime_filter_descs.size();
+        return Status::OK();
+    }
+
+    // 1. Check whether any runtime filters are ready but not yet applied.
+    VExprSPtrs exprs;
+    int current_arrived_rf_num = 0;
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        if (_runtime_filter_ctxs[i].apply_mark) {
+            ++current_arrived_rf_num;
+            continue;
+        } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
+            RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
+                    &exprs, _row_descriptor, _state));
+            ++current_arrived_rf_num;
+            _runtime_filter_ctxs[i].apply_mark = true;
+        }
+    }
+    // 2. Append unapplied runtime filters to vconjunct_ctx_ptr
+    if (!exprs.empty()) {
+        RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
+    }
+    if (current_arrived_rf_num == _runtime_filter_descs.size()) {
+        _is_all_rf_applied = true;
+    }
+
+    *arrived_rf_num = current_arrived_rf_num;
+    return Status::OK();
+}
+
+void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) {
+    _acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime");
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/runtime_filter_consumer_node.h b/be/src/vec/exec/runtime_filter_consumer_node.h
new file mode 100644
index 0000000000..518e0e865c
--- /dev/null
+++ b/be/src/vec/exec/runtime_filter_consumer_node.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/exec_node.h"
+#include "exprs/runtime_filter.h"
+
+namespace doris::vectorized {
+
+class RuntimeFilterConsumerNode : public ExecNode {
+public:
+    RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    ~RuntimeFilterConsumerNode() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+
+    // Try append late arrived runtime filters.
+    // Return num of filters which are applied already.
+    Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
+
+    bool runtime_filters_are_ready_or_timeout();
+
+protected:
+    // Register and get all runtime filters at Init phase.
+    Status _register_runtime_filter();
+    // Get all arrived runtime filters at Open phase.
+    Status _acquire_runtime_filter(bool wait = true);
+    // Append late-arrival runtime filters to the vconjunct_ctx.
+    Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);
+
+    void _prepare_rf_timer(RuntimeProfile* profile);
+
+    // For runtime filters
+    struct RuntimeFilterContext {
+        RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {}
+        RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {}
+        // set to true if this runtime filter is already applied to vconjunct_ctx_ptr
+        bool apply_mark;
+        IRuntimeFilter* runtime_filter;
+    };
+
+    RuntimeState* _state;
+
+    std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    // Set to true if the runtime filter is ready.
+    std::vector<bool> _runtime_filter_ready_flag;
+    doris::Mutex _rf_locks;
+    phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
+    // True means all runtime filters are applied to scanners
+    bool _is_all_rf_applied = true;
+    bool _blocked_by_rf = false;
+
+    RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 4c55150e53..852998c25c 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -94,8 +94,7 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
 }
 
 Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    RETURN_IF_ERROR(ExecNode::init(tnode, state));
-    _state = state;
+    RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state));
     _is_pipeline_scan = state->enable_pipeline_exec();
 
     const TQueryOptions& query_options = state->query_options();
@@ -109,9 +108,6 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     } else {
         _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
     }
-
-    RETURN_IF_ERROR(_register_runtime_filter());
-
     return Status::OK();
 }
 
@@ -149,7 +145,8 @@ Status VScanNode::prepare(RuntimeState* state) {
     // if you want to add some profile in scan node, even it have not new VScanner object
     // could add here, not in the _init_profile() function
     _get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime");
-    _acquire_runtime_filter_timer = ADD_TIMER(_runtime_profile, "AcuireRuntimeFilterTime");
+
+    _prepare_rf_timer(_runtime_profile.get());
 
     _open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
     _alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime");
@@ -311,95 +308,6 @@ Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners) {
     return Status::OK();
 }
 
-Status VScanNode::_register_runtime_filter() {
-    int filter_size = _runtime_filter_descs.size();
-    _runtime_filter_ctxs.reserve(filter_size);
-    _runtime_filter_ready_flag.reserve(filter_size);
-    for (int i = 0; i < filter_size; ++i) {
-        IRuntimeFilter* runtime_filter = nullptr;
-        const auto& filter_desc = _runtime_filter_descs[i];
-        if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
-            DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets);
-            // Optimize merging phase iff:
-            // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
-            // 2. This filter is bloom filter (only bloom filter should be used for merging)
-            RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_filter(
-                    RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(),
-                    false));
-            RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, &runtime_filter));
-        } else {
-            RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
-                    RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(),
-                    false));
-            RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
-                                                                             &runtime_filter));
-        }
-        _runtime_filter_ctxs.emplace_back(runtime_filter);
-        _runtime_filter_ready_flag.emplace_back(false);
-    }
-    return Status::OK();
-}
-
-bool VScanNode::runtime_filters_are_ready_or_timeout() {
-    if (!_blocked_by_rf) {
-        return true;
-    }
-    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
-        IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        if (!runtime_filter->is_ready_or_timeout()) {
-            return false;
-        }
-    }
-    _blocked_by_rf = false;
-    return true;
-}
-
-Status VScanNode::_acquire_runtime_filter(bool wait) {
-    SCOPED_TIMER(_acquire_runtime_filter_timer);
-    VExprSPtrs vexprs;
-    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
-        IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready && wait) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
-                   runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            _blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
-        }
-    }
-    RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
-    if (_blocked_by_rf) {
-        return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
-    }
-
-    return Status::OK();
-}
-
-Status VScanNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
-    if (vexprs.empty()) {
-        return Status::OK();
-    }
-
-    for (auto& expr : vexprs) {
-        VExprContextSPtr conjunct = VExprContext::create_shared(expr);
-        RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
-        RETURN_IF_ERROR(conjunct->open(_state));
-        _rf_vexpr_set.insert(expr);
-        _conjuncts.emplace_back(conjunct);
-    }
-
-    return Status::OK();
-}
-
 Status VScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
@@ -1280,46 +1188,6 @@ Status VScanNode::_change_value_range(ColumnValueRange<PrimitiveType>& temp_rang
     return Status::OK();
 }
 
-Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
-    if (_is_all_rf_applied) {
-        *arrived_rf_num = _runtime_filter_descs.size();
-        return Status::OK();
-    }
-
-    // This method will be called in scanner thread.
-    // So need to add lock
-    std::unique_lock l(_rf_locks);
-    if (_is_all_rf_applied) {
-        *arrived_rf_num = _runtime_filter_descs.size();
-        return Status::OK();
-    }
-
-    // 1. Check if are runtime filter ready but not applied.
-    VExprSPtrs exprs;
-    int current_arrived_rf_num = 0;
-    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
-        if (_runtime_filter_ctxs[i].apply_mark) {
-            ++current_arrived_rf_num;
-            continue;
-        } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
-            RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
-                    &exprs, _row_descriptor, _state));
-            ++current_arrived_rf_num;
-            _runtime_filter_ctxs[i].apply_mark = true;
-        }
-    }
-    // 2. Append unapplied runtime filters to vconjunct_ctx_ptr
-    if (!exprs.empty()) {
-        RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
-    }
-    if (current_arrived_rf_num == _runtime_filter_descs.size()) {
-        _is_all_rf_applied = true;
-    }
-
-    *arrived_rf_num = current_arrived_rf_num;
-    return Status::OK();
-}
-
 Status VScanNode::clone_conjunct_ctxs(VExprContextSPtrs& conjuncts) {
     if (!_conjuncts.empty()) {
         std::unique_lock l(_rf_locks);
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 8d235a8365..bb1089d349 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -43,6 +43,7 @@
 #include "runtime/runtime_state.h"
 #include "util/lock.h"
 #include "util/runtime_profile.h"
+#include "vec/exec/runtime_filter_consumer_node.h"
 #include "vec/exec/scan/scanner_context.h"
 #include "vec/exec/scan/vscanner.h"
 #include "vec/runtime/shared_scanner_controller.h"
@@ -87,10 +88,10 @@ struct FilterPredicates {
     std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters;
 };
 
-class VScanNode : public ExecNode {
+class VScanNode : public RuntimeFilterConsumerNode {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-            : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {
+            : RuntimeFilterConsumerNode(pool, tnode, descs) {
         if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
             // Which means the request could be fullfilled in a single segment iterator request.
             if (tnode.limit > 0 && tnode.limit < 1024) {
@@ -140,10 +141,6 @@ public:
 
     void set_no_agg_finalize() { _need_agg_finalize = false; }
 
-    // Try append late arrived runtime filters.
-    // Return num of filters which are applied already.
-    Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
-
     // Clone current _conjuncts to conjuncts, if exists.
     Status clone_conjunct_ctxs(VExprContextSPtrs& conjuncts);
 
@@ -156,7 +153,6 @@ public:
 
     Status alloc_resource(RuntimeState* state) override;
     void release_resource(RuntimeState* state) override;
-    bool runtime_filters_are_ready_or_timeout();
 
     Status try_close();
 
@@ -242,7 +238,6 @@ protected:
 
     Status _prepare_scanners();
 
-    RuntimeState* _state;
     bool _is_pipeline_scan = false;
     bool _shared_scan_opt = false;
     // For load scan node, there should be both input and output tuple descriptor.
@@ -256,24 +251,6 @@ protected:
     int _max_scan_key_num;
     int _max_pushdown_conditions_per_column;
 
-    // For runtime filters
-    struct RuntimeFilterContext {
-        RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {}
-        RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {}
-        // set to true if this runtime filter is already applied to vconjunct_ctx_ptr
-        bool apply_mark;
-        IRuntimeFilter* runtime_filter;
-    };
-    std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
-
-    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-    // Set to true if the runtime filter is ready.
-    std::vector<bool> _runtime_filter_ready_flag;
-    doris::Mutex _rf_locks;
-    phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
-    // True means all runtime filters are applied to scanners
-    bool _is_all_rf_applied = true;
-
     // Each scan node will generates a ScannerContext to manage all Scanners.
     // See comments of ScannerContext for more details
     std::shared_ptr<ScannerContext> _scanner_ctx;
@@ -315,7 +292,6 @@ protected:
     std::vector<ColumnValueRangeType> _not_in_value_ranges;
 
     bool _need_agg_finalize = true;
-    bool _blocked_by_rf = false;
     // If the query like select * from table limit 10; then the query should run in
     // single scanner to avoid too many scanners which will cause lots of useless read.
     bool _should_run_serial = false;
@@ -371,13 +347,6 @@ protected:
     std::vector<int> _col_distribute_ids;
 
 private:
-    // Register and get all runtime filters at Init phase.
-    Status _register_runtime_filter();
-    // Get all arrived runtime filters at Open phase.
-    Status _acquire_runtime_filter(bool wait = true);
-    // Append late-arrival runtime filters to the vconjunct_ctx.
-    Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);
-
     Status _normalize_conjuncts();
     Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context,
                                 VExprSPtr& output_expr);
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp
index 35494e93d0..95ce6cbc8a 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -82,15 +82,15 @@ Status VDataGenFunctionScanNode::prepare(RuntimeState* state) {
     for (const auto& filter_desc : _runtime_filter_descs) {
         IRuntimeFilter* runtime_filter = nullptr;
         if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
-            RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_filter(
-                    RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id(), false));
+            RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
+                    filter_desc, state->query_options(), id(), false));
             RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
-                    filter_desc.filter_id, &runtime_filter));
+                    filter_desc.filter_id, id(), &runtime_filter));
         } else {
-            RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
-                    RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id(), false));
+            RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
+                    filter_desc, state->query_options(), id(), false));
             RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
-                                                                            &runtime_filter));
+                                                                            id(), &runtime_filter));
         }
         runtime_filter->init_profile(_runtime_profile.get());
     }
diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp
index ee1628cd19..626fd5ce96 100644
--- a/be/src/vec/exec/vselect_node.cpp
+++ b/be/src/vec/exec/vselect_node.cpp
@@ -37,22 +37,34 @@ class TPlanNode;
 namespace vectorized {
 
 VSelectNode::VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-        : ExecNode(pool, tnode, descs), _child_eos(false) {}
+        : RuntimeFilterConsumerNode(pool, tnode, descs), _child_eos(false) {}
 
 Status VSelectNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    return ExecNode::init(tnode, state);
+    return RuntimeFilterConsumerNode::init(tnode, state);
 }
 
 Status VSelectNode::prepare(RuntimeState* state) {
-    return ExecNode::prepare(state);
+    return RuntimeFilterConsumerNode::prepare(state);
 }
 
 Status VSelectNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(RuntimeFilterConsumerNode::open(state));
     RETURN_IF_ERROR(child(0)->open(state));
     return Status::OK();
 }
 
+Status VSelectNode::alloc_resource(RuntimeState* state) {
+    if (_opened) { // a previous call already ran to completion; nothing left to do
+        return Status::OK();
+    }
+    // Base-class resources first, then the runtime filters (default argument: wait = true).
+    RETURN_IF_ERROR(RuntimeFilterConsumerNode::alloc_resource(state));
+    RETURN_IF_ERROR(_acquire_runtime_filter());
+    RETURN_IF_CANCELLED(state);
+    _opened = true; // makes later calls return early through the guard above
+    return Status::OK();
+}
+
 Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h
index d6783d7237..140009e4b3 100644
--- a/be/src/vec/exec/vselect_node.h
+++ b/be/src/vec/exec/vselect_node.h
@@ -17,7 +17,7 @@
 
 #pragma once
 #include "common/status.h"
-#include "exec/exec_node.h"
+#include "vec/exec/runtime_filter_consumer_node.h"
 
 namespace doris {
 class DescriptorTbl;
@@ -28,7 +28,7 @@ class TPlanNode;
 namespace vectorized {
 class Block;
 
-class VSelectNode final : public ExecNode {
+class VSelectNode final : public RuntimeFilterConsumerNode {
 public:
     VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
     Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
@@ -38,9 +38,12 @@ public:
     Status close(RuntimeState* state) override;
     Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
 
+    Status alloc_resource(RuntimeState* state) override;
+
 private:
     // true if last get_next() call on child signalled eos
     bool _child_eos;
+    bool _opened = false;
 };
 } // namespace vectorized
 } // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 8593b750ba..10cb05acba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1441,7 +1441,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             }
             // translate runtime filter
             context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> {
-                List<RuntimeFilter> filters = runtimeFilterTranslator
+                Set<RuntimeFilter> filters = runtimeFilterTranslator
                         .getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
                 filters.forEach(filter -> runtimeFilterTranslator
                         .createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 9df4e74ea4..da4bb2e131 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleId;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
 import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.ObjectId;
@@ -41,9 +42,11 @@ import org.apache.doris.thrift.TRuntimeFilterType;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * translate runtime filter
@@ -57,7 +60,7 @@ public class RuntimeFilterTranslator {
         context.generatePhysicalHashJoinToRuntimeFilter();
     }
 
-    public List<RuntimeFilter> getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) {
+    public Set<RuntimeFilter> getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) {
         return context.getRuntimeFilterOnHashJoinNode(join);
     }
 
@@ -101,51 +104,65 @@ public class RuntimeFilterTranslator {
      * @param ctx plan translator context
      */
     public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, PlanTranslatorContext ctx) {
-        Expr target = context.getExprIdToOlapScanNodeSlotRef().get(filter.getTargetExpr().getExprId());
-        if (target == null) {
-            context.setTargetNullCount();
-            return;
-        }
-        Expr targetExpr;
-        if (filter.getType() == TRuntimeFilterType.BITMAP) {
-            if (filter.getTargetExpression().equals(filter.getTargetExpr())) {
-                targetExpr = target;
+        Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx);
+        List<Expr> targetExprList = new ArrayList<>();
+        List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new ArrayList<>();
+        List<ScanNode> scanNodeList = new ArrayList<>();
+        boolean hasInvalidTarget = false; // true if some target slot has no registered scan-node SlotRef
+        for (int i = 0; i < filter.getTargetExprs().size(); i++) {
+            Slot curTargetExpr = filter.getTargetExprs().get(i);
+            Expression curTargetExpression = filter.getTargetExpressions().get(i);
+            Expr target = context.getExprIdToOlapScanNodeSlotRef().get(curTargetExpr.getExprId());
+            if (target == null) {
+                context.setTargetNullCount();
+                hasInvalidTarget = true;
+                break; // one unresolved target discards the whole filter (see !hasInvalidTarget below)
+            }
+            Expr targetExpr;
+            if (filter.getType() == TRuntimeFilterType.BITMAP) {
+                if (curTargetExpression.equals(curTargetExpr)) {
+                    targetExpr = target;
+                } else {
+                    RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator(
+                            context.getExprIdToOlapScanNodeSlotRef());
+                    targetExpr = curTargetExpression.accept(translator, ctx);
+                }
             } else {
-                RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator(
-                        context.getExprIdToOlapScanNodeSlotRef());
-                targetExpr = filter.getTargetExpression().accept(translator, ctx);
+                targetExpr = target;
             }
-        } else {
-            targetExpr = target;
-        }
-
-        Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx);
-        SlotRef targetSlot = target.getSrcSlotRef();
-        TupleId targetTupleId = targetSlot.getDesc().getParent().getId();
-        SlotId targetSlotId = targetSlot.getSlotId();
-        // adjust data type
-        if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) {
-            targetExpr = new CastExpr(src.getType(), targetExpr);
+            // adjust data type: cast the target to the source type (bitmap filters are left as-is)
+            if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) {
+                targetExpr = new CastExpr(src.getType(), targetExpr);
+            }
+            SlotRef targetSlot = target.getSrcSlotRef();
+            TupleId targetTupleId = targetSlot.getDesc().getParent().getId();
+            SlotId targetSlotId = targetSlot.getSlotId();
+            ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetExpr);
+            scanNodeList.add(scanNode);
+            targetExprList.add(targetExpr);
+            targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)));
         }
-        org.apache.doris.planner.RuntimeFilter origFilter
-                = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
-                filter.getId(), node, src, filter.getExprOrder(), targetExpr,
-                ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)),
-                filter.getType(), context.getLimits(), filter.getBuildSideNdv());
-        if (node instanceof HashJoinNode) {
-            origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
-        } else {
-            //bitmap rf requires isBroadCast=false, it always requires merge filter
-            origFilter.setIsBroadcast(false);
+        if (!hasInvalidTarget) { // build the legacy filter only when every target resolved
+            org.apache.doris.planner.RuntimeFilter origFilter
+                    = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+                    filter.getId(), node, src, filter.getExprOrder(), targetExprList,
+                    targetTupleIdMapList, filter.getType(), context.getLimits(), filter.getBuildSideNdv());
+            if (node instanceof HashJoinNode) {
+                origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
+            } else {
+                // bitmap rf requires isBroadCast=false, it always requires merge filter
+                origFilter.setIsBroadcast(false);
+            }
+            boolean isLocalTarget = scanNodeList.stream().allMatch(e -> e.getFragmentId().equals(node.getFragmentId()));
+            for (int i = 0; i < targetExprList.size(); i++) {
+                ScanNode scanNode = scanNodeList.get(i);
+                Expr targetExpr = targetExprList.get(i);
+                origFilter.addTarget(new RuntimeFilterTarget(
+                        scanNode, targetExpr, true, isLocalTarget)); // same locality flag for every target
+            }
+            origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
+            context.getLegacyFilters().add(finalize(origFilter));
         }
-        ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr());
-        origFilter.addTarget(new RuntimeFilterTarget(
-                scanNode,
-                targetExpr,
-                true,
-                scanNode.getFragmentId().equals(node.getFragmentId())));
-        origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
-        context.getLegacyFilters().add(finalize(origFilter));
     }
 
     private org.apache.doris.planner.RuntimeFilter finalize(org.apache.doris.planner.RuntimeFilter origFilter) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index 69986cde67..b9d2ac301e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -67,7 +67,7 @@ public class RuntimeFilterContext {
     // exprId to olap scan node slotRef because the slotRef will be changed when translating.
     private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = Maps.newHashMap();
 
-    private final Map<AbstractPhysicalJoin, List<RuntimeFilter>> runtimeFilterOnHashJoinNode = Maps.newHashMap();
+    private final Map<AbstractPhysicalJoin, Set<RuntimeFilter>> runtimeFilterOnHashJoinNode = Maps.newHashMap();
 
     // alias -> alias's child, if there's a key that is alias's child, the key-value will change by this way
     // Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv will be C -> A.
@@ -97,7 +97,7 @@ public class RuntimeFilterContext {
     }
 
     public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
-        Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id);
+        Preconditions.checkArgument(filter.getTargetExprs().stream().anyMatch(expr -> expr.getExprId() == id));
         this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()).add(filter);
     }
 
@@ -135,13 +135,13 @@ public class RuntimeFilterContext {
         return scanNodeOfLegacyRuntimeFilterTarget;
     }
 
-    public List<RuntimeFilter> getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) {
-        return runtimeFilterOnHashJoinNode.getOrDefault(join, Collections.emptyList());
+    public Set<RuntimeFilter> getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) {
+        return runtimeFilterOnHashJoinNode.getOrDefault(join, Collections.emptySet()); // empty if none recorded
     }
 
     public void generatePhysicalHashJoinToRuntimeFilter() {
         targetExprIdToFilter.values().forEach(filters -> filters.forEach(filter -> runtimeFilterOnHashJoinNode
-                .computeIfAbsent(filter.getBuilderNode(), k -> Lists.newArrayList()).add(filter)));
+                .computeIfAbsent(filter.getBuilderNode(), k -> Sets.newHashSet()).add(filter)));
     }
 
     public Map<ExprId, List<RuntimeFilter>> getTargetExprIdToFilter() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index bb0272c3a7..9ec1a34c9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -27,15 +27,19 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Not;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
 import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
 import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
@@ -44,8 +48,10 @@ import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -105,29 +111,81 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
                     if (type == TRuntimeFilterType.BITMAP) {
                         continue;
                     }
-                    // currently, we can ensure children in the two side are corresponding to the equal_to's.
-                    // so right maybe an expression and left is a slot
-                    Slot unwrappedSlot = checkTargetChild(equalTo.left());
-                    // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
-                    // contains join with denied join type. for example: a left join b on a.id = b.id
-                    if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
-                        continue;
-                    }
-                    Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
-                    PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
-                    // in-filter is not friendly to pipeline
-                    if (type == TRuntimeFilterType.IN_OR_BLOOM
-                            && ctx.getSessionVariable().enablePipelineEngine()
-                            && hasRemoteTarget(join, scan)) {
-                        type = TRuntimeFilterType.BLOOM;
-                    }
+                    if (join.left() instanceof PhysicalUnion
+                            || join.left() instanceof PhysicalIntersect
+                            || join.left() instanceof PhysicalExcept) {
+                        List<Slot> targetList = new ArrayList<>();
+                        int projIndex = -1;
+                        for (int j = 0; j < join.left().children().size(); j++) {
+                            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+                            if (child instanceof PhysicalProject) {
+                                PhysicalProject project = (PhysicalProject) child;
+                                Slot leftSlot = checkTargetChild(equalTo.left());
+                                if (leftSlot == null) {
+                                    break;
+                                }
+                                for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
+                                    NamedExpression expr = (NamedExpression) project.getProjects().get(k);
+                                    if (expr.getName().equals(leftSlot.getName())) {
+                                        projIndex = k;
+                                        break;
+                                    }
+                                }
+                                Preconditions.checkState(projIndex >= 0
+                                        && projIndex < project.getProjects().size());
 
-                    long buildSideNdv = getBuildSideNdv(join, equalTo);
-                    RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
-                            equalTo.right(), olapScanSlot, type, i, join, buildSideNdv);
-                    ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
-                    ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
-                    ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+                                NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
+
+                                SlotReference origSlot = null;
+                                if (targetExpr instanceof Alias) {
+                                    origSlot = (SlotReference) targetExpr.child(0);
+                                } else {
+                                    origSlot = (SlotReference) targetExpr;
+                                }
+                                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                                        && ctx.getSessionVariable().enablePipelineEngine()
+                                        && hasRemoteTarget(join, scan)) {
+                                    type = TRuntimeFilterType.BLOOM;
+                                }
+                                targetList.add(olapScanSlot);
+                                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                                ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), olapScanSlot);
+                            }
+                        }
+                        if (!targetList.isEmpty()) {
+                            long buildSideNdv = getBuildSideNdv(join, equalTo);
+                            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                                    equalTo.right(), targetList, type, i, join, buildSideNdv);
+                            for (int j = 0; j < targetList.size(); j++) {
+                                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
+                            }
+                        }
+                    } else {
+                        // currently, we can ensure children on the two sides correspond to the equal_to's operands,
+                        // so right may be an expression and left is a slot
+                        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+                        // if aliasTransferMap doesn't contain the key, the path from the olap scan to the join
+                        // contains a join with a denied join type. for example: a left join b on a.id = b.id
+                        if (unwrappedSlot == null || !aliasTransferMap.containsKey(unwrappedSlot)) {
+                            continue;
+                        }
+                        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+                        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+                        // in-filter is not friendly to pipeline
+                        if (type == TRuntimeFilterType.IN_OR_BLOOM
+                                && ctx.getSessionVariable().enablePipelineEngine()
+                                && hasRemoteTarget(join, scan)) {
+                            type = TRuntimeFilterType.BLOOM;
+                        }
+                        long buildSideNdv = getBuildSideNdv(join, equalTo);
+                        RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                                equalTo.right(), ImmutableList.of(olapScanSlot), type, i, join, buildSideNdv);
+                        ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                        ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+                        ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), olapScanSlot);
+                    }
                 }
             }
         }
@@ -194,8 +252,8 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
                 if (targetSlot != null && aliasTransferMap.containsKey(targetSlot)) {
                     Slot olapScanSlot = aliasTransferMap.get(targetSlot).second;
                     RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
-                            bitmapContains.child(0), olapScanSlot,
-                            bitmapContains.child(1), type, i, join, isNot, -1L);
+                            bitmapContains.child(0), ImmutableList.of(olapScanSlot),
+                            ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L);
                     ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
                     ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
                     ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getId(),
@@ -221,7 +279,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
             if (expr instanceof NamedExpression && aliasTransferMap.containsKey((NamedExpression) expr)) {
                 if (expression instanceof Alias) {
                     Alias alias = ((Alias) expression);
-                    aliasTransferMap.put(alias.toSlot(), aliasTransferMap.remove(expr));
+                    aliasTransferMap.put(alias.toSlot(), aliasTransferMap.get(expr));
                 }
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index 703a043b48..ff907f62aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -22,6 +22,10 @@ import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
 /**
  * runtime filter
  */
@@ -32,8 +36,8 @@ public class RuntimeFilter {
     private final Expression srcSlot;
     //bitmap filter support target expression like  k1+1, abs(k1)
     //targetExpression is an expression on targetSlot, in which there is only one non-const slot
-    private final Expression targetExpression;
-    private final Slot targetSlot;
+    private final List<Expression> targetExpressions;
+    private final List<Slot> targetSlots;
     private final int exprOrder;
     private final AbstractPhysicalJoin builderNode;
 
@@ -44,21 +48,21 @@ public class RuntimeFilter {
     /**
      * constructor
      */
-    public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, TRuntimeFilterType type,
+    public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, TRuntimeFilterType type,
             int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) {
-        this(id, src, target, target, type, exprOrder, builderNode, false, buildSideNdv);
+        this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, builderNode, false, buildSideNdv);
     }
 
     /**
      * constructor
      */
-    public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, Expression targetExpression,
+    public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions,
             TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, boolean bitmapFilterNotIn,
             long buildSideNdv) {
         this.id = id;
         this.srcSlot = src;
-        this.targetSlot = target;
-        this.targetExpression = targetExpression;
+        this.targetSlots = ImmutableList.copyOf(targets);
+        this.targetExpressions = ImmutableList.copyOf(targetExpressions);
         this.type = type;
         this.exprOrder = exprOrder;
         this.builderNode = builderNode;
@@ -71,8 +75,8 @@ public class RuntimeFilter {
         return srcSlot;
     }
 
-    public Slot getTargetExpr() {
-        return targetSlot;
+    public List<Slot> getTargetExprs() {
+        return targetSlots;
     }
 
     public RuntimeFilterId getId() {
@@ -95,8 +99,8 @@ public class RuntimeFilter {
         return bitmapFilterNotIn;
     }
 
-    public Expression getTargetExpression() {
-        return targetExpression;
+    public List<Expression> getTargetExpressions() {
+        return targetExpressions;
     }
 
     public long getBuildSideNdv() {
@@ -107,7 +111,7 @@ public class RuntimeFilter {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("RF").append(id.asInt())
-                .append("[").append(getSrcExpr()).append("->").append(targetSlot)
+                .append("[").append(getSrcExpr()).append("->").append(targetSlots)
                 .append("(ndv/size = ").append(buildSideNdv).append("/")
                 .append(org.apache.doris.planner.RuntimeFilter.expectRuntimeFilterSize(buildSideNdv))
                 .append(")");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 484a666851..d45f4f2611 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TRuntimeFilterType;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -71,12 +72,12 @@ public final class RuntimeFilter {
     // The position of expr in the join condition
     private final int exprOrder;
     // Expr (lhs of join predicate) from which the targetExprs_ are generated.
-    private final Expr origTargetExpr;
+    private final List<Expr> origTargetExprs;
     // Runtime filter targets
     private final List<RuntimeFilterTarget> targets = new ArrayList<>();
     // Slots from base table tuples that have value transfer from the slots
     // of 'origTargetExpr'. The slots are grouped by tuple id.
-    private final Map<TupleId, List<SlotId>> targetSlotsByTid;
+    private final List<Map<TupleId, List<SlotId>>> targetSlotsByTid;
     // If true, the join node building this filter is executed using a broadcast join;
     // set in the DistributedPlanner.createHashJoinFragment()
     private boolean isBroadcastJoin;
@@ -139,14 +140,14 @@ public final class RuntimeFilter {
     }
 
     private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder,
-            Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots, TRuntimeFilterType type,
+            List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots, TRuntimeFilterType type,
             RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
         this.id = filterId;
         this.builderNode = filterSrcNode;
         this.srcExpr = srcExpr;
         this.exprOrder = exprOrder;
-        this.origTargetExpr = origTargetExpr;
-        this.targetSlotsByTid = targetSlots;
+        this.origTargetExprs = ImmutableList.copyOf(origTargetExprs);
+        this.targetSlotsByTid = ImmutableList.copyOf(targetSlots);
         this.runtimeFilterType = type;
         this.ndvEstimate = buildSizeNdv;
         computeNdvEstimate();
@@ -155,9 +156,9 @@ public final class RuntimeFilter {
 
     // only for nereids planner
     public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, JoinNodeBase node, Expr srcExpr,
-            int exprOrder, Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots,
+            int exprOrder, List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> targetSlots,
             TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) {
-        return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExpr,
+        return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExprs,
                 targetSlots, type, filterSizeLimits, buildSizeNdv);
     }
 
@@ -237,11 +238,11 @@ public final class RuntimeFilter {
         return srcExpr;
     }
 
-    public Expr getOrigTargetExpr() {
-        return origTargetExpr;
+    public List<Expr> getOrigTargetExprs() {
+        return origTargetExprs;
     }
 
-    public Map<TupleId, List<SlotId>> getTargetSlots() {
+    public List<Map<TupleId, List<SlotId>>> getTargetSlots() {
         return targetSlotsByTid;
     }
 
@@ -329,7 +330,7 @@ public final class RuntimeFilter {
         }
 
         return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder,
-                targetExpr, targetSlots, type, filterSizeLimits, -1L);
+                ImmutableList.of(targetExpr), ImmutableList.of(targetSlots), type, filterSizeLimits, -1L);
     }
 
     public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer analyzer, Expr joinPredicate,
@@ -365,7 +366,8 @@ public final class RuntimeFilter {
             }
 
             RuntimeFilter runtimeFilter =
-                    new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder, targetExpr, targetSlots,
+                    new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder,
+                            ImmutableList.of(targetExpr), ImmutableList.of(targetSlots),
                             type, filterSizeLimits, -1L);
             runtimeFilter.setBitmapFilterNotIn(((BitmapFilterPredicate) joinPredicate).isNotIn());
             return runtimeFilter;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
index 7de864f50a..fb1b4da92b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
@@ -317,10 +317,12 @@ public final class RuntimeFilterGenerator {
      * destination node for that filter.
      */
     private void registerRuntimeFilter(RuntimeFilter filter) {
-        Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots();
-        Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty());
-        for (TupleId tupleId : targetSlotsByTid.keySet()) {
-            registerRuntimeFilter(filter, tupleId);
+        List<Map<TupleId, List<SlotId>>> targetSlotsByTids = filter.getTargetSlots();
+        for (Map<TupleId, List<SlotId>> targetSlotsByTid : targetSlotsByTids) {
+            Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty());
+            for (TupleId tupleId : targetSlotsByTid.keySet()) {
+                registerRuntimeFilter(filter, tupleId);
+            }
         }
     }
 
@@ -328,7 +330,7 @@ public final class RuntimeFilterGenerator {
      * Registers a runtime filter with a specific target tuple id.
      */
     private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) {
-        Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+        Preconditions.checkState(filter.getTargetSlots().stream().anyMatch(e -> e.containsKey(targetTid)));
         List<RuntimeFilter> filters = runtimeFiltersByTid.computeIfAbsent(targetTid, k -> new ArrayList<>());
         Preconditions.checkState(!filter.isFinalized());
         filters.add(filter);
@@ -344,9 +346,11 @@ public final class RuntimeFilterGenerator {
         for (RuntimeFilter.RuntimeFilterTarget target : runtimeFilter.getTargets()) {
             targetTupleIds.addAll(target.node.getTupleIds());
         }
-        for (TupleId tupleId : runtimeFilter.getTargetSlots().keySet()) {
-            if (!targetTupleIds.contains(tupleId)) {
-                runtimeFiltersByTid.get(tupleId).remove(runtimeFilter);
+        for (Map<TupleId, List<SlotId>> slots : runtimeFilter.getTargetSlots()) {
+            for (TupleId tupleId : slots.keySet()) {
+                if (!targetTupleIds.contains(tupleId)) {
+                    runtimeFiltersByTid.get(tupleId).remove(runtimeFilter);
+                }
             }
         }
         runtimeFilter.markFinalized();
@@ -424,14 +428,22 @@ public final class RuntimeFilterGenerator {
      * the scan node with target tuple descriptor 'targetTid'.
      */
     private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid) {
-        Expr targetExpr = filter.getOrigTargetExpr();
+        Preconditions.checkState(filter.getTargetSlots().size() == filter.getOrigTargetExprs().size());
+        Expr targetExpr = null;
+        for (int i = 0; i < filter.getOrigTargetExprs().size(); i++) {
+            if (filter.getTargetSlots().get(i).containsKey(targetTid)) {
+                targetExpr = filter.getOrigTargetExprs().get(i);
+                break;
+            }
+        }
+        Preconditions.checkState(targetExpr != null);
         // if there is a subquery on the left side of join, in order to push to scan in the subquery,
         // targetExpr will return false as long as there is a slotref parent node that is not targetTid.
         // But when this slotref can be transferred to the targetTid slot, such as Aa + Bb = Cc,
         // targetTid is B, if Aa can be transferred to Ba, that is, Aa and Ba are equivalent columns,
         // then replace Aa with Ba, and then calculate for targetTid targetExpr
         if (!targetExpr.isBound(targetTid)) {
-            Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
+            Preconditions.checkState(filter.getTargetSlots().stream().anyMatch(e -> e.containsKey(targetTid)));
             // Modify the filter target expr using the equivalent slots from the scan node
             // on which the filter will be applied.
             ExprSubstitutionMap smap = new ExprSubstitutionMap();
@@ -440,7 +452,14 @@ public final class RuntimeFilterGenerator {
             // all slots of targetSlotsByTid.
             targetExpr.collect(SlotRef.class, exprSlots);
             // targetExpr specifies the id of the slotRef node in the `tupleID`
-            List<SlotId> sids = filter.getTargetSlots().get(targetTid);
+            List<SlotId> sids = new ArrayList<>();
+            for (Map<TupleId, List<SlotId>> map : filter.getTargetSlots()) {
+                if (map.containsKey(targetTid)) {
+                    sids = map.get(targetTid);
+                    break;
+                }
+            }
+            Preconditions.checkState(!sids.isEmpty());
             for (SlotRef slotRef : exprSlots) {
                 for (SlotId sid : sids) {
                     if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index e4dd754be9..699df19ba4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -252,7 +252,7 @@ public class RuntimeFilterTest extends SSBTestBase {
         for (RuntimeFilter filter : filters) {
             Assertions.assertTrue(colNames.contains(Pair.of(
                     filter.getSrcExpr().toSql(),
-                    filter.getTargetExpr().getName())));
+                    filter.getTargetExprs().get(0).getName())));
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org