You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that this plain-text copy does not preserve.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/20 01:46:44 UTC

[doris] branch master updated: [feature] support runtime filter on vectorized engine (#10103)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 588634ddf6 [feature] support runtime filter on vectorized engine (#10103)
588634ddf6 is described below

commit 588634ddf6d1c6260119dadbf426a52213caca7d
Author: Gabriel <ga...@gmail.com>
AuthorDate: Mon Jun 20 09:46:38 2022 +0800

    [feature] support runtime filter on vectorized engine (#10103)
---
 be/src/exprs/expr.h                         |  82 ++++++
 be/src/exprs/hybrid_set.h                   |  37 +++
 be/src/exprs/literal.h                      |   6 +-
 be/src/exprs/runtime_filter.cpp             | 388 +++++++++++++++++++---------
 be/src/exprs/runtime_filter.h               |  11 +
 be/src/runtime/types.cpp                    |  14 +
 be/src/runtime/types.h                      |   2 +
 be/src/util/simd/bits.h                     |  20 ++
 be/src/vec/CMakeLists.txt                   |   2 +
 be/src/vec/exec/volap_scan_node.cpp         |  64 ++++-
 be/src/vec/exec/volap_scan_node.h           |   2 +
 be/src/vec/exprs/vbloom_predicate.cpp       |  96 +++++++
 be/src/vec/exprs/vbloom_predicate.h         |  48 ++++
 be/src/vec/exprs/vexpr.cpp                  |  10 +
 be/src/vec/exprs/vexpr.h                    |   1 +
 be/src/vec/exprs/vruntimefilter_wrapper.cpp | 111 ++++++++
 be/src/vec/exprs/vruntimefilter_wrapper.h   |  63 +++++
 17 files changed, 822 insertions(+), 135 deletions(-)

diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index 4e95c53237..e4cb02ea1b 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -28,6 +28,10 @@
 #include "exprs/expr_value.h"
 #include "gen_cpp/Opcodes_types.h"
 #include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/string_value.h"
+#include "runtime/string_value.hpp"
+#include "runtime/tuple.h"
 #include "runtime/tuple_row.h"
 #include "udf/udf.h"
 
@@ -465,4 +469,82 @@ inline bool Expr::evaluate(VectorizedRowBatch* batch) {
     }
 }
 
+template <typename T> // T must be the C++ type that `data` points to
+Status create_texpr_literal_node(const void* data, TExprNode* node) { // fills *node from *data
+    if constexpr (std::is_same_v<bool, T>) {
+        auto origin_value = reinterpret_cast<const T*>(data);
+        TBoolLiteral boolLiteral;
+        (*node).__set_node_type(TExprNodeType::BOOL_LITERAL);
+        boolLiteral.__set_value(*origin_value);
+        (*node).__set_bool_literal(boolLiteral);
+        (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+    } else if constexpr (std::is_same_v<int8_t, T> || std::is_same_v<int16_t, T> ||
+                         std::is_same_v<int32_t, T> || std::is_same_v<int64_t, T>) {
+        auto origin_value = reinterpret_cast<const T*>(data);
+        (*node).__set_node_type(TExprNodeType::INT_LITERAL);
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*origin_value);
+        (*node).__set_int_literal(intLiteral);
+        if constexpr (std::is_same_v<int8_t, T>) { // literal type follows the integer width
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TINYINT));
+        } else if constexpr (std::is_same_v<int16_t, T>) {
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_SMALLINT));
+        } else if constexpr (std::is_same_v<int32_t, T>) {
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_INT));
+        } else if constexpr (std::is_same_v<int64_t, T>) {
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BIGINT));
+        }
+    } else if constexpr (std::is_same_v<__int128_t, T>) { // serialized as a decimal string
+        auto origin_value = reinterpret_cast<const T*>(data);
+        (*node).__set_node_type(TExprNodeType::LARGE_INT_LITERAL);
+        TLargeIntLiteral large_int_literal;
+        large_int_literal.__set_value(LargeIntValue::to_string(*origin_value));
+        (*node).__set_large_int_literal(large_int_literal);
+        (*node).__set_type(create_type_desc(PrimitiveType::TYPE_LARGEINT));
+    } else if constexpr (std::is_same_v<DateTimeValue, T>) {
+        auto origin_value = reinterpret_cast<const doris::DateTimeValue*>(data);
+        TDateLiteral date_literal;
+        char convert_buffer[30]; // receives the text form written by to_string()
+        origin_value->to_string(convert_buffer);
+        date_literal.__set_value(convert_buffer);
+        (*node).__set_date_literal(date_literal);
+        (*node).__set_node_type(TExprNodeType::DATE_LITERAL);
+        if (origin_value->type() == TimeType::TIME_DATE) { // type follows the value's time type
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATE));
+        } else if (origin_value->type() == TimeType::TIME_DATETIME) {
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATETIME));
+        } else if (origin_value->type() == TimeType::TIME_TIME) {
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TIME));
+        }
+    } else if constexpr (std::is_same_v<DecimalV2Value, T>) { // serialized via to_string()
+        auto origin_value = reinterpret_cast<const DecimalV2Value*>(data);
+        (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
+        TDecimalLiteral decimal_literal;
+        decimal_literal.__set_value(origin_value->to_string());
+        (*node).__set_decimal_literal(decimal_literal);
+        (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMALV2));
+    } else if constexpr (std::is_same_v<float, T> || std::is_same_v<double, T>) {
+        auto origin_value = reinterpret_cast<const T*>(data);
+        (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL);
+        TFloatLiteral float_literal;
+        float_literal.__set_value(*origin_value);
+        (*node).__set_float_literal(float_literal);
+        if constexpr (std::is_same_v<float, T>) {
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_FLOAT));
+        } else if constexpr (std::is_same_v<double, T>) {
+            (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DOUBLE));
+        }
+    } else if constexpr (std::is_same_v<StringValue, T>) { // always typed TYPE_STRING
+        auto origin_value = reinterpret_cast<const StringValue*>(data);
+        (*node).__set_node_type(TExprNodeType::STRING_LITERAL);
+        TStringLiteral string_literal;
+        string_literal.__set_value(origin_value->to_string());
+        (*node).__set_string_literal(string_literal);
+        (*node).__set_type(create_type_desc(PrimitiveType::TYPE_STRING));
+    } else { // unsupported T: *node is left untouched
+        return Status::InvalidArgument("Invalid argument type!");
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 578c8d2e96..81f71d45f2 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -22,8 +22,14 @@
 #include <cstring>
 
 #include "common/object_pool.h"
+#include "common/status.h"
+#include "exprs/expr.h"
+#include "runtime/datetime_value.h"
 #include "runtime/decimalv2_value.h"
+#include "runtime/large_int_value.h"
+#include "runtime/primitive_type.h"
 #include "runtime/string_value.h"
+#include "vec/exprs/vliteral.h"
 
 namespace doris {
 
@@ -41,6 +47,9 @@ public:
     virtual bool find(void* data) = 0;
     // use in vectorize execute engine
     virtual bool find(void* data, size_t) = 0;
+
+    virtual Status to_vexpr_list(doris::ObjectPool* pool,
+                                 std::vector<doris::vectorized::VExpr*>* vexpr_list) = 0;
     class IteratorBase {
     public:
         IteratorBase() {}
@@ -60,6 +69,20 @@ public:
 
     ~HybridSet() override = default;
 
+    Status to_vexpr_list(doris::ObjectPool* pool,
+                         std::vector<doris::vectorized::VExpr*>* vexpr_list) override {
+        HybridSetBase::IteratorBase* it = begin(); // one VLiteral per value in the set
+        DCHECK(it != nullptr);
+        while (it->has_next()) {
+            TExprNode node;
+            const void* v = it->get_value();
+            RETURN_IF_ERROR(create_texpr_literal_node<T>(v, &node)); // unsupported T -> error
+            vexpr_list->push_back(pool->add(new doris::vectorized::VLiteral(node)));
+            it->next();
+        }
+        return Status::OK();
+    }
+
     void insert(const void* data) override {
         if (data == nullptr) return;
 
@@ -119,6 +142,20 @@ public:
 
     ~StringValueSet() override = default;
 
+    Status to_vexpr_list(doris::ObjectPool* pool,
+                         std::vector<doris::vectorized::VExpr*>* vexpr_list) override {
+        HybridSetBase::IteratorBase* it = begin(); // one string VLiteral per value in the set
+        DCHECK(it != nullptr);
+        while (it->has_next()) {
+            TExprNode node;
+            const void* v = it->get_value();
+            RETURN_IF_ERROR(create_texpr_literal_node<StringValue>(v, &node)); // propagate error
+            vexpr_list->push_back(pool->add(new doris::vectorized::VLiteral(node)));
+            it->next();
+        }
+        return Status::OK();
+    }
+
     void insert(const void* data) override {
         if (data == nullptr) return;
 
diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h
index ebccdd3b9e..fc77e06ad7 100644
--- a/be/src/exprs/literal.h
+++ b/be/src/exprs/literal.h
@@ -29,6 +29,7 @@ class TExprNode;
 
 class Literal final : public Expr {
 public:
+    Literal(const TExprNode& node);
     virtual ~Literal();
 
     virtual Expr* clone(ObjectPool* pool) const override { return pool->add(new Literal(*this)); }
@@ -49,11 +50,6 @@ public:
     virtual Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
                            ExprContext* context) override;
 
-protected:
-    friend class Expr;
-    friend Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data);
-    Literal(const TExprNode& node);
-
 private:
     ExprValue _value;
 };
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1c25eaa152..2533c8c2d1 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -37,6 +37,9 @@
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/string_parser.hpp"
+#include "vec/exprs/vbloom_predicate.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
 
 namespace doris {
 // PrimitiveType->TExprNodeType
@@ -193,105 +196,71 @@ PFilterType get_type(RuntimeFilterType type) {
     }
 }
 
-TTypeDesc create_type_desc(PrimitiveType type) {
-    TTypeDesc type_desc;
-    std::vector<TTypeNode> node_type;
-    node_type.emplace_back();
-    TScalarType scalarType;
-    scalarType.__set_type(to_thrift(type));
-    scalarType.__set_len(-1);
-    scalarType.__set_precision(-1);
-    scalarType.__set_scale(-1);
-    node_type.back().__set_scalar_type(scalarType);
-    type_desc.__set_types(node_type);
-    return type_desc;
-}
-
-// only used to push down to olap engine
-Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data) {
+template <bool is_vectorized = false> // true: *expr gets a VLiteral, false: a Literal
+Status create_literal(ObjectPool* pool, PrimitiveType type, const void* data, void** expr) {
     TExprNode node;
 
     switch (type) {
     case TYPE_BOOLEAN: {
-        TBoolLiteral boolLiteral;
-        boolLiteral.__set_value(*reinterpret_cast<const bool*>(data));
-        node.__set_bool_literal(boolLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<bool>(data, &node));
         break;
     }
     case TYPE_TINYINT: {
-        TIntLiteral intLiteral;
-        intLiteral.__set_value(*reinterpret_cast<const int8_t*>(data));
-        node.__set_int_literal(intLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<int8_t>(data, &node));
         break;
     }
     case TYPE_SMALLINT: {
-        TIntLiteral intLiteral;
-        intLiteral.__set_value(*reinterpret_cast<const int16_t*>(data));
-        node.__set_int_literal(intLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<int16_t>(data, &node));
         break;
     }
     case TYPE_INT: {
-        TIntLiteral intLiteral;
-        intLiteral.__set_value(*reinterpret_cast<const int32_t*>(data));
-        node.__set_int_literal(intLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<int32_t>(data, &node));
         break;
     }
     case TYPE_BIGINT: {
-        TIntLiteral intLiteral;
-        intLiteral.__set_value(*reinterpret_cast<const int64_t*>(data));
-        node.__set_int_literal(intLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<int64_t>(data, &node));
         break;
     }
     case TYPE_LARGEINT: {
-        TLargeIntLiteral largeIntLiteral;
-        largeIntLiteral.__set_value(
-                LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(data)));
-        node.__set_large_int_literal(largeIntLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<int128_t>(data, &node));
         break;
     }
     case TYPE_FLOAT: {
-        TFloatLiteral floatLiteral;
-        floatLiteral.__set_value(*reinterpret_cast<const float*>(data));
-        node.__set_float_literal(floatLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<float>(data, &node)); // float_t may be wider
         break;
     }
     case TYPE_DOUBLE: {
-        TFloatLiteral floatLiteral;
-        floatLiteral.__set_value(*reinterpret_cast<const double*>(data));
-        node.__set_float_literal(floatLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<double>(data, &node)); // double_t may be wider
         break;
     }
     case TYPE_DATE:
     case TYPE_DATETIME: {
-        TDateLiteral dateLiteral;
-        char convert_buffer[30];
-        reinterpret_cast<const DateTimeValue*>(data)->to_string(convert_buffer);
-        dateLiteral.__set_value(convert_buffer);
-        node.__set_date_literal(dateLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<DateTimeValue>(data, &node));
         break;
     }
     case TYPE_DECIMALV2: {
-        TDecimalLiteral decimalLiteral;
-        decimalLiteral.__set_value(reinterpret_cast<const DecimalV2Value*>(data)->to_string());
-        node.__set_decimal_literal(decimalLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<DecimalV2Value>(data, &node));
         break;
     }
     case TYPE_CHAR:
     case TYPE_VARCHAR:
     case TYPE_STRING: {
-        const StringValue* string_value = reinterpret_cast<const StringValue*>(data);
-        TStringLiteral tstringLiteral;
-        tstringLiteral.__set_value(std::string(string_value->ptr, string_value->len));
-        node.__set_string_literal(tstringLiteral);
+        RETURN_IF_ERROR(create_texpr_literal_node<StringValue>(data, &node));
         break;
     }
     default:
         DCHECK(false);
-        return nullptr;
+        return Status::InvalidArgument("Invalid type!");
+    }
+
+    if constexpr (is_vectorized) { // the new literal is owned by `pool`
+        *reinterpret_cast<doris::vectorized::VExpr**>(expr) =
+                pool->add(new doris::vectorized::VLiteral(node));
+    } else {
+        *reinterpret_cast<Expr**>(expr) = pool->add(new Literal(node));
     }
-    node.__set_node_type(get_expr_node_type(type));
-    node.__set_type(create_type_desc(type));
-    return pool->add(new Literal(node));
+
+    return Status::OK();
 }
 
 BinaryPredicate* create_bin_predicate(ObjectPool* pool, PrimitiveType prim_type,
@@ -312,6 +281,64 @@ BinaryPredicate* create_bin_predicate(ObjectPool* pool, PrimitiveType prim_type,
     node.__set_node_type(TExprNodeType::BINARY_PRED);
     return (BinaryPredicate*)pool->add(BinaryPredicate::from_thrift(node));
 }
+
+Status create_vbin_predicate(ObjectPool* pool, PrimitiveType prim_type, TExprOpcode::type opcode,
+                             doris::vectorized::VExpr** expr, TExprNode* tnode) {
+    TExprNode node; // BINARY_PRED returning BOOLEAN; a copy is written to *tnode
+    TScalarType tscalar_type;
+    tscalar_type.__set_type(TPrimitiveType::BOOLEAN);
+    TTypeNode ttype_node;
+    ttype_node.__set_type(TTypeNodeType::SCALAR);
+    ttype_node.__set_scalar_type(tscalar_type);
+    TTypeDesc t_type_desc;
+    t_type_desc.types.push_back(ttype_node);
+    node.__set_type(t_type_desc);
+    node.__set_opcode(opcode);
+    node.__set_vector_opcode(opcode);
+    node.__set_child_type(to_thrift(prim_type));
+    node.__set_num_children(2);
+    node.__set_output_scale(-1);
+    node.__set_node_type(TExprNodeType::BINARY_PRED);
+    TFunction fn; // builtin "le"/"ge" taking two prim_type arguments
+    TFunctionName fn_name;
+    fn_name.__set_db_name("");
+    switch (opcode) {
+    case TExprOpcode::LE:
+        fn_name.__set_function_name("le");
+        break;
+    case TExprOpcode::GE:
+        fn_name.__set_function_name("ge");
+        break;
+    default: // only LE/GE are supported; fail instead of building a nameless function
+        return Status::InvalidArgument(
+                strings::Substitute("Invalid opcode for max_min_runtimefilter: '$0'", opcode));
+    }
+    fn.__set_name(fn_name);
+    fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+
+    TTypeNode type_node;
+    type_node.__set_type(TTypeNodeType::SCALAR);
+    TScalarType scalar_type;
+    scalar_type.__set_type(to_thrift(prim_type));
+    type_node.__set_scalar_type(scalar_type);
+
+    std::vector<TTypeNode> type_nodes;
+    type_nodes.push_back(type_node);
+
+    TTypeDesc type_desc;
+    type_desc.__set_types(type_nodes);
+
+    std::vector<TTypeDesc> arg_types;
+    arg_types.push_back(type_desc);
+    arg_types.push_back(type_desc);
+    fn.__set_arg_types(arg_types);
+
+    fn.__set_ret_type(t_type_desc);
+    fn.__set_has_var_args(false);
+    node.__set_fn(fn);
+    *tnode = node;
+    return doris::vectorized::VExpr::create_expr(pool, node, expr);
+}
 // This class is a wrapper of runtime predicate function
 class RuntimePredicateWrapper {
 public:
@@ -447,71 +474,10 @@ public:
     }
 
     template <class T>
-    Status get_push_context(T* container, RuntimeState* state, ExprContext* prob_expr) {
-        DCHECK(state != nullptr);
-        DCHECK(container != nullptr);
-        DCHECK(_pool != nullptr);
-        DCHECK(prob_expr->root()->type().type == _column_return_type ||
-               (is_string_type(prob_expr->root()->type().type) &&
-                is_string_type(_column_return_type)));
-
-        auto real_filter_type = get_real_type();
-        switch (real_filter_type) {
-        case RuntimeFilterType::IN_FILTER: {
-            if (!_is_ignored_in_filter) {
-                TTypeDesc type_desc = create_type_desc(_column_return_type);
-                TExprNode node;
-                node.__set_type(type_desc);
-                node.__set_node_type(TExprNodeType::IN_PRED);
-                node.in_predicate.__set_is_not_in(false);
-                node.__set_opcode(TExprOpcode::FILTER_IN);
-                node.__isset.vector_opcode = true;
-                node.__set_vector_opcode(to_in_opcode(_column_return_type));
-                auto in_pred = _pool->add(new InPredicate(node));
-                RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release()));
-                in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
-                ExprContext* ctx = _pool->add(new ExprContext(in_pred));
-                container->push_back(ctx);
-            }
-            break;
-        }
-        case RuntimeFilterType::MINMAX_FILTER: {
-            // create max filter
-            auto max_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::LE);
-            auto max_literal = create_literal(_pool, _column_return_type, _minmax_func->get_max());
-            max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
-            max_pred->add_child(max_literal);
-            container->push_back(_pool->add(new ExprContext(max_pred)));
-            // create min filter
-            auto min_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::GE);
-            auto min_literal = create_literal(_pool, _column_return_type, _minmax_func->get_min());
-            min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
-            min_pred->add_child(min_literal);
-            container->push_back(_pool->add(new ExprContext(min_pred)));
-            break;
-        }
-        case RuntimeFilterType::BLOOM_FILTER: {
-            // create a bloom filter
-            TTypeDesc type_desc = create_type_desc(_column_return_type);
-            TExprNode node;
-            node.__set_type(type_desc);
-            node.__set_node_type(TExprNodeType::BLOOM_PRED);
-            node.__set_opcode(TExprOpcode::RT_FILTER);
-            node.__isset.vector_opcode = true;
-            node.__set_vector_opcode(to_in_opcode(_column_return_type));
-            auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
-            RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func.release()));
-            bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
-            ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
-            container->push_back(ctx);
-            break;
-        }
-        default:
-            DCHECK(false);
-            break;
-        }
-        return Status::OK();
-    }
+    Status get_push_context(T* container, RuntimeState* state, ExprContext* prob_expr);
+
+    Status get_push_vexprs(std::vector<doris::vectorized::VExpr*>* container, RuntimeState* state,
+                           doris::vectorized::VExprContext* prob_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
         bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -986,6 +952,21 @@ Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>* push_expr
     return Status::OK();
 }
 
+Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* vexprs,
+                                           const RowDescriptor& desc, // desc, tracker: unused here
+                                           const std::shared_ptr<MemTracker>& tracker) {
+    DCHECK(_is_ready);
+    DCHECK(is_consumer());
+    std::lock_guard<std::mutex> guard(_inner_mutex); // held while the cache is built and read
+
+    if (_push_down_vexprs.empty()) { // lazily built; rebuilt while still empty
+        RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, _state, _vprobe_ctx));
+    }
+    // append the cached pushed-down exprs to the caller's vector
+    vexprs->insert(vexprs->end(), _push_down_vexprs.begin(), _push_down_vexprs.end());
+    return Status::OK();
+}
+
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
     SCOPED_TIMER(_await_time_cost);
@@ -1054,6 +1035,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
             return Status::InternalError("not found a node id");
         }
         RETURN_IF_ERROR(Expr::create_expr_tree(_pool, iter->second, &_probe_ctx));
+        RETURN_IF_ERROR(
+                doris::vectorized::VExpr::create_expr_tree(_pool, iter->second, &_vprobe_ctx));
     }
 
     _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, &params));
@@ -1407,6 +1390,167 @@ Status IRuntimeFilter::consumer_close() {
     return Status::OK();
 }
 
+template <class T> // T: a container whose push_back accepts ExprContext*
+Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* state,
+                                                 ExprContext* prob_expr) {
+    DCHECK(state != nullptr);
+    DCHECK(container != nullptr);
+    DCHECK(_pool != nullptr);
+    DCHECK(prob_expr->root()->type().type == _column_return_type ||
+           (is_string_type(prob_expr->root()->type().type) && is_string_type(_column_return_type)));
+
+    auto real_filter_type = get_real_type();
+    switch (real_filter_type) {
+    case RuntimeFilterType::IN_FILTER: {
+        if (!_is_ignored_in_filter) { // releases ownership of _hybrid_set below
+            TTypeDesc type_desc = create_type_desc(_column_return_type);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(to_in_opcode(_column_return_type));
+            auto in_pred = _pool->add(new InPredicate(node));
+            RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release()));
+            in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            ExprContext* ctx = _pool->add(new ExprContext(in_pred));
+            container->push_back(ctx);
+        }
+        break;
+    }
+    case RuntimeFilterType::MINMAX_FILTER: {
+        // create max filter: probe <= max
+        Expr* max_literal = nullptr;
+        auto max_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::LE);
+        RETURN_IF_ERROR(create_literal<false>(_pool, _column_return_type, _minmax_func->get_max(),
+                                              (void**)&max_literal));
+        max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+        max_pred->add_child(max_literal);
+        container->push_back(_pool->add(new ExprContext(max_pred)));
+        // create min filter: probe >= min
+        Expr* min_literal = nullptr;
+        auto min_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::GE);
+        RETURN_IF_ERROR(create_literal<false>(_pool, _column_return_type, _minmax_func->get_min(),
+                                              (void**)&min_literal));
+        min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+        min_pred->add_child(min_literal);
+        container->push_back(_pool->add(new ExprContext(min_pred)));
+        break;
+    }
+    case RuntimeFilterType::BLOOM_FILTER: { // releases ownership of _bloomfilter_func below
+        // create a bloom filter
+        TTypeDesc type_desc = create_type_desc(_column_return_type);
+        TExprNode node;
+        node.__set_type(type_desc);
+        node.__set_node_type(TExprNodeType::BLOOM_PRED);
+        node.__set_opcode(TExprOpcode::RT_FILTER);
+        node.__isset.vector_opcode = true;
+        node.__set_vector_opcode(to_in_opcode(_column_return_type));
+        auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
+        RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func.release()));
+        bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+        ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
+        container->push_back(ctx);
+        break;
+    }
+    default: // unexpected filter type
+        DCHECK(false);
+        break;
+    }
+    return Status::OK();
+}
+
+Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::VExpr*>* container,
+                                                RuntimeState* state,
+                                                doris::vectorized::VExprContext* vprob_expr) {
+    DCHECK(state != nullptr);
+    DCHECK(container != nullptr);
+    DCHECK(_pool != nullptr);
+    DCHECK(vprob_expr->root()->type().type == _column_return_type ||
+           (is_string_type(vprob_expr->root()->type().type) &&
+            is_string_type(_column_return_type)));
+
+    auto real_filter_type = get_real_type();
+    switch (real_filter_type) {
+    case RuntimeFilterType::IN_FILTER: { // NOTE(review): _hybrid_set must not be released yet
+        if (!_is_ignored_in_filter) {
+            TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(to_in_opcode(_column_return_type));
+
+            // VInPredicate: probe expr as child 0, set values appended as literal children
+            doris::vectorized::VExpr* expr = nullptr;
+            RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr(_pool, node, &expr));
+            auto cloned_vexpr = vprob_expr->root()->clone(_pool);
+            expr->add_child(cloned_vexpr);
+
+            auto& children = const_cast<std::vector<doris::vectorized::VExpr*>&>(expr->children());
+            RETURN_IF_ERROR(_hybrid_set->to_vexpr_list(_pool, &children));
+            container->push_back(
+                    _pool->add(new doris::vectorized::VRuntimeFilterWrapper(node, expr)));
+        }
+        break;
+    }
+    case RuntimeFilterType::MINMAX_FILTER: {
+        doris::vectorized::VExpr* max_pred = nullptr;
+        // create max filter: probe <= max
+        TExprNode max_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(_pool, _column_return_type, TExprOpcode::LE,
+                                              &max_pred, &max_pred_node));
+        doris::vectorized::VExpr* max_literal = nullptr;
+        RETURN_IF_ERROR(create_literal<true>(_pool, _column_return_type, _minmax_func->get_max(),
+                                             (void**)&max_literal));
+        auto cloned_vexpr = vprob_expr->root()->clone(_pool);
+        max_pred->add_child(cloned_vexpr);
+        max_pred->add_child(max_literal);
+        container->push_back(
+                _pool->add(new doris::vectorized::VRuntimeFilterWrapper(max_pred_node, max_pred)));
+
+        // create min filter: probe >= min
+        doris::vectorized::VExpr* min_pred = nullptr;
+        TExprNode min_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(_pool, _column_return_type, TExprOpcode::GE,
+                                              &min_pred, &min_pred_node));
+        doris::vectorized::VExpr* min_literal = nullptr;
+        RETURN_IF_ERROR(create_literal<true>(_pool, _column_return_type, _minmax_func->get_min(),
+                                             (void**)&min_literal));
+        cloned_vexpr = vprob_expr->root()->clone(_pool);
+        min_pred->add_child(cloned_vexpr);
+        min_pred->add_child(min_literal);
+        container->push_back(
+                _pool->add(new doris::vectorized::VRuntimeFilterWrapper(min_pred_node, min_pred)));
+        break;
+    }
+    case RuntimeFilterType::BLOOM_FILTER: {
+        // create a bloom filter predicate over _bloomfilter_func
+        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+        TExprNode node;
+        node.__set_type(type_desc);
+        node.__set_node_type(TExprNodeType::BLOOM_PRED);
+        node.__set_opcode(TExprOpcode::RT_FILTER);
+        node.__isset.vector_opcode = true;
+        node.__set_vector_opcode(to_in_opcode(_column_return_type));
+        auto bloom_pred = _pool->add(new doris::vectorized::VBloomPredicate(node));
+        bloom_pred->set_filter(_bloomfilter_func);
+        auto cloned_vexpr = vprob_expr->root()->clone(_pool);
+        bloom_pred->add_child(cloned_vexpr);
+        auto wrapper = _pool->add(new doris::vectorized::VRuntimeFilterWrapper(node, bloom_pred));
+        container->push_back(wrapper);
+        break;
+    }
+    default: // unexpected filter type
+        DCHECK(false);
+        break;
+    }
+    return Status::OK();
+}
+
 RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default;
 RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default;
 
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index e2c6561388..5273788894 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -44,6 +44,11 @@ class PMinMaxFilter;
 class HashJoinNode;
 class RuntimeProfile;
 
+namespace vectorized {
+class VExpr;
+class VExprContext;
+} // namespace vectorized
+
 enum class RuntimeFilterType {
     UNKNOWN_FILTER = -1,
     IN_FILTER = 0,
@@ -157,6 +162,10 @@ public:
                                 const RowDescriptor& desc,
                                 const std::shared_ptr<MemTracker>& tracker);
 
+    Status get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* push_vexprs,
+                               const RowDescriptor& desc,
+                               const std::shared_ptr<MemTracker>& tracker);
+
     bool is_broadcast_join() const { return _is_broadcast_join; }
 
     bool has_remote_target() const { return _has_remote_target; }
@@ -269,6 +278,7 @@ protected:
     // it only used in consumer to generate runtime_filter expr_context
     // we don't have to prepare it or close it
     ExprContext* _probe_ctx;
+    doris::vectorized::VExprContext* _vprobe_ctx;
 
     // Indicate whether runtime filter expr has been ignored
     bool _is_ignored;
@@ -279,6 +289,7 @@ protected:
     // these context is called prepared by this,
     // consumer_close should be called before release
     std::vector<ExprContext*> _push_down_ctxs;
+    // Vectorized counterpart of _push_down_ctxs. NOTE(review): these are raw VExpr
+    // pointers, presumably owned by an ObjectPool rather than released via
+    // consumer_close — confirm.
+    std::vector<doris::vectorized::VExpr*> _push_down_vexprs;
 
     struct rpc_context;
     std::shared_ptr<rpc_context> _rpc_context;
diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 58261a216b..c53febfc40 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -201,4 +201,18 @@ std::ostream& operator<<(std::ostream& os, const TypeDescriptor& type) {
     os << type.debug_string();
     return os;
 }
+
+// Builds a Thrift type descriptor holding exactly one scalar node for `type`;
+// length, precision and scale are all left unspecified (-1).
+TTypeDesc create_type_desc(PrimitiveType type) {
+    TScalarType scalar;
+    scalar.__set_type(to_thrift(type));
+    scalar.__set_len(-1);
+    scalar.__set_precision(-1);
+    scalar.__set_scale(-1);
+
+    TTypeNode scalar_node;
+    scalar_node.__set_scalar_type(scalar);
+
+    std::vector<TTypeNode> type_nodes(1, scalar_node);
+    TTypeDesc result;
+    result.__set_types(type_nodes);
+    return result;
+}
 } // namespace doris
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 2987064962..5852c2fb91 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -216,4 +216,6 @@ private:
 
 std::ostream& operator<<(std::ostream& os, const TypeDescriptor& type);
 
+// Builds a Thrift TTypeDesc containing a single scalar node for `type`, with len,
+// precision and scale all set to -1.
+TTypeDesc create_type_desc(PrimitiveType type);
+
 } // namespace doris
diff --git a/be/src/util/simd/bits.h b/be/src/util/simd/bits.h
index 3d5b5e1bf7..d15243128d 100644
--- a/be/src/util/simd/bits.h
+++ b/be/src/util/simd/bits.h
@@ -59,5 +59,25 @@ inline uint32_t bytes32_mask_to_bits32_mask(const bool* data) {
     return bytes32_mask_to_bits32_mask(reinterpret_cast<const uint8_t*>(data));
 }
 
+// Counts the bytes in data[0, size) that equal zero. The loop body is branch-free so
+// the compiler can auto-vectorize it.
+inline size_t count_zero_num(const int8_t* __restrict data, size_t size) {
+    size_t num = 0;
+    const int8_t* end = data + size;
+    for (; data < end; ++data) {
+        num += (*data == 0);
+    }
+    return num;
+}
+
+// Counts positions i in [0, size) where data[i] is zero OR null_map[i] is non-zero
+// (e.g. filter results that are false or NULL). Every non-zero null_map byte is
+// normalised to 1, so each position contributes at most 1 even if the map uses
+// values other than 0/1. Kept branch-free for auto-vectorization.
+inline size_t count_zero_num(const int8_t* __restrict data, const uint8_t* __restrict null_map,
+                             size_t size) {
+    size_t num = 0;
+    const int8_t* end = data + size;
+    for (; data < end; ++data, ++null_map) {
+        num += ((*data == 0) | (*null_map != 0));
+    }
+    return num;
+}
+
 } // namespace simd
 } // namespace doris
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 6334a92a08..69aaf816ba 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -115,6 +115,8 @@ set(VEC_FILES
   exprs/vliteral.cpp
   exprs/varray_literal.cpp
   exprs/vin_predicate.cpp
+  exprs/vbloom_predicate.cpp
+  exprs/vruntimefilter_wrapper.cpp
   exprs/vtuple_is_null_predicate.cpp
   exprs/vslot_ref.cpp
   exprs/vcast_expr.cpp
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 16bed3f56d..ba1a30c22e 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -29,6 +29,7 @@
 #include "util/to_string.h"
 #include "vec/core/block.h"
 #include "vec/exec/volap_scanner.h"
+#include "vec/exprs/vcompound_pred.h"
 #include "vec/exprs/vexpr.h"
 
 namespace doris::vectorized {
@@ -77,6 +78,7 @@ Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     /// TODO: could one filter used in the different scan_node ?
     int filter_size = _runtime_filter_descs.size();
     _runtime_filter_ctxs.resize(filter_size);
+    _runtime_filter_ready_flag.resize(filter_size);
     for (int i = 0; i < filter_size; ++i) {
         IRuntimeFilter* runtime_filter = nullptr;
         const auto& filter_desc = _runtime_filter_descs[i];
@@ -86,6 +88,8 @@ Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
                                                                         &runtime_filter));
 
         _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+        _runtime_filter_ready_flag[i] = false;
+        _rf_locks.push_back(std::make_unique<std::mutex>());
     }
 
     return Status::OK();
@@ -389,29 +393,73 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
         scanner->set_opened();
     }
 
-    std::vector<ExprContext*> contexts;
+    std::vector<VExpr*> vexprs;
     auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
     DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
     for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
         if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
+            /// When runtime filters are ready during running, we should use them to filter data
+            /// in VOlapScanner.
+            /// New arrival rf will be processed as below:
+            /// 1. convert these runtime filters to vectorized expressions
+            /// 2. if this is the first scanner thread to receive this rf, construct a new
+            /// VExprContext and update `_vconjunct_ctx_ptr` in scan node. Notice that we use
+            /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only
+            /// once after any runtime_filters are ready.
+            /// 3. finally, just copy this new VExprContext to scanner and use it to filter data.
             IRuntimeFilter* runtime_filter = nullptr;
             state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
                                                             &runtime_filter);
             DCHECK(runtime_filter != nullptr);
             bool ready = runtime_filter->is_ready();
             if (ready) {
-                runtime_filter->get_prepared_context(&contexts, row_desc(), _expr_mem_tracker);
+                runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker);
                 scanner_filter_apply_marks[i] = true;
+                if (!_runtime_filter_ready_flag[i]) {
+                    std::unique_lock<std::mutex> l(*(_rf_locks[i]));
+                    if (!_runtime_filter_ready_flag[i]) {
+                        // Use all conjuncts and new arrival runtime filters to construct a new
+                        // expression tree here.
+                        auto last_expr =
+                                _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
+                        for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
+                            TExprNode texpr_node;
+                            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+                            texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
+                            texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
+                            VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
+                            new_node->add_child(last_expr);
+                            new_node->add_child(vexprs[j]);
+                            last_expr = new_node;
+                        }
+                        auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
+                        auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc(),
+                                                                          expr_mem_tracker());
+                        // If error occurs in `prepare` or `open` phase, discard these runtime
+                        // filters directly.
+                        if (UNLIKELY(!expr_status.OK())) {
+                            LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
+                            vexprs.clear();
+                            break;
+                        }
+                        expr_status = new_vconjunct_ctx_ptr->open(state);
+                        if (UNLIKELY(!expr_status.OK())) {
+                            LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
+                            vexprs.clear();
+                            break;
+                        }
+                        _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+                        *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
+                        _runtime_filter_ready_flag[i] = true;
+                    }
+                }
             }
         }
     }
 
-    if (!contexts.empty()) {
-        std::vector<ExprContext*> new_contexts;
-        auto& scanner_conjunct_ctxs = *scanner->conjunct_ctxs();
-        Expr::clone_if_not_exists(contexts, state, &new_contexts);
-        scanner_conjunct_ctxs.insert(scanner_conjunct_ctxs.end(), new_contexts.begin(),
-                                     new_contexts.end());
+    if (!vexprs.empty()) {
+        WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()),
+                      "Something wrong for runtime filters: ");
         scanner->set_use_pushdown_conjuncts(true);
     }
 
diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h
index 196de61bb0..f6ce2c5f25 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -228,6 +228,8 @@ private:
     };
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+    // Per runtime filter: non-zero once that filter has been folded into
+    // `_vconjunct_ctx_ptr`. Stored one byte per flag rather than std::vector<bool>:
+    // vector<bool> packs flags into shared words, so writes to different indices
+    // (each guarded by a different mutex in `_rf_locks`) would race with each other.
+    // NOTE(review): the unlocked fast-path read in scanner_thread() is still an
+    // unsynchronized read of a flag another thread may be writing.
+    std::vector<uint8_t> _runtime_filter_ready_flag;
+    // One mutex per runtime filter, guarding the matching ready flag.
+    std::vector<std::unique_ptr<std::mutex>> _rf_locks;
     std::map<int, RuntimeFilterContext*> _conjunctid_to_runtime_filter_ctxs;
 
     std::unique_ptr<RuntimeProfile> _scanner_profile;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp
new file mode 100644
index 0000000000..6f73b2c69d
--- /dev/null
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vbloom_predicate.h"
+
+#include <string_view>
+
+namespace doris::vectorized {
+
+// Creates a predicate with no filter attached yet; the filter is supplied
+// afterwards through set_filter().
+VBloomPredicate::VBloomPredicate(const TExprNode& node)
+        : VExpr(node), _filter(), _expr_name(std::string("bloom_predicate")) {}
+
+// Runs the base VExpr::prepare, then (only on the first call) checks that exactly
+// one child is attached: the expression whose values are looked up in the filter.
+Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                VExprContext* context) {
+    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+
+    if (_prepared) {
+        return Status::OK();
+    }
+    if (_children.size() != 1) {
+        return Status::InternalError("Invalid argument for VBloomPredicate.");
+    }
+
+    _prepared = true;
+    return Status::OK();
+}
+
+// The predicate holds no resources of its own beyond `_filter`, so opening is
+// delegated entirely to the base class and its status returned as-is.
+Status VBloomPredicate::open(RuntimeState* state, VExprContext* context,
+                             FunctionContext::FunctionStateScope scope) {
+    return VExpr::open(state, context, scope);
+}
+
+// Symmetric to open(): only the base-class teardown is needed.
+void VBloomPredicate::close(RuntimeState* state, VExprContext* context,
+                            FunctionContext::FunctionStateScope scope) {
+    VExpr::close(state, context, scope);
+}
+
+// Evaluates the child expression on `block`, looks each resulting value up in
+// `_filter`, and appends a UInt8 result column (wrapped as Nullable with an all-zero
+// null map when `_data_type` is nullable). Its position is stored in
+// *result_column_id.
+Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result_column_id) {
+    // set_filter() may never have been called; fail instead of dereferencing null.
+    if (_filter == nullptr) {
+        return Status::InternalError("Bloom filter is not set for VBloomPredicate.");
+    }
+    doris::vectorized::ColumnNumbers arguments(_children.size());
+    for (size_t i = 0; i < _children.size(); ++i) {
+        int column_id = -1;
+        // Propagate child failures instead of continuing with an unset column id.
+        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
+        arguments[i] = column_id;
+    }
+    // The result column is appended after any columns the children added.
+    size_t num_columns_without_result = block->columns();
+
+    ColumnPtr argument_column =
+            block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+    size_t sz = argument_column->size();
+    auto res_data_column = ColumnVector<UInt8>::create(sz);
+    auto* ptr = res_data_column->get_data().data();
+    for (size_t i = 0; i < sz; i++) {
+        // NOTE(review): for a Nullable argument column, confirm what get_data_at()
+        // yields on NULL rows before its pointer is handed to the filter.
+        ptr[i] = _filter->find(reinterpret_cast<const void*>(argument_column->get_data_at(i).data));
+    }
+    if (_data_type->is_nullable()) {
+        // The predicate never yields NULL itself: pair the result with an all-zero map.
+        auto null_map = ColumnVector<UInt8>::create(sz, 0);
+        block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                       _data_type, _expr_name});
+    } else {
+        block->insert({std::move(res_data_column), _data_type, _expr_name});
+    }
+    *result_column_id = num_columns_without_result;
+    return Status::OK();
+}
+
+// Fixed display name assigned in the constructor.
+const std::string& VBloomPredicate::expr_name() const {
+    return this->_expr_name;
+}
+// Takes ownership away from the caller's unique_ptr (left empty afterwards). The
+// filter is held by shared_ptr, so copies made by clone() share the same filter.
+void VBloomPredicate::set_filter(std::unique_ptr<IBloomFilterFuncBase>& filter) {
+    IBloomFilterFuncBase* raw_filter = filter.release();
+    _filter.reset(raw_filter);
+}
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h
new file mode 100644
index 0000000000..f715b9e40e
--- /dev/null
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exprs/bloomfilter_predicate.h"
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+// Vectorized predicate that looks the value produced by its single child expression
+// up in a bloom filter. clone() copies the object, so clones share the same filter
+// through the shared_ptr `_filter`.
+class VBloomPredicate final : public VExpr {
+public:
+    VBloomPredicate(const TExprNode& node);
+    ~VBloomPredicate() = default;
+    doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+                          int* result_column_id) override;
+    doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+                          VExprContext* context) override;
+    doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+    void close(doris::RuntimeState* state, VExprContext* context,
+               FunctionContext::FunctionStateScope scope) override;
+    VExpr* clone(doris::ObjectPool* pool) const override {
+        return pool->add(new VBloomPredicate(*this));
+    }
+    const std::string& expr_name() const override;
+    // Takes ownership of `filter`; the caller's unique_ptr is left empty.
+    void set_filter(std::unique_ptr<IBloomFilterFuncBase>& filter);
+
+private:
+    std::shared_ptr<IBloomFilterFuncBase> _filter;
+    std::string _expr_name;
+
+    // Set after the first successful prepare(). Must start false: prepare() reads it
+    // before anything else writes it, and an indeterminate value could skip the
+    // child-count validation.
+    bool _prepared = false;
+};
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 29c8903950..058895a6fb 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -33,6 +33,7 @@
 #include "vec/exprs/vin_predicate.h"
 #include "vec/exprs/vinfo_func.h"
 #include "vec/exprs/vliteral.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
 #include "vec/exprs/vslot_ref.h"
 #include "vec/exprs/vtuple_is_null_predicate.h"
 
@@ -57,6 +58,15 @@ VExpr::VExpr(const doris::TExprNode& node)
     _data_type = DataTypeFactory::instance().create_data_type(_type, is_nullable);
 }
 
+// Member-wise copy used by clone() implementations (e.g. VBloomPredicate and
+// VRuntimeFilterWrapper copy-construct themselves). `_children` holds pointers, so
+// the copy SHARES its child expressions with the source instead of deep-copying
+// them. Members not listed below get their default initialization.
+// NOTE(review): confirm no such member needs to be carried over.
+VExpr::VExpr(const VExpr& vexpr)
+        : _node_type(vexpr._node_type),
+          _type(vexpr._type),
+          _data_type(vexpr._data_type),
+          _children(vexpr._children),
+          _fn(vexpr._fn),
+          _fn_context_index(vexpr._fn_context_index),
+          _constant_col(vexpr._constant_col) {}
+
 VExpr::VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable)
         : _type(type), _fn_context_index(-1) {
     if (is_slotref) {
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 7ebe597a0f..7d4fd5c9f0 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -43,6 +43,7 @@ public:
     }
 
     VExpr(const TExprNode& node);
+    // Shallow copy for clone(): child expression pointers are shared, not cloned.
+    VExpr(const VExpr& vexpr);
     VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable);
     // only used for test
     VExpr() = default;
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
new file mode 100644
index 0000000000..2bd353e934
--- /dev/null
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+// Wraps `impl` (held as a raw pointer, not owned here — presumably pool-owned) with
+// filtering enabled and zeroed filter-rate statistics.
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl {impl}, _always_true {false}, _filtered_rows {0}, _scan_rows {0} {}
+
+// Copy used by clone(). `_impl` is copied as a pointer, so the clone shares the
+// wrapped expression with the source. The counters, the `_always_true` decision and
+// the `_has_calculate_filter` flag are carried over together so the clone's state is
+// self-consistent, and `_expr_name` is copied so a clone of a prepared wrapper
+// reports the same name without being re-prepared.
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()),
+          _has_calculate_filter(vexpr._has_calculate_filter),
+          _expr_name(vexpr._expr_name) {}
+
+// Prepares the wrapped expression; on success derives this wrapper's display name
+// from the wrapped expression's name.
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    Status impl_status = _impl->prepare(state, desc, context);
+    if (!impl_status.ok()) {
+        return impl_status;
+    }
+    const std::string& impl_name = _impl->expr_name();
+    _expr_name = fmt::format("VRuntimeFilterWrapper({})", impl_name);
+    return Status::OK();
+}
+
+// The wrapper keeps no per-context state of its own: lifecycle calls and constness
+// queries are forwarded verbatim to the wrapped expression.
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    Status impl_status = _impl->open(state, context, scope);
+    return impl_status;
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    const bool impl_is_constant = _impl->is_constant();
+    return impl_is_constant;
+}
+
+// Evaluates the runtime filter on `block` and appends a UInt8 (optionally Nullable)
+// result column, storing its position in *result_column_id.
+// While filtering is enabled, the call is delegated to `_impl`, and rows that evaluate
+// to zero or NULL are added to `_filtered_rows`. Once `_scan_rows` reaches
+// THRESHOLD_TO_CALCULATE_RATE, the filter rate is evaluated a single time. If it is
+// below EXPECTED_FILTER_RATE, every later call short-circuits to an all-true column
+// without invoking `_impl`.
+// NOTE(review): `_always_true` / `_has_calculate_filter` are plain bools while the
+// counters are atomics; if one wrapper executes on several threads these flags are
+// unsynchronized — confirm the sharing model.
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows(), 1);
+        size_t num_columns_without_result = block->columns();
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    }
+
+    _scan_rows += block->rows();
+    RETURN_IF_ERROR(_impl->execute(context, block, result_column_id));
+
+    const ColumnWithTypeAndName& result_column = block->get_by_position(*result_column_id);
+    if (const auto* nullable = check_and_get_column<ColumnNullable>(*result_column.column)) {
+        // Checked instead of a blind cast: any other nested type would otherwise be
+        // reinterpreted as UInt8 data.
+        const auto* nested =
+                check_and_get_column<ColumnVector<UInt8>>(nullable->get_nested_column());
+        if (nested == nullptr) {
+            return Status::InternalError("Invalid type for runtime filters!");
+        }
+        _filtered_rows += doris::simd::count_zero_num(
+                reinterpret_cast<const int8_t*>(nested->get_data().data()),
+                nullable->get_null_map_data().data(), block->rows());
+    } else if (const auto* res_col =
+                       check_and_get_column<ColumnVector<UInt8>>(*result_column.column)) {
+        _filtered_rows += doris::simd::count_zero_num(
+                reinterpret_cast<const int8_t*>(res_col->get_data().data()), block->rows());
+    } else {
+        return Status::InternalError("Invalid type for runtime filters!");
+    }
+
+    if (!_has_calculate_filter && _scan_rows.load() >= THRESHOLD_TO_CALCULATE_RATE) {
+        double rate = static_cast<double>(_filtered_rows.load()) / _scan_rows.load();
+        if (rate < EXPECTED_FILTER_RATE) {
+            _always_true = true;
+        }
+        _has_calculate_filter = true;
+    }
+    return Status::OK();
+}
+
+// Name assigned in prepare() ("VRuntimeFilterWrapper(<impl name>)").
+const std::string& VRuntimeFilterWrapper::expr_name() const {
+    const std::string& name = _expr_name;
+    return name;
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h
new file mode 100644
index 0000000000..755d243fd8
--- /dev/null
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+// Wraps a runtime-filter expression (`_impl`). It forwards prepare/open/close/
+// debug_string/is_constant/get_const_col to `_impl` and tracks how many rows the filter
+// rejects. If, after enough rows, the observed filter rate is too low, it stops calling
+// `_impl` and returns all-true columns (see execute()).
+class VRuntimeFilterWrapper final : public VExpr {
+public:
+    // `impl` is stored as a raw pointer and not deleted by this class.
+    VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl);
+    // Used by clone(); the copy shares `_impl` with the source.
+    VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr);
+    ~VRuntimeFilterWrapper() = default;
+    doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+                          int* result_column_id) override;
+    doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+                          VExprContext* context) override;
+    doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+    std::string debug_string() const override { return _impl->debug_string(); };
+    bool is_constant() const override;
+    void close(doris::RuntimeState* state, VExprContext* context,
+               FunctionContext::FunctionStateScope scope) override;
+    VExpr* clone(doris::ObjectPool* pool) const override {
+        return pool->add(new VRuntimeFilterWrapper(*this));
+    }
+    const std::string& expr_name() const override;
+
+    ColumnPtrWrapper* get_const_col(VExprContext* context) override {
+        return _impl->get_const_col(context);
+    }
+
+private:
+    // The wrapped filter expression (not owned).
+    VExpr* _impl;
+
+    // Once true, execute() skips `_impl` and emits an all-true column.
+    // NOTE(review): plain bool next to atomic counters; confirm whether a wrapper can be
+    // executed concurrently.
+    bool _always_true;
+    /// TODO: statistic filter rate in the profile
+    // Rows evaluated to zero/NULL, and rows seen, while filtering was enabled.
+    std::atomic<int64_t> _filtered_rows;
+    std::atomic<int64_t> _scan_rows;
+
+    // Whether the one-time filter-rate evaluation has already happened.
+    bool _has_calculate_filter = false;
+    // Minimum number of scanned rows before the filter rate is evaluated.
+    // NOTE(review): an earlier note said this must be a power of 2; nothing in
+    // execute() relies on that — confirm whether the constraint still applies.
+    constexpr static int64_t THRESHOLD_TO_CALCULATE_RATE = 8192;
+    // If the filtered fraction is below this, the filter is switched to always-true.
+    constexpr static double EXPECTED_FILTER_RATE = 0.2;
+
+    // Set in prepare() to "VRuntimeFilterWrapper(<impl name>)".
+    std::string _expr_name;
+};
+} // namespace doris::vectorized
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org