You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/04 12:59:15 UTC

[incubator-doris] branch master updated: [Feature] Support RuntimeFilter in Doris (BE Implement) (#6077)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 149def9  [Feature] Support RuntimeFilter in Doris (BE Implement) (#6077)
149def9 is described below

commit 149def9e42e968ccc0f6e88c482d80ebb0b9a3b2
Author: stdpain <34...@users.noreply.github.com>
AuthorDate: Sun Jul 4 20:59:05 2021 +0800

    [Feature] Support RuntimeFilter in Doris (BE Implement) (#6077)
    
    1. support in/bloomfilter/minmax
    2. support broadcast/shuffle/bucket shuffle/colocate join
    3. opt memory use and cpu cache miss while build runtime filter
    4. opt memory use in left semi join (works well on tpcds-95)
---
 .clang-format                                      |    1 +
 be/src/common/config.h                             |    4 +
 be/src/exec/hash_join_node.cpp                     |  144 +--
 be/src/exec/hash_join_node.h                       |   14 +-
 be/src/exec/hash_join_node_ir.cpp                  |   18 +-
 be/src/exec/hash_table.cpp                         |    3 +
 be/src/exec/hash_table.h                           |   30 +-
 be/src/exec/hash_table.hpp                         |   73 ++
 be/src/exec/olap_scan_node.cpp                     |  156 ++-
 be/src/exec/olap_scan_node.h                       |   34 +-
 be/src/exec/olap_scanner.cpp                       |   18 +-
 be/src/exec/olap_scanner.h                         |   19 +-
 be/src/exprs/CMakeLists.txt                        |    3 +
 be/src/exprs/bloomfilter_predicate.cpp             |  154 +++
 be/src/exprs/bloomfilter_predicate.h               |  287 +++++
 be/src/exprs/expr_context.h                        |    4 +-
 be/src/exprs/in_predicate.cpp                      |    8 +-
 be/src/exprs/in_predicate.h                        |    3 +-
 be/src/exprs/literal.h                             |    2 +
 be/src/exprs/runtime_filter.cpp                    | 1138 ++++++++++++++++++++
 be/src/exprs/runtime_filter.h                      |  324 ++++++
 be/src/exprs/runtime_filter_rpc.cpp                |   93 ++
 be/src/olap/CMakeLists.txt                         |    1 +
 be/src/olap/bloom_filter_predicate.cpp             |   67 ++
 be/src/olap/bloom_filter_predicate.h               |   59 +
 be/src/olap/in_list_predicate.cpp                  |  139 +--
 be/src/olap/in_list_predicate.h                    |   80 +-
 be/src/olap/reader.cpp                             |   92 +-
 be/src/olap/reader.h                               |   31 +-
 be/src/olap/rowset/segment_v2/bloom_filter.cpp     |    2 +-
 be/src/olap/rowset/segment_v2/bloom_filter.h       |   21 +-
 be/src/runtime/CMakeLists.txt                      |    1 +
 be/src/runtime/decimal_value.h                     |    1 +
 be/src/runtime/fragment_mgr.cpp                    |   44 +
 be/src/runtime/fragment_mgr.h                      |   10 +
 be/src/runtime/runtime_filter_mgr.cpp              |  304 ++++++
 be/src/runtime/runtime_filter_mgr.h                |  179 +++
 be/src/runtime/runtime_state.cpp                   |   12 +-
 be/src/runtime/runtime_state.h                     |   10 +
 be/src/service/internal_service.cpp                |   44 +-
 be/src/service/internal_service.h                  |    9 +
 be/test/exec/hash_table_test.cpp                   |    8 +-
 be/test/exprs/CMakeLists.txt                       |    3 +-
 be/test/exprs/bloom_filter_predicate_test.cpp      |  105 ++
 be/test/exprs/runtime_filter_test.cpp              |  173 +++
 be/test/olap/CMakeLists.txt                        |    1 +
 .../olap/bloom_filter_column_predicate_test.cpp    |  190 ++++
 be/test/olap/in_list_predicate_test.cpp            |   18 +-
 be/test/olap/rowset/segment_v2/segment_test.cpp    |    6 +-
 gensrc/proto/internal_service.proto                |   71 ++
 gensrc/proto/palo_internal_service.proto           |    2 +
 gensrc/thrift/Exprs.thrift                         |    3 +
 gensrc/thrift/Opcodes.thrift                       |    1 +
 gensrc/thrift/PaloInternalService.thrift           |   47 +
 gensrc/thrift/PlanNodes.thrift                     |   43 +
 gensrc/thrift/Types.thrift                         |   15 +
 56 files changed, 4039 insertions(+), 283 deletions(-)

diff --git a/.clang-format b/.clang-format
index 0f9b9ba..3b8b570 100644
--- a/.clang-format
+++ b/.clang-format
@@ -13,3 +13,4 @@ PointerAlignment: Left
 ReflowComments: false
 SortUsingDeclarations: false
 SpacesBeforeTrailingComments: 1
+SpaceBeforeCpp11BracedList: true
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 575659a..bec0b9e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -614,6 +614,10 @@ CONF_Int16(mem_tracker_level, "0");
 // This config usually only needs to be modified during testing.
 // In most cases, it does not need to be modified.
 CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
+
+// If runtime_filter_use_async_rpc is true, runtime filters are published with an async RPC;
+// otherwise the sync method is called.
+CONF_mBool(runtime_filter_use_async_rpc, "true");
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index ce22f3e..fff59ef 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -14,19 +14,22 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
 #include "exec/hash_join_node.h"
 
+#include <memory>
 #include <sstream>
 
 #include "exec/hash_table.hpp"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 #include "exprs/in_predicate.h"
+#include "exprs/runtime_filter.h"
 #include "exprs/slot_ref.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/row_batch.h"
+#include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
+#include "util/defer_op.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
@@ -36,7 +39,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
           _join_op(tnode.hash_join_node.join_op),
           _probe_counter(0),
           _probe_eos(false),
-          _process_build_batch_fn(NULL),
           _process_probe_batch_fn(NULL),
           _anti_join_last_pos(NULL) {
     _match_all_probe =
@@ -44,8 +46,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
     _match_one_build = (_join_op == TJoinOp::LEFT_SEMI_JOIN);
     _match_all_build =
             (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
-    _is_push_down = tnode.hash_join_node.is_push_down;
     _build_unique = _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN;
+
+    _runtime_filter_descs = tnode.runtime_filters;
 }
 
 HashJoinNode::~HashJoinNode() {
@@ -81,6 +84,11 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _build_unique = false;
     }
 
+    for (const auto& filter_desc : _runtime_filter_descs) {
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(RuntimeFilterRole::PRODUCER,
+                                                                   filter_desc));
+    }
+
     return Status::OK();
 }
 
@@ -97,7 +105,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
     _probe_rows_counter = ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT);
     _hash_tbl_load_factor_counter =
             ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
-
+    _hash_table_list_min_size = ADD_COUNTER(runtime_profile(), "HashTableMinList", TUnit::UNIT);
+    _hash_table_list_max_size = ADD_COUNTER(runtime_profile(), "HashTableMaxList", TUnit::UNIT);
     // build and probe exprs are evaluated in the context of the rows produced by our
     // right and left children, respectively
     RETURN_IF_ERROR(
@@ -186,21 +195,22 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
     RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker().get());
     RETURN_IF_ERROR(child(1)->open(state));
 
+    SCOPED_TIMER(_build_timer);
+    Defer defer {[&] {
+        COUNTER_SET(_build_rows_counter, _hash_tbl->size());
+        COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets());
+        COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor());
+        auto node = _hash_tbl->minmax_node();
+        COUNTER_SET(_hash_table_list_min_size, node.first);
+        COUNTER_SET(_hash_table_list_max_size, node.second);
+    }};
     while (true) {
         RETURN_IF_CANCELLED(state);
         bool eos = true;
         RETURN_IF_ERROR(child(1)->get_next(state, &build_batch, &eos));
-        SCOPED_TIMER(_build_timer);
-        // take ownership of tuple data of build_batch
-        _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
-        RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
-        process_build_batch(&build_batch);
-
+        RETURN_IF_ERROR(process_build_batch(state, &build_batch));
         VLOG_ROW << _hash_tbl->debug_string(true, &child(1)->row_desc());
 
-        COUNTER_SET(_build_rows_counter, _hash_tbl->size());
-        COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets());
-        COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor());
         build_batch.reset();
 
         if (eos) {
@@ -236,91 +246,23 @@ Status HashJoinNode::open(RuntimeState* state) {
         thread_status.set_value(construct_hash_table(state));
     }
 
-    if (_children[0]->type() == TPlanNodeType::EXCHANGE_NODE &&
-        _children[1]->type() == TPlanNodeType::EXCHANGE_NODE) {
-        _is_push_down = false;
-    }
-
-    // The predicate could not be pushed down when there is Null-safe equal operator.
-    // The in predicate will filter the null value in child[0] while it is needed in the Null-safe equal join.
-    // For example: select * from a join b where a.id<=>b.id
-    // the null value in table a should be return by scan node instead of filtering it by In-predicate.
-    if (std::find(_is_null_safe_eq_join.begin(), _is_null_safe_eq_join.end(), true) !=
-        _is_null_safe_eq_join.end()) {
-        _is_push_down = false;
-    }
+    if (!_runtime_filter_descs.empty()) {
+        RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
+                                                _runtime_filter_descs);
 
-    if (_is_push_down) {
-        // Blocks until ConstructHashTable has returned, after which
-        // the hash table is fully constructed and we can start the probe
-        // phase.
         RETURN_IF_ERROR(thread_status.get_future().get());
-
-        if (_hash_tbl->size() == 0 && _join_op == TJoinOp::INNER_JOIN) {
-            // Hash table size is zero
-            LOG(INFO) << "No element need to push down, no need to read probe table";
-            RETURN_IF_ERROR(child(0)->open(state));
-            _probe_batch_pos = 0;
-            _hash_tbl_iterator = _hash_tbl->begin();
-            _eos = true;
-            return Status::OK();
-        }
-
-        if (_hash_tbl->size() > 1024) {
-            _is_push_down = false;
+        RETURN_IF_ERROR(runtime_filter_slots.init(state, _pool, expr_mem_tracker().get(),
+                                                  _hash_tbl->size()));
+        {
+            SCOPED_TIMER(_push_compute_timer);
+            auto func = [&](TupleRow* row) { runtime_filter_slots.insert(row); };
+            _hash_tbl->for_each_row(func);
         }
-
-        // TODO: this is used for Code Check, Remove this later
-        if (_is_push_down || 0 != child(1)->conjunct_ctxs().size()) {
-            for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
-                TExprNode node;
-                node.__set_node_type(TExprNodeType::IN_PRED);
-                TScalarType tscalar_type;
-                tscalar_type.__set_type(TPrimitiveType::BOOLEAN);
-                TTypeNode ttype_node;
-                ttype_node.__set_type(TTypeNodeType::SCALAR);
-                ttype_node.__set_scalar_type(tscalar_type);
-                TTypeDesc t_type_desc;
-                t_type_desc.types.push_back(ttype_node);
-                node.__set_type(t_type_desc);
-                node.in_predicate.__set_is_not_in(false);
-                node.__set_opcode(TExprOpcode::FILTER_IN);
-                node.__isset.vector_opcode = true;
-                node.__set_vector_opcode(to_in_opcode(_probe_expr_ctxs[i]->root()->type().type));
-                // NOTE(zc): in predicate only used here, no need prepare.
-                InPredicate* in_pred = _pool->add(new InPredicate(node));
-                RETURN_IF_ERROR(in_pred->prepare(state, _probe_expr_ctxs[i]->root()->type()));
-                in_pred->add_child(Expr::copy(_pool, _probe_expr_ctxs[i]->root()));
-                ExprContext* ctx = _pool->add(new ExprContext(in_pred));
-                _push_down_expr_ctxs.push_back(ctx);
-            }
-
-            {
-                SCOPED_TIMER(_push_compute_timer);
-                HashTable::Iterator iter = _hash_tbl->begin();
-
-                while (iter.has_next()) {
-                    TupleRow* row = iter.get_row();
-                    std::list<ExprContext*>::iterator ctx_iter = _push_down_expr_ctxs.begin();
-
-                    for (int i = 0; i < _build_expr_ctxs.size(); ++i, ++ctx_iter) {
-                        void* val = _build_expr_ctxs[i]->get_value(row);
-                        InPredicate* in_pre = (InPredicate*)((*ctx_iter)->root());
-                        in_pre->insert(val);
-                    }
-
-                    SCOPED_TIMER(_build_timer);
-                    iter.next<false>();
-                }
-            }
-
+        COUNTER_UPDATE(_build_timer, _push_compute_timer->value());
+        {
             SCOPED_TIMER(_push_down_timer);
-            push_down_predicate(state, &_push_down_expr_ctxs);
+            runtime_filter_slots.publish(this);
         }
-
-        // Open the probe-side child so that it may perform any initialisation in parallel.
-        // Don't exit even if we see an error, we still need to wait for the build thread
-        // to finish.
         Status open_status = child(0)->open(state);
         RETURN_IF_ERROR(open_status);
     } else {
@@ -370,8 +312,8 @@ Status HashJoinNode::open(RuntimeState* state) {
 Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
-    // In most cases, no additional memory overhead will be applied for at this stage, 
-    // but if the expression calculation in this node needs to apply for additional memory, 
+    // In most cases, no additional memory overhead will be applied for at this stage,
+    // but if the expression calculation in this node needs to apply for additional memory,
     // it may cause the memory to exceed the limit.
     RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while execute get_next.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
@@ -618,6 +560,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state, RowBatch* out_batch
     *eos = _eos;
 
     ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+    Defer defer {[&] { COUNTER_SET(_rows_returned_counter, _num_rows_returned); }};
 
     while (!_eos) {
         // Compute max rows that should be added to out_batch
@@ -628,16 +571,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state, RowBatch* out_batch
         }
 
         // Continue processing this row batch
-        if (_process_probe_batch_fn == NULL) {
-            _num_rows_returned +=
-                    process_probe_batch(out_batch, _probe_batch.get(), max_added_rows);
-            COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-        } else {
-            // Use codegen'd function
-            _num_rows_returned +=
-                    _process_probe_batch_fn(this, out_batch, _probe_batch.get(), max_added_rows);
-            COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-        }
+        _num_rows_returned += process_probe_batch(out_batch, _probe_batch.get(), max_added_rows);
 
         if (reached_limit() || out_batch->is_full()) {
             *eos = reached_limit();
diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h
index bfe2b36..4e1ea92 100644
--- a/be/src/exec/hash_join_node.h
+++ b/be/src/exec/hash_join_node.h
@@ -62,9 +62,10 @@ protected:
     void debug_string(int indentation_level, std::stringstream* out) const;
 
 private:
+    friend class IRuntimeFilter;
+
     boost::scoped_ptr<HashTable> _hash_tbl;
     HashTable::Iterator _hash_tbl_iterator;
-    bool _is_push_down;
 
     // for right outer joins, keep track of what's been joined
     typedef std::unordered_set<TupleRow*> BuildTupleRowSet;
@@ -117,11 +118,6 @@ private:
     // This should be the same size as the probe tuple row.
     int _result_tuple_row_size;
 
-    // Function declaration for codegen'd function.  Signature must match
-    // HashJoinNode::ProcessBuildBatch
-    typedef void (*ProcessBuildBatchFn)(HashJoinNode*, RowBatch*);
-    ProcessBuildBatchFn _process_build_batch_fn;
-
     // HashJoinNode::process_probe_batch() exactly
     typedef int (*ProcessProbeBatchFn)(HashJoinNode*, RowBatch*, RowBatch*, int);
     // Jitted ProcessProbeBatch function pointer.  Null if codegen is disabled.
@@ -138,6 +134,8 @@ private:
     RuntimeProfile::Counter* _probe_rows_counter;    // num probe rows
     RuntimeProfile::Counter* _build_buckets_counter; // num buckets in hash table
     RuntimeProfile::Counter* _hash_tbl_load_factor_counter;
+    RuntimeProfile::Counter* _hash_table_list_min_size;
+    RuntimeProfile::Counter* _hash_table_list_max_size;
 
     // Supervises ConstructHashTable in a separate thread, and
     // returns its status in the promise parameter.
@@ -162,7 +160,7 @@ private:
     int process_probe_batch(RowBatch* out_batch, RowBatch* probe_batch, int max_added_rows);
 
     // Construct the build hash table, adding all the rows in 'build_batch'
-    void process_build_batch(RowBatch* build_batch);
+    Status process_build_batch(RuntimeState* state, RowBatch* build_batch);
 
     // Write combined row, consisting of probe_row and build_row, to out_row.
     // This is replaced by codegen.
@@ -176,6 +174,8 @@ private:
     // doing the join.
     std::string get_probe_row_output_string(TupleRow* probe_row);
 
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+
     // RELEASE_CONTEXT_COUNTER should be power of 2
     // GCC will optimize the modulo operation to &(release_context_counter - 1)
     // build_expr_context and probe_expr_context will free local alloc after this probe calculations
diff --git a/be/src/exec/hash_join_node_ir.cpp b/be/src/exec/hash_join_node_ir.cpp
index 3bd7f42..7f885e7 100644
--- a/be/src/exec/hash_join_node_ir.cpp
+++ b/be/src/exec/hash_join_node_ir.cpp
@@ -19,6 +19,8 @@
 #include "exec/hash_table.hpp"
 #include "exprs/expr_context.h"
 #include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple_row.h"
 
 namespace doris {
 
@@ -141,17 +143,29 @@ end:
 
 // when build table has too many duplicated rows, the collisions will be very serious,
 // so in some case will don't need to store duplicated value in hash table, we can build an unique one
-void HashJoinNode::process_build_batch(RowBatch* build_batch) {
+Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_batch) {
     // insert build row into our hash table
     if (_build_unique) {
         for (int i = 0; i < build_batch->num_rows(); ++i) {
-            _hash_tbl->insert_unique(build_batch->get_row(i));
+            // _hash_tbl->insert_unique(build_batch->get_row(i));
+            TupleRow* tuple_row = nullptr;
+            if (_hash_tbl->emplace_key(build_batch->get_row(i), &tuple_row)) {
+                build_batch->get_row(i)->deep_copy(tuple_row,
+                                                   child(1)->row_desc().tuple_descriptors(),
+                                                   _build_pool.get(), false);
+            }
         }
+        RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
     } else {
+        // take ownership of tuple data of build_batch
+        _build_pool->acquire_data(build_batch->tuple_data_pool(), false);
+        RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
+
         for (int i = 0; i < build_batch->num_rows(); ++i) {
             _hash_tbl->insert(build_batch->get_row(i));
         }
     }
+    return Status::OK();
 }
 
 } // namespace doris
diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp
index 5a17935..8b2b1df 100644
--- a/be/src/exec/hash_table.cpp
+++ b/be/src/exec/hash_table.cpp
@@ -66,10 +66,12 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
     _expr_value_null_bits = new uint8_t[_build_expr_ctxs.size()];
 
     _alloc_list.reserve(10);
+    _end_list.reserve(10);
     _current_nodes = reinterpret_cast<uint8_t*>(malloc(_current_capacity * _node_byte_size));
     // TODO: remove memset later
     memset(_current_nodes, 0, _current_capacity * _node_byte_size);
     _alloc_list.push_back(_current_nodes);
+    _end_list.push_back(_current_nodes + _current_capacity * _node_byte_size);
 
     _mem_tracker->Consume(_current_capacity * _node_byte_size);
     if (_mem_tracker->limit_exceeded()) {
@@ -249,6 +251,7 @@ void HashTable::grow_node_array() {
     memset(_current_nodes, 0, alloc_size);
     // add _current_nodes to alloc pool
     _alloc_list.push_back(_current_nodes);
+    _end_list.push_back(_current_nodes + alloc_size);
 
     _mem_tracker->Consume(alloc_size);
     if (_mem_tracker->limit_exceeded()) {
diff --git a/be/src/exec/hash_table.h b/be/src/exec/hash_table.h
index caaf01b..0560fc8 100644
--- a/be/src/exec/hash_table.h
+++ b/be/src/exec/hash_table.h
@@ -118,6 +118,8 @@ public:
         }
     }
 
+    bool IR_ALWAYS_INLINE emplace_key(TupleRow* row, TupleRow** key_addr);
+
     // Returns the start iterator for all rows that match 'probe_row'.  'probe_row' is
     // evaluated with _probe_expr_ctxs.  The iterator can be iterated until HashTable::end()
     // to find all the matching rows.
@@ -170,6 +172,8 @@ public:
     // just the build row addresses.
     std::string debug_string(bool skip_empty, const RowDescriptor* build_desc);
 
+    inline std::pair<int64_t, int64_t> minmax_node();
+
     // stl-like iterator interface.
     class Iterator {
     public:
@@ -233,6 +237,27 @@ public:
         uint32_t _scan_hash;
     };
 
+    // Calls func(TupleRow*) with the row slot of every node in the node buffers.
+    // Every buffer except the newest is walked from its start to its recorded end;
+    // the newest buffer is walked only over its first _current_used nodes.
+    template <class Func>
+    void for_each_row(Func&& func) {
+        const size_t num_buffers = _alloc_list.size();
+        DCHECK_GT(num_buffers, 0);
+        for (size_t buf = 0; buf < num_buffers - 1; ++buf) {
+            uint8_t* const buf_end = _end_list[buf];
+            for (uint8_t* pos = _alloc_list[buf]; pos < buf_end; pos += _node_byte_size) {
+                func(reinterpret_cast<Node*>(pos)->data());
+            }
+        }
+
+        // Newest buffer: visit only the nodes counted by _current_used.
+        uint8_t* pos = _alloc_list[num_buffers - 1];
+        for (size_t used = 0; used < _current_used; ++used, pos += _node_byte_size) {
+            func(reinterpret_cast<Node*>(pos)->data());
+        }
+    }
+
 private:
     friend class Iterator;
     friend class HashTableTest;
@@ -254,8 +279,9 @@ private:
     };
 
     struct Bucket {
-        Bucket() { _node = nullptr; }
+        Bucket() : _node(nullptr), _size(0) {}
         Node* _node;
+        uint64_t _size;
     };
 
     // Returns the next non-empty bucket and updates idx to be the index of that bucket.
@@ -389,6 +415,8 @@ private:
     uint8_t* _expr_value_null_bits;
     // node buffer list
     std::vector<uint8_t*> _alloc_list;
+    // node buffer end pointer
+    std::vector<uint8_t*> _end_list;
 };
 
 } // namespace doris
diff --git a/be/src/exec/hash_table.hpp b/be/src/exec/hash_table.hpp
index ae2710d..2e655ff 100644
--- a/be/src/exec/hash_table.hpp
+++ b/be/src/exec/hash_table.hpp
@@ -22,6 +22,59 @@
 
 namespace doris {
 
+// Inserts the key of build row 'row' unless an equal key is already stored.
+// Returns true and points *dest_addr at the new node's row slot, which the caller is
+// expected to fill in. Returns false without touching *dest_addr when the key is a
+// duplicate, or when the row has a NULL key and NULLs are not stored.
+inline bool HashTable::emplace_key(TupleRow* row, TupleRow** dest_addr) {
+    bool has_nulls = eval_build_row(row);
+    if (!_stores_nulls && has_nulls) {
+        return false;
+    }
+
+    uint32_t hash = hash_current_row();
+    int64_t bucket_idx = hash & (_num_buckets - 1);
+
+    // Scan the chain for an equal key, remembering its tail for the append below.
+    Node* tail = nullptr;
+    for (Node* node = _buckets[bucket_idx]._node; node != nullptr; node = node->_next) {
+        if (node->_hash == hash && equals(node->data())) {
+            return false;
+        }
+        tail = node;
+    }
+
+    if (_num_filled_buckets > _num_buckets_till_resize) {
+        resize_buckets(_num_buckets * 2);
+        // Resizing can move nodes between buckets (see move_node), so the tail found
+        // above may now belong to a different chain or no longer end one. Recompute
+        // both the bucket and its tail instead of appending to a stale node.
+        bucket_idx = hash & (_num_buckets - 1);
+        tail = _buckets[bucket_idx]._node;
+        while (tail != nullptr && tail->_next != nullptr) {
+            tail = tail->_next;
+        }
+    }
+    if (_current_used == _current_capacity) {
+        grow_node_array();
+    }
+
+    Node* alloc_node =
+            reinterpret_cast<Node*>(_current_nodes + _node_byte_size * _current_used++);
+    ++_num_nodes;
+    *dest_addr = alloc_node->data();
+    alloc_node->_hash = hash;
+    if (tail == nullptr) {
+        add_to_bucket(&_buckets[bucket_idx], alloc_node);
+    } else {
+        // Appending bypasses add_to_bucket(), so keep the link and chain size in sync.
+        alloc_node->_next = nullptr;
+        tail->_next = alloc_node;
+        ++_buckets[bucket_idx]._size;
+    }
+    return true;
+}
+
 inline HashTable::Iterator HashTable::find(TupleRow* probe_row, bool probe) {
     bool has_nulls = probe ? eval_probe_row(probe_row) : eval_build_row(probe_row);
 
@@ -100,11 +153,13 @@ inline void HashTable::add_to_bucket(Bucket* bucket, Node* node) {
 
     node->_next = bucket->_node;
     bucket->_node = node;
+    bucket->_size++;
 }
 
 inline void HashTable::move_node(Bucket* from_bucket, Bucket* to_bucket, Node* node,
                                  Node* previous_node) {
     Node* next_node = node->_next;
+    from_bucket->_size--;
 
     if (previous_node != NULL) {
         previous_node->_next = next_node;
@@ -120,6 +175,24 @@ inline void HashTable::move_node(Bucket* from_bucket, Bucket* to_bucket, Node* n
     add_to_bucket(to_bucket, node);
 }
 
+// Reports the shortest and longest chain length among the non-empty buckets as
+// {min, max}; yields {0, 0} when no bucket holds a node.
+inline std::pair<int64_t, int64_t> HashTable::minmax_node() {
+    int64_t shortest = std::numeric_limits<int64_t>::max();
+    int64_t longest = std::numeric_limits<int64_t>::min();
+    bool any_filled = false;
+    for (const auto& bucket : _buckets) {
+        const int64_t chain_len = bucket._size;
+        if (chain_len <= 0) {
+            continue;
+        }
+        any_filled = true;
+        shortest = std::min(chain_len, shortest);
+        longest = std::max(chain_len, longest);
+    }
+    return any_filled ? std::make_pair(shortest, longest) : std::pair<int64_t, int64_t>(0, 0);
+}
+
 template <bool check_match>
 inline void HashTable::Iterator::next() {
     if (_bucket_idx == -1) {
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index f9ecb2a..409e27f 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -30,9 +30,11 @@
 #include "exprs/binary_predicate.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
+#include "exprs/runtime_filter.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/exec_env.h"
 #include "runtime/row_batch.h"
+#include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple_row.h"
@@ -59,7 +61,8 @@ OlapScanNode::OlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
           _status(Status::OK()),
           _resource_info(nullptr),
           _buffered_bytes(0),
-          _eval_conjuncts_fn(nullptr) {}
+          _eval_conjuncts_fn(nullptr),
+          _runtime_filter_descs(tnode.runtime_filters) {}
 
 OlapScanNode::~OlapScanNode() {}
 
@@ -80,6 +83,20 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
     }
 
+    /// TODO: can one filter be used by different scan nodes?
+    int filter_size = _runtime_filter_descs.size();
+    _runtime_filter_ctxs.resize(filter_size);
+    for (int i = 0; i < filter_size; ++i) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        const auto& filter_desc = _runtime_filter_descs[i];
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(RuntimeFilterRole::CONSUMER,
+                                                                   filter_desc, id()));
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
+                                                                        &runtime_filter));
+
+        _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+    }
+
     return Status::OK();
 }
 
@@ -173,6 +190,13 @@ Status OlapScanNode::prepare(RuntimeState* state) {
     }
 
     _runtime_state = state;
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
+                                                        &runtime_filter);
+        DCHECK(runtime_filter != nullptr);
+        runtime_filter->init_profile(_runtime_profile.get());
+    }
     return Status::OK();
 }
 
@@ -184,6 +208,38 @@ Status OlapScanNode::open(RuntimeState* state) {
 
     _resource_info = ResourceTls::get_resource_tls();
 
+    // Acquire the runtime filters this scan consumes. A filter that is missing, or that
+    // still reports not ready after await(), is skipped; a ready one adds its exprs below.
+    _runtime_filter_ctxs.resize(_runtime_filter_descs.size());
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        auto& filter_desc = _runtime_filter_descs[i];
+        IRuntimeFilter* runtime_filter = nullptr;
+        state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
+        DCHECK(runtime_filter != nullptr);
+        if (runtime_filter == nullptr) {
+            continue;
+        }
+        bool ready = runtime_filter->is_ready();
+        if (!ready) {
+            ready = runtime_filter->await();
+        }
+        if (ready) {
+            std::list<ExprContext*> expr_context;
+            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
+            _runtime_filter_ctxs[i].apply_mark = true;
+            _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+            for (auto ctx : expr_context) {
+                // Do not register an expr context that failed to prepare or open.
+                RETURN_IF_ERROR(ctx->prepare(state, row_desc(), _expr_mem_tracker));
+                RETURN_IF_ERROR(ctx->open(state));
+                int index = _conjunct_ctxs.size();
+                _conjunct_ctxs.push_back(ctx);
+                // The stored address stays valid only while _runtime_filter_ctxs is not
+                // resized again (assumed here; confirm no later resize).
+                _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i];
+            }
+        }
+    }
+
     return Status::OK();
 }
 
@@ -336,6 +392,13 @@ Status OlapScanNode::close(RuntimeState* state) {
         scanner->close(state);
     }
 
+    for (auto& filter_desc : _runtime_filter_descs) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
+        DCHECK(runtime_filter != nullptr);
+        runtime_filter->consumer_close();
+    }
+
     VLOG_CRITICAL << "OlapScanNode::close()";
     return ScanNode::close(state);
 }
@@ -372,7 +435,7 @@ Status OlapScanNode::start_scan(RuntimeState* state) {
     eval_const_conjuncts();
 
     VLOG_CRITICAL << "NormalizeConjuncts";
-    // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts will
+    // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts may
     // set eos = true
     RETURN_IF_ERROR(normalize_conjuncts());
 
@@ -429,7 +492,7 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
     }
     auto new_direct_conjunct_size = new_conjunct_ctxs.size();
 
-    // dispose hash push down conjunct second
+    // dispose hash join push down conjunct second
     for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) {
         if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) ==
             _pushed_conjuncts_index.cend()) {
@@ -441,6 +504,13 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
 
     _conjunct_ctxs = std::move(new_conjunct_ctxs);
     _direct_conjunct_size = new_direct_conjunct_size;
+
+    for (auto push_down_ctx : _pushed_conjuncts_index) {
+        auto iter = _conjunctid_to_runtime_filter_ctxs.find(push_down_ctx);
+        if (iter != _conjunctid_to_runtime_filter_ctxs.end()) {
+            iter->second->runtimefilter->set_push_down_profile();
+        }
+    }
 }
 
 void OlapScanNode::eval_const_conjuncts() {
@@ -716,7 +786,8 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
             // add scanner to pool before doing prepare.
             // so that scanner can be automatically deconstructed if prepare failed.
             _scanner_pool->add(scanner);
-            RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter));
+            RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
+                                             _bloom_filters_push_down));
 
             _olap_scanners.push_back(scanner);
             disk_set.insert(scanner->scan_disk());
@@ -725,6 +796,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
     COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
     COUNTER_SET(_num_scanners, static_cast<int64_t>(_olap_scanners.size()));
 
+    // PAIN_LOG(_olap_scanners.size());
     // init progress
     std::stringstream ss;
     ss << "ScanThread complete (node=" << id() << "):";
@@ -746,6 +818,9 @@ Status OlapScanNode::normalize_predicate(ColumnValueRange<T>& range, SlotDescrip
     // 3. Normalize BinaryPredicate , add to ColumnValueRange
     RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range));
 
+    // 3.1 Normalize BloomFilterPredicate, which is pushed down by the hash join node
+    RETURN_IF_ERROR(normalize_bloom_filter_predicate(slot));
+
     // 4. Check whether range is empty, set _eos
     if (range.is_empty_value_range()) _eos = true;
 
@@ -767,6 +842,11 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* expr) {
 
 bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot,
                                                  doris::InPredicate* pred) {
+    if (pred->is_not_in()) {
+        // can not push down NOT IN predicate to storage engine
+        return false;
+    }
+
     if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
         // not a slot ref(column)
         return false;
@@ -1145,7 +1225,6 @@ Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot,
                 }
 
                 switch (slot->type().type) {
-
                 case TYPE_DATE: {
                     DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
                     // NOTE: Datetime may be truncated to a date column, so we call ++operator for date_value
@@ -1201,6 +1280,47 @@ Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot,
     return Status::OK();
 }
 
+Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
+    // Indexes (into _conjunct_ctxs) of the bloom filter conjuncts pushed down for this slot.
+    std::vector<uint32_t> pushed_idx;
+    for (int conj_idx = _direct_conjunct_size; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
+        Expr* pred = _conjunct_ctxs[conj_idx]->root();
+        if (pred->node_type() != TExprNodeType::BLOOM_PRED) {
+            continue;
+        }
+        DCHECK(pred->get_num_children() == 1);
+
+        Expr* child = pred->get_child(0);
+        // The probed child must be a SLOT_REF according to Expr::type_without_cast().
+        if (Expr::type_without_cast(child) != TExprNodeType::SLOT_REF) {
+            continue;
+        }
+        // A type mismatch is tolerated only when ignore_cast() accepts it.
+        if (child->type().type != slot->type().type && !ignore_cast(slot, child)) {
+            continue;
+        }
+
+        std::vector<SlotId> slot_ids;
+        const bool single_slot = (1 == child->get_slot_ids(&slot_ids));
+        if (!single_slot || slot_ids[0] != slot->id()) {
+            continue;
+        }
+        // only key column of bloom filter will push down to storage engine
+        if (!is_key_column(slot->col_name())) {
+            continue;
+        }
+        pushed_idx.emplace_back(conj_idx);
+        auto* bloom_pred = reinterpret_cast<BloomFilterPredicate*>(pred);
+        _bloom_filters_push_down.emplace_back(slot->col_name(),
+                                              bloom_pred->get_bloom_filter_func());
+    }
+
+    // Record the pushed conjuncts in _pushed_conjuncts_index.
+    std::copy(pushed_idx.cbegin(), pushed_idx.cend(),
+              std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
+    return Status::OK();
+}
+
 void OlapScanNode::transfer_thread(RuntimeState* state) {
     // scanner open pushdown to scanThread
     state->resource_pool()->acquire_thread_token();
@@ -1386,6 +1506,32 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
         scanner->set_opened();
     }
 
+    std::vector<ExprContext*> contexts;
+    auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
+    DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
+    for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
+        if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
+            IRuntimeFilter* runtime_filter = nullptr;
+            state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
+                                                            &runtime_filter);
+            DCHECK(runtime_filter != nullptr);
+            bool ready = runtime_filter->is_ready();
+            if (ready) {
+                runtime_filter->get_prepared_context(&contexts, row_desc(), _expr_mem_tracker);
+                _runtime_filter_ctxs[i].apply_mark = true;
+            }
+        }
+    }
+
+    if (!contexts.empty()) {
+        std::vector<ExprContext*> new_contexts;
+        auto& scanner_conjunct_ctxs = *scanner->conjunct_ctxs();
+        Expr::clone_if_not_exists(contexts, state, &new_contexts);
+        scanner_conjunct_ctxs.insert(scanner_conjunct_ctxs.end(), new_contexts.begin(),
+                                     new_contexts.end());
+        scanner->set_use_pushdown_conjuncts(true);
+    }
+
     // apply to cgroup
     if (_resource_info != nullptr) {
         CgroupsMgr::apply_cgroup(_resource_info->user, _resource_info->group);
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index f0c6695..2acb84f 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -27,6 +27,7 @@
 #include "exec/olap_common.h"
 #include "exec/olap_scanner.h"
 #include "exec/scan_node.h"
+#include "exprs/bloomfilter_predicate.h"
 #include "exprs/in_predicate.h"
 #include "runtime/descriptors.h"
 #include "runtime/row_batch_interface.hpp"
@@ -35,6 +36,7 @@
 #include "util/spinlock.h"
 
 namespace doris {
+class IRuntimeFilter;
 
 enum TransferStatus {
     READ_ROWBATCH = 1,
@@ -138,7 +140,7 @@ protected:
     // In order to ensure the accuracy of the query result
     // only key column conjuncts will be remove as idle conjunct
     bool is_key_column(const std::string& key_name);
-    void remove_pushed_conjuncts(RuntimeState *state);
+    void remove_pushed_conjuncts(RuntimeState* state);
 
     Status start_scan(RuntimeState* state);
 
@@ -160,9 +162,12 @@ protected:
     template <class T>
     Status normalize_noneq_binary_predicate(SlotDescriptor* slot, ColumnValueRange<T>* range);
 
+    Status normalize_bloom_filter_predicate(SlotDescriptor* slot);
+
     template <typename T>
     static bool normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot,
-            const std::string& is_null_str, ColumnValueRange<T>* range);
+                                            const std::string& is_null_str,
+                                            ColumnValueRange<T>* range);
 
     void transfer_thread(RuntimeState* state);
     void scanner_thread(OlapScanner* scanner);
@@ -172,6 +177,8 @@ protected:
     // Write debug string of this into out.
     virtual void debug_string(int indentation_level, std::stringstream* out) const;
 
+    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return _runtime_filter_descs; }
+
 private:
     void _init_counter(RuntimeState* state);
     // OLAP_SCAN_NODE profile layering: OLAP_SCAN_NODE, OlapScanner, and SegmentIterator
@@ -180,11 +187,12 @@ private:
 
     bool should_push_down_in_predicate(SlotDescriptor* slot, InPredicate* in_pred);
 
-    std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot, Expr* pred, int conj_idx, int child_idx);
-
     template <typename T, typename ChangeFixedValueRangeFunc>
-    static Status change_fixed_value_range(ColumnValueRange <T> &range, PrimitiveType type, void *value,
-                                               const ChangeFixedValueRangeFunc& func);
+    static Status change_fixed_value_range(ColumnValueRange<T>& range, PrimitiveType type,
+                                           void* value, const ChangeFixedValueRangeFunc& func);
+
+    std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot, Expr* pred,
+                                                         int conj_idx, int child_idx);
 
     friend class OlapScanner;
 
@@ -212,6 +220,11 @@ private:
     std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
 
     std::vector<TCondition> _olap_filter;
+    // push down bloom filters to storage engine.
+    // 1. std::pair.first :: column name
+    // 2. std::pair.second :: shared_ptr of BloomFilterFuncBase
+    std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>
+            _bloom_filters_push_down;
 
     // Pool for storing allocated scanner objects.  We don't want to use the
     // runtime pool to ensure that the scanner objects are deleted before this
@@ -283,6 +296,15 @@ private:
     // or be overwritten by value in TQueryOptions
     int32_t _max_pushdown_conditions_per_column = 1024;
 
+    struct RuntimeFilterContext {
+        // apply_mark: the filter's predicates were already obtained for the scan
+        bool apply_mark = false;
+        IRuntimeFilter* runtimefilter = nullptr;
+    };
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+    std::map<int, RuntimeFilterContext*> _conjunctid_to_runtime_filter_ctxs;
+
     std::unique_ptr<RuntimeProfile> _scanner_profile;
     std::unique_ptr<RuntimeProfile> _segment_profile;
 
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index e97d3ce..9b2b9fb 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -60,9 +60,10 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
 
 OlapScanner::~OlapScanner() {}
 
-Status OlapScanner::prepare(const TPaloScanRange& scan_range,
-                            const std::vector<OlapScanRange*>& key_ranges,
-                            const std::vector<TCondition>& filters) {
+Status OlapScanner::prepare(
+        const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
+        const std::vector<TCondition>& filters,
+        const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters) {
     // Get olap table
     TTabletId tablet_id = scan_range.tablet_id;
     SchemaHash schema_hash = strtoul(scan_range.schema_hash.c_str(), nullptr, 10);
@@ -107,7 +108,7 @@ Status OlapScanner::prepare(const TPaloScanRange& scan_range,
 
     {
         // Initialize _params
-        RETURN_IF_ERROR(_init_params(key_ranges, filters));
+        RETURN_IF_ERROR(_init_params(key_ranges, filters, bloom_filters));
     }
 
     return Status::OK();
@@ -120,6 +121,8 @@ Status OlapScanner::open() {
         _use_pushdown_conjuncts = true;
     }
 
+    _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);
+
     auto res = _reader->init(_params);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to init reader.[res=%d]", res);
@@ -132,8 +135,9 @@ Status OlapScanner::open() {
 }
 
 // it will be called under tablet read lock because capture rs readers need
-Status OlapScanner::_init_params(const std::vector<OlapScanRange*>& key_ranges,
-                                 const std::vector<TCondition>& filters) {
+Status OlapScanner::_init_params(
+        const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
+        const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters) {
     RETURN_IF_ERROR(_init_return_columns());
 
     _params.tablet = _tablet;
@@ -145,6 +149,8 @@ Status OlapScanner::_init_params(const std::vector<OlapScanRange*>& key_ranges,
     for (auto& filter : filters) {
         _params.conditions.push_back(filter);
     }
+    std::copy(bloom_filters.cbegin(), bloom_filters.cend(),
+              std::inserter(_params.bloom_filters, _params.bloom_filters.begin()));
 
     // Range
     for (auto key_range : key_ranges) {
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 15cf13b..2c09578 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -27,6 +27,7 @@
 #include "common/status.h"
 #include "exec/exec_node.h"
 #include "exec/olap_utils.h"
+#include "exprs/bloomfilter_predicate.h"
 #include "exprs/expr.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
@@ -55,7 +56,9 @@ public:
     ~OlapScanner();
 
     Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
-                   const std::vector<TCondition>& filters);
+                   const std::vector<TCondition>& filters,
+                   const std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>&
+                           bloom_filters);
 
     Status open();
 
@@ -83,14 +86,19 @@ public:
         _watcher.start();
     }
 
-    int64_t update_wait_worker_timer() {
-        return _watcher.elapsed_time();
+    int64_t update_wait_worker_timer() { return _watcher.elapsed_time(); }
+
+    void set_use_pushdown_conjuncts(bool has_pushdown_conjuncts) {
+        _use_pushdown_conjuncts = has_pushdown_conjuncts;
     }
 
+    std::vector<bool>* mutable_runtime_filter_marks() { return &_runtime_filter_marks; }
 
 private:
     Status _init_params(const std::vector<OlapScanRange*>& key_ranges,
-                        const std::vector<TCondition>& filters);
+                        const std::vector<TCondition>& filters,
+                        const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>&
+                                bloom_filters);
     Status _init_return_columns();
     void _convert_row_to_tuple(Tuple* tuple);
 
@@ -98,7 +106,6 @@ private:
     void _update_realtime_counter();
 
 private:
-
     RuntimeState* _runtime_state;
     OlapScanNode* _parent;
     const TupleDescriptor* _tuple_desc; /**< tuple descriptor */
@@ -106,6 +113,8 @@ private:
     const std::vector<SlotDescriptor*>& _string_slots;
 
     std::vector<ExprContext*> _conjunct_ctxs;
+    // to record which runtime filters have been used
+    std::vector<bool> _runtime_filter_marks;
 
     int _id;
     bool _is_open;
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index bc179e5..f64b5dd 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -45,6 +45,9 @@ add_library(Exprs
   expr_context.cpp
   in_predicate.cpp
   new_in_predicate.cpp
+  bloomfilter_predicate.cpp
+  runtime_filter.cpp
+  runtime_filter_rpc.cpp
   is_null_predicate.cpp
   like_predicate.cpp
   math_functions.cpp
diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp
new file mode 100644
index 0000000..34c5127
--- /dev/null
+++ b/be/src/exprs/bloomfilter_predicate.cpp
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/bloomfilter_predicate.h"
+
+#include <sstream>
+
+#include "exprs/anyval_util.h"
+#include "runtime/raw_value.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.hpp"
+
+namespace doris {
+
+BloomFilterFuncBase* BloomFilterFuncBase::create_bloom_filter(MemTracker* tracker,
+                                                              PrimitiveType type) {
+    // Factory mapping a primitive column type to its bloom filter function. The object is
+    // allocated with nothrow new, so nullptr is returned both for a type that has no
+    // mapping below and when the allocation itself fails. The raw pointer is handed to
+    // the caller, which becomes responsible for it.
+    switch (type) {
+    // boolean and integer types
+    case TYPE_BOOLEAN:
+        return new (std::nothrow) BloomFilterFunc<bool>(tracker);
+    case TYPE_TINYINT:
+        return new (std::nothrow) BloomFilterFunc<int8_t>(tracker);
+    case TYPE_SMALLINT:
+        return new (std::nothrow) BloomFilterFunc<int16_t>(tracker);
+    case TYPE_INT:
+        return new (std::nothrow) BloomFilterFunc<int32_t>(tracker);
+    case TYPE_BIGINT:
+        return new (std::nothrow) BloomFilterFunc<int64_t>(tracker);
+    case TYPE_LARGEINT:
+        return new (std::nothrow) BloomFilterFunc<__int128>(tracker);
+
+    // floating point types
+    case TYPE_FLOAT:
+        return new (std::nothrow) BloomFilterFunc<float>(tracker);
+    case TYPE_DOUBLE:
+        return new (std::nothrow) BloomFilterFunc<double>(tracker);
+
+    // date and datetime
+    case TYPE_DATE:
+        return new (std::nothrow) DateBloomFilterFunc(tracker);
+    case TYPE_DATETIME:
+        return new (std::nothrow) DateTimeBloomFilterFunc(tracker);
+
+    // decimals
+    case TYPE_DECIMAL:
+        return new (std::nothrow) DecimalFilterFunc(tracker);
+    case TYPE_DECIMALV2:
+        return new (std::nothrow) DecimalV2FilterFunc(tracker);
+
+    // strings
+    case TYPE_CHAR:
+        return new (std::nothrow) FixedCharBloomFilterFunc(tracker);
+    case TYPE_VARCHAR:
+        return new (std::nothrow) BloomFilterFunc<StringValue>(tracker);
+
+    // every other type has no bloom filter function
+    default:
+        return nullptr;
+    }
+}
+
+Status BloomFilterFuncBase::get_data(char** data, int* len) { // out: data pointer and size
+    *data = _bloom_filter->data(); // no copy is made here; only the pointer is handed out
+    *len = _bloom_filter->size();  // NOTE(review): _bloom_filter is dereferenced unchecked
+    return Status::OK();           // this function itself never reports an error
+}
+
+BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node)
+        : Predicate(node),
+          _is_prepare(false),  // no filter installed until prepare() is called
+          _always_true(false), // filtering starts enabled
+          _filtered_rows(0),   // rows rejected by the filter so far
+          _scan_rows(0) {}     // non-NULL rows tested against the filter so far
+
+BloomFilterPredicate::~BloomFilterPredicate() { // logs the filter statistics of this instance
+    LOG(INFO) << "bloom filter rows:" << _filtered_rows << ",scan_rows:" << _scan_rows << ",rate:"
+              << (_scan_rows > 0 ? (double)_filtered_rows / _scan_rows : 0.0); // avoid 0/0 -> nan
+}
+
+BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other)
+        : Predicate(other), // statistics counters restart at zero for the copy
+          _is_prepare(other._is_prepare),
+          _always_true(other._always_true),
+          _filtered_rows(0), _scan_rows(0),
+          _filter(other._filter) {} // share the filter so a prepared copy is usable
+
+Status BloomFilterPredicate::prepare(RuntimeState* state, BloomFilterFuncBase* filter) {
+    // Only the first call installs a filter; once prepared, `filter` is ignored.
+    if (!_is_prepare) {
+        // _filter takes ownership of the raw pointer (shared_ptr::reset).
+        _filter.reset(filter);
+        if (_filter.get() == nullptr) {
+            return Status::InternalError("Unknown column type.");
+        }
+        _is_prepare = true;
+    }
+    return Status::OK();
+}
+
+std::string BloomFilterPredicate::debug_string() const {
+    // Fixed label only: neither the child expression nor any filter
+    // statistics are included in the debug output.
+    return "BloomFilterPredicate()";
+}
+
+BooleanVal BloomFilterPredicate::get_boolean_val(ExprContext* ctx, TupleRow* row) {
+    // Once the filter has been judged ineffective it is bypassed for every later row.
+    if (_always_true) {
+        return BooleanVal(true);
+    }
+    const void* lhs_slot = ctx->get_value(_children[0], row);
+    if (lhs_slot == nullptr) {
+        return BooleanVal::null(); // NULL probe values are neither counted nor tested
+    }
+    const int64_t scanned = ++_scan_rows;
+    const bool hit = _filter->find(lhs_slot);
+    const int64_t filtered = hit ? _filtered_rows.load() : ++_filtered_rows;
+    // One-shot effectiveness check at the first multiple of _loop_size rows. It must run
+    // whether or not the current row passed: tying it to rejected rows would make a weak
+    // filter, which rejects few rows, the least likely to ever reach the check.
+    if (!_has_calculate_filter && scanned % _loop_size == 0) {
+        if (static_cast<double>(filtered) / scanned < _expect_filter_rate) {
+            _always_true = true;
+        }
+        _has_calculate_filter = true;
+    }
+    return BooleanVal(hit);
+}
+
+Status BloomFilterPredicate::open(RuntimeState* state, ExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    // No filter-specific work here; forward the base-class status instead of dropping it.
+    return Expr::open(state, context, scope);
+}
+
+} // namespace doris
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
new file mode 100644
index 0000000..c727055
--- /dev/null
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_QUERY_EXPRS_BLOOM_PREDICATE_H
+#define DORIS_BE_SRC_QUERY_EXPRS_BLOOM_PREDICATE_H
+#include <algorithm>
+#include <memory>
+#include <string>
+
+#include "common/object_pool.h"
+#include "exprs/expr_context.h"
+#include "exprs/predicate.h"
+#include "olap/rowset/segment_v2/bloom_filter.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/raw_value.h"
+
+namespace doris {
+/// Type-erased base class of the bloom filter functions; only used in Runtime Filter
+class BloomFilterFuncBase {
+public:
+    // _bloom_filter_alloced starts at 0: the destructor must not release a garbage size.
+    BloomFilterFuncBase(MemTracker* tracker)
+            : _tracker(tracker), _bloom_filter_alloced(0), _inited(false) {}
+
+    // Returns the recorded size to the tracker, if this object still has one.
+    virtual ~BloomFilterFuncBase() {
+        if (_tracker != nullptr) {
+            _tracker->Release(_bloom_filter_alloced);
+        }
+    }
+
+    // init a bloom filter with expect element num and false positive probability `fpp`
+    virtual Status init(int64_t expect_num = 4096, double fpp = 0.05) {
+        DCHECK(!_inited);
+        DCHECK(expect_num >= 0); // we need alloc 'optimal_bit_num(expect_num,fpp) / 8' bytes
+        return _create_and_init(
+                doris::segment_v2::BloomFilter::optimal_bit_num(expect_num, fpp) / 8);
+    }
+
+    // init a bloom filter whose size is given directly as `bloom_filter_length`
+    virtual Status init_with_fixed_length(int64_t bloom_filter_length) {
+        DCHECK(!_inited);
+        DCHECK(bloom_filter_length >= 0);
+        return _create_and_init(bloom_filter_length);
+    }
+
+    virtual void insert(const void* data) = 0;
+
+    virtual bool find(const void* data) = 0;
+
+    // Because the data structures of the execution layer and the storage layer are inconsistent,
+    // we need to provide additional interfaces for the storage layer to call
+    virtual bool find_olap_engine(const void* data) { return this->find(data); }
+
+    // Merges `bloomfilter_func` into this filter. Both must record the same size,
+    // otherwise InvalidArgument is returned.
+    Status merge(BloomFilterFuncBase* bloomfilter_func) {
+        DCHECK(_inited);
+        if (_bloom_filter == nullptr) {
+            std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
+            RETURN_IF_ERROR(doris::segment_v2::BloomFilter::create(
+                    doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter));
+            _bloom_filter.reset(bloom_filter.release());
+        }
+        if (_bloom_filter_alloced != bloomfilter_func->_bloom_filter_alloced) {
+            LOG(WARNING) << "bloom filter size not the same";
+            return Status::InvalidArgument("bloom filter size invalid");
+        }
+        return _bloom_filter->merge(bloomfilter_func->_bloom_filter.get());
+    }
+
+    // Loads the filter from the `len` bytes at `data`, creating the filter object first
+    // when none exists yet. The size recorded (and charged to the tracker) is `len - 1`.
+    Status assign(const char* data, int len) {
+        if (_bloom_filter == nullptr) {
+            std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
+            RETURN_IF_ERROR(doris::segment_v2::BloomFilter::create(
+                    doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter));
+            _bloom_filter.reset(bloom_filter.release());
+        }
+        _bloom_filter_alloced = len - 1;
+        _tracker->Consume(_bloom_filter_alloced);
+        return _bloom_filter->init(data, len,
+                                   doris::segment_v2::HashStrategyPB::HASH_MURMUR3_X64_64);
+    }
+
+    /// create a bloom filter function
+    /// tracker shouldn't be nullptr
+    static BloomFilterFuncBase* create_bloom_filter(MemTracker* tracker, PrimitiveType type);
+
+    Status get_data(char** data, int* len);
+
+    MemTracker* tracker() { return _tracker; }
+
+    // Shares `other`'s underlying filter (no bytes are copied) and drops this object's
+    // tracker, so this object's destructor releases nothing.
+    void light_copy(BloomFilterFuncBase* other) {
+        _tracker = nullptr;
+        _bloom_filter_alloced = other->_bloom_filter_alloced;
+        _bloom_filter = other->_bloom_filter;
+        _inited = other->_inited;
+    }
+
+protected:
+    // Shared body of init()/init_with_fixed_length(): creates a block bloom filter of `length`
+    // bytes; state and tracker are only touched after both create and init succeeded.
+    Status _create_and_init(int64_t length) {
+        const int32_t alloced = static_cast<int32_t>(length); // size is tracked as int32_t
+        std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
+        RETURN_IF_ERROR(doris::segment_v2::BloomFilter::create(
+                doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter));
+        RETURN_IF_ERROR(bloom_filter->init(
+                alloced, doris::segment_v2::HashStrategyPB::HASH_MURMUR3_X64_64));
+        _bloom_filter_alloced = alloced;
+        _bloom_filter.reset(bloom_filter.release());
+        _tracker->Consume(_bloom_filter_alloced);
+        _inited = true;
+        return Status::OK();
+    }
+
+    MemTracker* _tracker;
+    // bloom filter size
+    int32_t _bloom_filter_alloced;
+    std::shared_ptr<doris::segment_v2::BloomFilter> _bloom_filter;
+    bool _inited;
+};
+
+template <class T>
+class BloomFilterFunc : public BloomFilterFuncBase {
+public:
+    BloomFilterFunc(MemTracker* tracker) : BloomFilterFuncBase(tracker) {}
+    ~BloomFilterFunc() = default;
+
+    // Adds the sizeof(T) raw bytes at `data` to the filter.
+    void insert(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        _bloom_filter->add_bytes((char*)data, sizeof(T));
+    }
+    // Probes the filter with the sizeof(T) raw bytes at `data`.
+    bool find(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        return _bloom_filter->test_bytes((char*)data, sizeof(T));
+    }
+};
+
+template <>
+class BloomFilterFunc<StringValue> : public BloomFilterFuncBase {
+public:
+    BloomFilterFunc(MemTracker* tracker) : BloomFilterFuncBase(tracker) {}
+    ~BloomFilterFunc() = default;
+
+    // Strings contribute their character bytes (ptr, len), not the StringValue struct itself.
+    void insert(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        const StringValue& str = *reinterpret_cast<const StringValue*>(data);
+        _bloom_filter->add_bytes(str.ptr, str.len);
+    }
+
+    bool find(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        const StringValue& str = *reinterpret_cast<const StringValue*>(data);
+        return _bloom_filter->test_bytes(str.ptr, str.len);
+    }
+};
+
+class FixedCharBloomFilterFunc : public BloomFilterFunc<StringValue> {
+public:
+    FixedCharBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<StringValue>(tracker) {}
+    ~FixedCharBloomFilterFunc() = default;
+    // Probes with the value stripped of trailing '\0' bytes. At least one byte of a
+    // non-empty value is always kept, even if that byte is '\0' as well.
+    bool find(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        const auto* value = reinterpret_cast<const StringValue*>(data);
+        auto probe_len = value->len;
+        while (probe_len > 1 && value->ptr[probe_len - 1] == '\0') --probe_len;
+        return _bloom_filter->test_bytes(value->ptr, probe_len);
+    }
+};
+
+class DateTimeBloomFilterFunc : public BloomFilterFunc<DateTimeValue> {
+public:
+    DateTimeBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<DateTimeValue>(tracker) {}
+    // The storage layer passes a uint64_t; rebuild a DateTimeValue from it before probing.
+    bool find_olap_engine(const void* data) override {
+        DateTimeValue converted;
+        converted.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data));
+        return _bloom_filter->test_bytes((char*)&converted, sizeof(DateTimeValue));
+    }
+};
+
+class DateBloomFilterFunc : public BloomFilterFunc<DateTimeValue> {
+public:
+    DateBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<DateTimeValue>(tracker) {}
+
+    // Reads 3 bytes from `data`, least significant byte first, feeds the resulting value to
+    // from_olap_date(), calls to_datetime() and probes the filter with the DateTimeValue.
+    bool find_olap_engine(const void* data) override {
+        const auto* bytes = reinterpret_cast<const unsigned char*>(data);
+        const uint64_t olap_date = (static_cast<uint64_t>(bytes[2]) << 16) |
+                                   (static_cast<uint64_t>(bytes[1]) << 8) |
+                                   static_cast<uint64_t>(bytes[0]);
+        DateTimeValue date_value;
+        date_value.from_olap_date(olap_date);
+        date_value.to_datetime();
+        return _bloom_filter->test_bytes((char*)&date_value, sizeof(DateTimeValue));
+    }
+};
+
+class DecimalFilterFunc : public BloomFilterFunc<DecimalValue> {
+public:
+    DecimalFilterFunc(MemTracker* tracker) : BloomFilterFunc<DecimalValue>(tracker) {}
+
+    bool find_olap_engine(const void* data) override {
+        const char* raw = reinterpret_cast<const char*>(data); // int64 part, then int32 part
+        const int64_t integer_part = *reinterpret_cast<const int64_t*>(raw);
+        const int32_t fraction_part = *reinterpret_cast<const int32_t*>(raw + sizeof(int64_t));
+        DecimalValue decimal(integer_part, fraction_part);
+        return _bloom_filter->test_bytes((char*)&decimal, sizeof(DecimalValue));
+    }
+};
+
+class DecimalV2FilterFunc : public BloomFilterFunc<DecimalV2Value> {
+public:
+    DecimalV2FilterFunc(MemTracker* tracker) : BloomFilterFunc<DecimalV2Value>(tracker) {}
+    bool find_olap_engine(const void* data) override {
+        const char* raw = reinterpret_cast<const char*>(data); // int64 part, then int32 part
+        const int64_t integer_part = *reinterpret_cast<const int64_t*>(raw);
+        const int32_t fraction_part = *reinterpret_cast<const int32_t*>(raw + sizeof(int64_t));
+        DecimalV2Value decimal;
+        decimal.from_olap_decimal(integer_part, fraction_part);
+        return _bloom_filter->test_bytes((char*)&decimal, sizeof(DecimalV2Value));
+    }
+};
+
+// BloomFilterPredicate probes a bloom filter with its first child; only used in runtime filter
+class BloomFilterPredicate : public Predicate {
+public:
+    virtual ~BloomFilterPredicate();
+    BloomFilterPredicate(const TExprNode& node);
+    BloomFilterPredicate(const BloomFilterPredicate& other);
+    virtual Expr* clone(ObjectPool* pool) const override {
+        return pool->add(new BloomFilterPredicate(*this));
+    }
+    Status prepare(RuntimeState* state, BloomFilterFuncBase* bloomfilterfunc);
+    // returns the filter function currently held by this predicate (may be empty)
+    std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() { return _filter; }
+    // NULL for a NULL child value, true on a filter hit or once _always_true is set, else false
+    virtual BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override;
+    // delegates to Expr::open
+    virtual Status open(RuntimeState* state, ExprContext* context,
+                        FunctionContext::FunctionStateScope scope) override;
+
+protected:
+    friend class Expr;
+    virtual std::string debug_string() const override;
+
+private:
+    bool _is_prepare;
+    // when true, get_boolean_val() returns true without probing the bloom filter
+    bool _always_true;
+    /// TODO: statistic filter rate in the profile
+    std::atomic<int64_t> _filtered_rows;
+    std::atomic<int64_t> _scan_rows;
+    // the bloom filter function probed by get_boolean_val(); installed by prepare()
+    std::shared_ptr<BloomFilterFuncBase> _filter;
+    bool _has_calculate_filter = false;
+    // the filter rate is only evaluated when _scan_rows is a multiple of this; power of 2
+    constexpr static int64_t _loop_size = 8192;
+    // if the filter rate (_filtered_rows / _scan_rows) is less than this, _always_true is set
+    constexpr static double _expect_filter_rate = 0.2;
+};
+} // namespace doris
+#endif
diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index 5f01195..280a3c6 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -21,8 +21,8 @@
 #include <memory>
 
 #include "common/status.h"
-#include "exprs/expr_value.h"
 #include "exprs/expr.h"
+#include "exprs/expr_value.h"
 #include "exprs/slot_ref.h"
 #include "udf/udf.h"
 #include "udf/udf_internal.h" // for ArrayVal
@@ -172,6 +172,8 @@ private:
     friend class Expr;
     friend class ScalarFnCall;
     friend class InPredicate;
+    friend class RuntimePredicateWrapper;
+    friend class BloomFilterPredicate;
     friend class OlapScanNode;
     friend class EsScanNode;
     friend class EsPredicate;
diff --git a/be/src/exprs/in_predicate.cpp b/be/src/exprs/in_predicate.cpp
index 1edadcc..9db94ce 100644
--- a/be/src/exprs/in_predicate.cpp
+++ b/be/src/exprs/in_predicate.cpp
@@ -19,8 +19,8 @@
 
 #include <sstream>
 
-#include "exprs/expr_context.h"
 #include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
 #include "runtime/raw_value.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.hpp"
@@ -36,12 +36,12 @@ InPredicate::InPredicate(const TExprNode& node)
 
 InPredicate::~InPredicate() {}
 
-Status InPredicate::prepare(RuntimeState* state, const TypeDescriptor& type) {
+Status InPredicate::prepare(RuntimeState* state, HybridSetBase* hset) {
     if (_is_prepare) {
         return Status::OK();
     }
-    _hybrid_set.reset(HybridSetBase::create_set(type.type));
-    if (NULL == _hybrid_set.get()) {
+    _hybrid_set.reset(hset);
+    if (NULL == _hybrid_set) {
         return Status::InternalError("Unknown column type.");
     }
     _is_prepare = true;
diff --git a/be/src/exprs/in_predicate.h b/be/src/exprs/in_predicate.h
index d544911..b90c1b9 100644
--- a/be/src/exprs/in_predicate.h
+++ b/be/src/exprs/in_predicate.h
@@ -38,7 +38,7 @@ public:
         return pool->add(new InPredicate(*this));
     }
 
-    Status prepare(RuntimeState* state, const TypeDescriptor&);
+    Status prepare(RuntimeState* state, HybridSetBase* hset);
     Status open(RuntimeState* state, ExprContext* context,
                 FunctionContext::FunctionStateScope scope);
     virtual Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
@@ -57,6 +57,7 @@ public:
 protected:
     friend class Expr;
     friend class HashJoinNode;
+    friend class RuntimePredicateWrapper;
 
     InPredicate(const TExprNode& node);
 
diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h
index e0e69f5..1b35336 100644
--- a/be/src/exprs/literal.h
+++ b/be/src/exprs/literal.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H
 #define DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H
 
+#include "binary_predicate.h"
 #include "common/object_pool.h"
 #include "exprs/expr.h"
 
@@ -46,6 +47,7 @@ public:
 
 protected:
     friend class Expr;
+    friend Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data);
     Literal(const TExprNode& node);
 
 private:
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
new file mode 100644
index 0000000..b83f8fc
--- /dev/null
+++ b/be/src/exprs/runtime_filter.cpp
@@ -0,0 +1,1138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter.h"
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "common/status.h"
+#include "exec/hash_join_node.h"
+#include "exprs/binary_predicate.h"
+#include "exprs/bloomfilter_predicate.h"
+#include "exprs/expr.h"
+#include "exprs/expr_context.h"
+#include "exprs/hybrid_set.h"
+#include "exprs/in_predicate.h"
+#include "exprs/literal.h"
+#include "exprs/predicate.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gen_cpp/types.pb.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_state.h"
+#include "runtime/type_limit.h"
+#include "util/defer_op.h"
+#include "util/runtime_profile.h"
+#include "util/string_parser.hpp"
+
+namespace doris {
+// Type-erased min/max accumulator; only used in Runtime Filter.
+class MinMaxFuncBase {
+public:
+    // Instances are held and destroyed through MinMaxFuncBase pointers
+    // (std::unique_ptr<MinMaxFuncBase>), so the destructor must be virtual.
+    virtual ~MinMaxFuncBase() = default;
+
+    // add one value to the tracked range
+    virtual void insert(void* data) = 0;
+    // check whether the value lies inside the tracked range
+    virtual bool find(void* data) = 0;
+    virtual bool is_empty() = 0;
+    // pointers to the stored bounds
+    virtual void* get_max() = 0;
+    virtual void* get_min() = 0;
+    // assign minmax data
+    virtual Status assign(void* min_data, void* max_data) = 0;
+    // merge from other minmax_func
+    virtual Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) = 0;
+    // create min-max filter function
+    static MinMaxFuncBase* create_minmax_filter(PrimitiveType type);
+};
+
+// Min/max accumulator for values of type T. Besides arithmetic types it is
+// also instantiated for StringValue, DateTimeValue and the decimal types.
+//
+// NOTE(review): merge() and assign() never clear _empty, so an insert() that
+// follows them on a filter that has not seen insert() before overwrites both
+// bounds. Confirm callers never mix these paths.
+template <class T>
+class MinMaxNumFunc : public MinMaxFuncBase {
+public:
+    MinMaxNumFunc() = default;
+    ~MinMaxNumFunc() = default;
+
+    // Widens [_min, _max] so that it contains *data; nullptr is ignored.
+    void insert(void* data) override {
+        if (data == nullptr) return;
+        T val_data = *reinterpret_cast<T*>(data);
+        if (_empty) {
+            // first value: it is both the minimum and the maximum
+            _min = val_data;
+            _max = val_data;
+            _empty = false;
+            return;
+        }
+        if (val_data < _min) {
+            _min = val_data;
+        } else if (val_data > _max) {
+            _max = val_data;
+        }
+    }
+
+    // Returns true when *data lies in [_min, _max]; nullptr never matches.
+    bool find(void* data) override {
+        if (data == nullptr) {
+            return false;
+        }
+        T val_data = *reinterpret_cast<T*>(data);
+        return val_data >= _min && val_data <= _max;
+    }
+
+    // Widens this range with the bounds of `minmax_func`, which is cast to
+    // MinMaxNumFunc<T> without a check. For StringValue the winning bound is
+    // copied into a std::string added to `pool`, so the stored pointer does
+    // not alias the other filter's buffer.
+    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
+        MinMaxNumFunc<T>* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func);
+        if (other_minmax->_min < _min) {
+            take_value(&_min, other_minmax->_min, pool);
+        }
+        if (other_minmax->_max > _max) {
+            take_value(&_max, other_minmax->_max, pool);
+        }
+        return Status::OK();
+    }
+
+    bool is_empty() override { return _empty; }
+
+    void* get_max() override { return &_max; }
+
+    void* get_min() override { return &_min; }
+
+    // Overwrites both bounds with *min_data / *max_data (read as T).
+    Status assign(void* min_data, void* max_data) override {
+        _min = *(T*)min_data;
+        _max = *(T*)max_data;
+        return Status::OK();
+    }
+
+private:
+    // Stores `src` into *dst. StringValue bytes are first copied into a
+    // std::string added to `pool`; every other type is assigned directly.
+    static void take_value(T* dst, const T& src, ObjectPool* pool) {
+        if constexpr (std::is_same_v<T, StringValue>) {
+            auto str = pool->add(new std::string(src.ptr, src.len));
+            dst->ptr = str->data();
+            dst->len = str->length();
+        } else {
+            *dst = src;
+        }
+    }
+
+    T _max = type_limit<T>::min();
+    T _min = type_limit<T>::max();
+    // we use _empty to avoid compare twice
+    bool _empty = true;
+};
+
+// Allocates (non-throwing new) the MinMaxNumFunc instantiation that matches
+// `type`. Unsupported types hit the DCHECK and yield a null pointer.
+MinMaxFuncBase* MinMaxFuncBase::create_minmax_filter(PrimitiveType type) {
+    MinMaxFuncBase* func = nullptr;
+    switch (type) {
+    case TYPE_BOOLEAN:
+        func = new (std::nothrow) MinMaxNumFunc<bool>();
+        break;
+    case TYPE_TINYINT:
+        func = new (std::nothrow) MinMaxNumFunc<int8_t>();
+        break;
+    case TYPE_SMALLINT:
+        func = new (std::nothrow) MinMaxNumFunc<int16_t>();
+        break;
+    case TYPE_INT:
+        func = new (std::nothrow) MinMaxNumFunc<int32_t>();
+        break;
+    case TYPE_BIGINT:
+        func = new (std::nothrow) MinMaxNumFunc<int64_t>();
+        break;
+    case TYPE_FLOAT:
+        func = new (std::nothrow) MinMaxNumFunc<float>();
+        break;
+    case TYPE_DOUBLE:
+        func = new (std::nothrow) MinMaxNumFunc<double>();
+        break;
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+        // both date types share the DateTimeValue representation here
+        func = new (std::nothrow) MinMaxNumFunc<DateTimeValue>();
+        break;
+    case TYPE_DECIMAL:
+        func = new (std::nothrow) MinMaxNumFunc<DecimalValue>();
+        break;
+    case TYPE_DECIMALV2:
+        func = new (std::nothrow) MinMaxNumFunc<DecimalV2Value>();
+        break;
+    case TYPE_LARGEINT:
+        func = new (std::nothrow) MinMaxNumFunc<__int128>();
+        break;
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+        func = new (std::nothrow) MinMaxNumFunc<StringValue>();
+        break;
+    default:
+        DCHECK(false) << "Invalid type.";
+        break;
+    }
+    return func;
+}
+
+// PrimitiveType->TExprNodeType
+// Returns the literal node type used to represent a constant of `type`.
+// Unknown types hit the DCHECK and fall back to NULL_LITERAL.
+// TODO: use constexpr if we use c++14
+TExprNodeType::type get_expr_node_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_BOOLEAN:
+        return TExprNodeType::BOOL_LITERAL;
+
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+        return TExprNodeType::INT_LITERAL;
+
+    case TYPE_LARGEINT:
+        return TExprNodeType::LARGE_INT_LITERAL;
+
+    case TYPE_NULL:
+        return TExprNodeType::NULL_LITERAL;
+
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+    case TYPE_TIME:
+        return TExprNodeType::FLOAT_LITERAL;
+
+    case TYPE_DECIMAL:
+    case TYPE_DECIMALV2:
+        return TExprNodeType::DECIMAL_LITERAL;
+
+    // create_literal() builds a TDateLiteral for TYPE_DATE as well as for
+    // TYPE_DATETIME, so both must map to DATE_LITERAL.
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+        return TExprNodeType::DATE_LITERAL;
+
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_HLL:
+    case TYPE_OBJECT:
+        return TExprNodeType::STRING_LITERAL;
+
+    default:
+        DCHECK(false) << "Invalid type.";
+        return TExprNodeType::NULL_LITERAL;
+    }
+}
+
+// PrimitiveType-> PColumnType
+// Unsupported types hit the DCHECKs and map to COLUMN_TYPE_INT.
+// TODO: use constexpr if we use c++14
+PColumnType to_proto(PrimitiveType type) {
+    switch (type) {
+    case TYPE_BOOLEAN:
+        return PColumnType::COLUMN_TYPE_BOOL;
+    case TYPE_TINYINT:
+        return PColumnType::COLUMN_TYPE_TINY_INT;
+    case TYPE_SMALLINT:
+        return PColumnType::COLUMN_TYPE_SMALL_INT;
+    case TYPE_INT:
+        return PColumnType::COLUMN_TYPE_INT;
+    case TYPE_BIGINT:
+        return PColumnType::COLUMN_TYPE_BIGINT;
+    case TYPE_LARGEINT:
+        return PColumnType::COLUMN_TYPE_LARGEINT;
+    case TYPE_FLOAT:
+        return PColumnType::COLUMN_TYPE_FLOAT;
+    case TYPE_DOUBLE:
+        return PColumnType::COLUMN_TYPE_DOUBLE;
+    case TYPE_DATE:
+        return PColumnType::COLUMN_TYPE_DATE;
+    case TYPE_DATETIME:
+        return PColumnType::COLUMN_TYPE_DATETIME;
+    case TYPE_DECIMAL:
+        return PColumnType::COLUMN_TYPE_DECIMAL;
+    case TYPE_DECIMALV2:
+        return PColumnType::COLUMN_TYPE_DECIMALV2;
+    case TYPE_CHAR:
+        return PColumnType::COLUMN_TYPE_CHAR;
+    case TYPE_VARCHAR:
+        return PColumnType::COLUMN_TYPE_VARCHAR;
+    default:
+        break;
+    }
+    // only reached for a type without a protobuf counterpart
+    DCHECK(false) << "Invalid type.";
+    DCHECK(false);
+    return PColumnType::COLUMN_TYPE_INT;
+}
+
+// PColumnType->PrimitiveType
+// Unknown values hit the DCHECK and map to TYPE_INT.
+// TODO: use constexpr if we use c++14
+PrimitiveType to_primitive_type(PColumnType type) {
+    switch (type) {
+    case PColumnType::COLUMN_TYPE_BOOL:
+        return TYPE_BOOLEAN;
+    case PColumnType::COLUMN_TYPE_TINY_INT:
+        return TYPE_TINYINT;
+    case PColumnType::COLUMN_TYPE_SMALL_INT:
+        return TYPE_SMALLINT;
+    case PColumnType::COLUMN_TYPE_INT:
+        return TYPE_INT;
+    case PColumnType::COLUMN_TYPE_BIGINT:
+        return TYPE_BIGINT;
+    case PColumnType::COLUMN_TYPE_LARGEINT:
+        return TYPE_LARGEINT;
+    case PColumnType::COLUMN_TYPE_FLOAT:
+        return TYPE_FLOAT;
+    case PColumnType::COLUMN_TYPE_DOUBLE:
+        return TYPE_DOUBLE;
+    case PColumnType::COLUMN_TYPE_DATE:
+        return TYPE_DATE;
+    case PColumnType::COLUMN_TYPE_DATETIME:
+        return TYPE_DATETIME;
+    case PColumnType::COLUMN_TYPE_DECIMAL:
+        return TYPE_DECIMAL;
+    case PColumnType::COLUMN_TYPE_DECIMALV2:
+        return TYPE_DECIMALV2;
+    case PColumnType::COLUMN_TYPE_VARCHAR:
+        return TYPE_VARCHAR;
+    case PColumnType::COLUMN_TYPE_CHAR:
+        return TYPE_CHAR;
+    default:
+        break;
+    }
+    // only reached for a column type that has no PrimitiveType mapping
+    DCHECK(false);
+    return TYPE_INT;
+}
+
+// PFilterType -> RuntimeFilterType
+// Any value other than the bloom / min-max filter codes maps to UNKNOWN_FILTER.
+RuntimeFilterType get_type(int filter_type) {
+    if (filter_type == PFilterType::BLOOM_FILTER) {
+        return RuntimeFilterType::BLOOM_FILTER;
+    }
+    if (filter_type == PFilterType::MINMAX_FILTER) {
+        return RuntimeFilterType::MINMAX_FILTER;
+    }
+    return RuntimeFilterType::UNKNOWN_FILTER;
+}
+
+// RuntimeFilterType -> PFilterType
+// Every type other than bloom / min-max maps to UNKNOW_FILTER.
+PFilterType get_type(RuntimeFilterType type) {
+    if (type == RuntimeFilterType::BLOOM_FILTER) {
+        return PFilterType::BLOOM_FILTER;
+    }
+    if (type == RuntimeFilterType::MINMAX_FILTER) {
+        return PFilterType::MINMAX_FILTER;
+    }
+    return PFilterType::UNKNOW_FILTER;
+}
+
+// Builds a TTypeDesc holding a single type node whose scalar type is `type`
+// with len set to -1.
+TTypeDesc create_type_desc(PrimitiveType type) {
+    TScalarType scalar;
+    scalar.__set_type(to_thrift(type));
+    scalar.__set_len(-1);
+
+    TTypeNode node;
+    node.__set_scalar_type(scalar);
+
+    std::vector<TTypeNode> nodes;
+    nodes.push_back(node);
+
+    TTypeDesc desc;
+    desc.__set_types(nodes);
+    return desc;
+}
+
+// only used to push down to olap engine
+// Builds a Literal expr (added to `pool`) from the value of `type` stored at
+// `data`. Unsupported types hit the DCHECK and return null.
+Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data) {
+    TExprNode node;
+
+    // all four integer widths are stored through TIntLiteral
+    auto set_int = [&node](int64_t v) {
+        TIntLiteral lit;
+        lit.__set_value(v);
+        node.__set_int_literal(lit);
+    };
+    // float and double are both stored through TFloatLiteral
+    auto set_float = [&node](double v) {
+        TFloatLiteral lit;
+        lit.__set_value(v);
+        node.__set_float_literal(lit);
+    };
+    // both decimal flavours are stored as their string form
+    auto set_decimal = [&node](const std::string& text) {
+        TDecimalLiteral lit;
+        lit.__set_value(text);
+        node.__set_decimal_literal(lit);
+    };
+
+    switch (type) {
+    case TYPE_BOOLEAN: {
+        TBoolLiteral lit;
+        lit.__set_value(*reinterpret_cast<const bool*>(data));
+        node.__set_bool_literal(lit);
+        break;
+    }
+    case TYPE_TINYINT:
+        set_int(*reinterpret_cast<const int8_t*>(data));
+        break;
+    case TYPE_SMALLINT:
+        set_int(*reinterpret_cast<const int16_t*>(data));
+        break;
+    case TYPE_INT:
+        set_int(*reinterpret_cast<const int32_t*>(data));
+        break;
+    case TYPE_BIGINT:
+        set_int(*reinterpret_cast<const int64_t*>(data));
+        break;
+    case TYPE_LARGEINT: {
+        TLargeIntLiteral lit;
+        lit.__set_value(LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(data)));
+        node.__set_large_int_literal(lit);
+        break;
+    }
+    case TYPE_FLOAT:
+        set_float(*reinterpret_cast<const float*>(data));
+        break;
+    case TYPE_DOUBLE:
+        set_float(*reinterpret_cast<const double*>(data));
+        break;
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        // the date is rendered into a stack buffer and sent as text
+        char convert_buffer[30];
+        reinterpret_cast<const DateTimeValue*>(data)->to_string(convert_buffer);
+        TDateLiteral lit;
+        lit.__set_value(convert_buffer);
+        node.__set_date_literal(lit);
+        break;
+    }
+    case TYPE_DECIMAL:
+        set_decimal(reinterpret_cast<const DecimalValue*>(data)->to_string());
+        break;
+    case TYPE_DECIMALV2:
+        set_decimal(reinterpret_cast<const DecimalV2Value*>(data)->to_string());
+        break;
+    case TYPE_CHAR:
+    case TYPE_VARCHAR: {
+        const StringValue* sv = reinterpret_cast<const StringValue*>(data);
+        TStringLiteral lit;
+        lit.__set_value(std::string(sv->ptr, sv->len));
+        node.__set_string_literal(lit);
+        break;
+    }
+    default:
+        DCHECK(false);
+        return nullptr;
+    }
+
+    node.__set_node_type(get_expr_node_type(type));
+    node.__set_type(create_type_desc(type));
+    return pool->add(new Literal(node));
+}
+
+// Builds a two-child BinaryPredicate node for `opcode` whose children have
+// type `prim_type`; the predicate's own result type is BOOLEAN. The children
+// are attached by the caller.
+BinaryPredicate* create_bin_predicate(ObjectPool* pool, PrimitiveType prim_type,
+                                      TExprOpcode::type opcode) {
+    // result type descriptor: a single scalar BOOLEAN node
+    TScalarType bool_scalar;
+    bool_scalar.__set_type(TPrimitiveType::BOOLEAN);
+    TTypeNode bool_node;
+    bool_node.__set_type(TTypeNodeType::SCALAR);
+    bool_node.__set_scalar_type(bool_scalar);
+    TTypeDesc result_type;
+    result_type.types.push_back(bool_node);
+
+    TExprNode node;
+    node.__set_type(result_type);
+    node.__set_opcode(opcode);
+    node.__set_child_type(to_thrift(prim_type));
+    node.__set_num_children(2);
+    node.__set_output_scale(-1);
+    node.__set_node_type(TExprNodeType::BINARY_PRED);
+
+    Expr* pred = pool->add(BinaryPredicate::from_thrift(node));
+    return static_cast<BinaryPredicate*>(pred);
+}
+// This class is a wrapper of runtime predicate function.
+// Depending on _filter_type it drives exactly one of: an in-set
+// (HybridSetBase), a min/max range (MinMaxFuncBase) or a bloom filter
+// (BloomFilterFuncBase).
+class RuntimePredicateWrapper {
+public:
+    RuntimePredicateWrapper(RuntimeState* state, MemTracker* tracker, ObjectPool* pool,
+                            const RuntimeFilterParams* params)
+            : _tracker(tracker),
+              _pool(pool),
+              _column_return_type(params->column_return_type),
+              _filter_type(params->filter_type) {}
+    // for a 'tmp' runtime predicate wrapper
+    // only could called assign method or as a param for merge
+    // NOTE(review): _column_return_type is left uninitialized by this constructor.
+    RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool, RuntimeFilterType type)
+            : _tracker(tracker), _pool(pool), _filter_type(type) {}
+    // init runtimefilter wrapper
+    // alloc memory to init runtime filter function
+    Status init(const RuntimeFilterParams* params) {
+        switch (_filter_type) {
+        case RuntimeFilterType::IN_FILTER: {
+            _hybrid_set.reset(HybridSetBase::create_set(_column_return_type));
+            break;
+        }
+        case RuntimeFilterType::MINMAX_FILTER: {
+            _minmax_func.reset(MinMaxFuncBase::create_minmax_filter(_column_return_type));
+            break;
+        }
+        case RuntimeFilterType::BLOOM_FILTER: {
+            _bloomfilter_func.reset(
+                    BloomFilterFuncBase::create_bloom_filter(_tracker, _column_return_type));
+            // the result is dereferenced right below, so a null pointer must be
+            // reported instead of crashing
+            if (_bloomfilter_func == nullptr) {
+                return Status::InvalidArgument("create bloom filter failed");
+            }
+            return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+        }
+        default:
+            DCHECK(false);
+            return Status::InvalidArgument("Unknown Filter type");
+        }
+        return Status::OK();
+    }
+
+    // Adds one value to the active filter. Only the in-set path drops nullptr
+    // here; the other two pass `data` through unchanged.
+    void insert(void* data) {
+        switch (_filter_type) {
+        case RuntimeFilterType::IN_FILTER: {
+            if (data != nullptr) {
+                _hybrid_set->insert(data);
+            }
+            break;
+        }
+        case RuntimeFilterType::MINMAX_FILTER: {
+            _minmax_func->insert(data);
+            break;
+        }
+        case RuntimeFilterType::BLOOM_FILTER: {
+            DCHECK(_bloomfilter_func != nullptr);
+            _bloomfilter_func->insert(data);
+            break;
+        }
+        default:
+            DCHECK(false);
+            break;
+        }
+    }
+
+    // Turns the active filter into predicate expression(s) over a copy of
+    // prob_expr's root and appends the new ExprContext(s) to `container`.
+    // Min/max produces two contexts (<= max, >= min), the others one.
+    // The in-set and bloom paths release() their filter object into the new
+    // predicate, so they hand over a real filter only once per wrapper.
+    template <class T>
+    Status get_push_context(T* container, RuntimeState* state, ExprContext* prob_expr) {
+        DCHECK(state != nullptr);
+        DCHECK(container != nullptr);
+        DCHECK(_pool != nullptr);
+        DCHECK(prob_expr->root()->type().type == _column_return_type);
+
+        switch (_filter_type) {
+        case RuntimeFilterType::IN_FILTER: {
+            TTypeDesc type_desc = create_type_desc(_column_return_type);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(to_in_opcode(_column_return_type));
+            auto in_pred = _pool->add(new InPredicate(node));
+            RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release()));
+            in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            ExprContext* ctx = _pool->add(new ExprContext(in_pred));
+            container->push_back(ctx);
+            break;
+        }
+        case RuntimeFilterType::MINMAX_FILTER: {
+            // create max filter
+            auto max_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::LE);
+            auto max_literal = create_literal(_pool, _column_return_type, _minmax_func->get_max());
+            max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            max_pred->add_child(max_literal);
+            container->push_back(_pool->add(new ExprContext(max_pred)));
+            // create min filter
+            auto min_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::GE);
+            auto min_literal = create_literal(_pool, _column_return_type, _minmax_func->get_min());
+            min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            min_pred->add_child(min_literal);
+            container->push_back(_pool->add(new ExprContext(min_pred)));
+            break;
+        }
+        case RuntimeFilterType::BLOOM_FILTER: {
+            // create a bloom filter
+            TTypeDesc type_desc = create_type_desc(_column_return_type);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::BLOOM_PRED);
+            node.__set_opcode(TExprOpcode::RT_FILTER);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(to_in_opcode(_column_return_type));
+            auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
+            RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func.release()));
+            bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
+            container->push_back(ctx);
+            break;
+        }
+        default:
+            DCHECK(false);
+            break;
+        }
+        return Status::OK();
+    }
+
+    // Merges the filter held by `wrapper` into this one. Both wrappers must
+    // have the same filter type; in-set filters cannot be merged.
+    Status merge(const RuntimePredicateWrapper* wrapper) {
+        DCHECK(_filter_type == wrapper->_filter_type);
+        if (_filter_type != wrapper->_filter_type) {
+            return Status::InvalidArgument("invalid filter type");
+        }
+        switch (_filter_type) {
+        case RuntimeFilterType::IN_FILTER: {
+            DCHECK(false) << "in filter should't apply in shuffle join";
+            return Status::InternalError("in filter should't apply in shuffle join");
+        }
+        case RuntimeFilterType::MINMAX_FILTER: {
+            // propagate a failed merge instead of reporting OK
+            RETURN_IF_ERROR(_minmax_func->merge(wrapper->_minmax_func.get(), _pool));
+            break;
+        }
+        case RuntimeFilterType::BLOOM_FILTER: {
+            // NOTE(review): the result of this merge is not checked; its return
+            // type is declared outside this file.
+            _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+            break;
+        }
+        default:
+            DCHECK(false);
+            return Status::InternalError("unknown runtime filter");
+        }
+        return Status::OK();
+    }
+
+    // used by shuffle runtime filter
+    // assign this filter by protobuf
+    Status assign(const PBloomFilter* bloom_filter, const char* data) {
+        DCHECK(_tracker != nullptr);
+        // we won't use this class to insert or find any data
+        // so any type is ok
+        _bloomfilter_func.reset(
+                BloomFilterFuncBase::create_bloom_filter(_tracker, PrimitiveType::TYPE_INT));
+        return _bloomfilter_func->assign(data, bloom_filter->filter_length());
+    }
+
+    // used by shuffle runtime filter
+    // assign this filter by protobuf
+    // Column types without a case below (including both decimal types) return
+    // InvalidArgument("not support!").
+    Status assign(const PMinMaxFilter* minmax_filter) {
+        DCHECK(_tracker != nullptr);
+        PrimitiveType type = to_primitive_type(minmax_filter->column_type());
+        _minmax_func.reset(MinMaxFuncBase::create_minmax_filter(type));
+        switch (type) {
+        case TYPE_BOOLEAN: {
+            bool min_val;
+            bool max_val;
+            min_val = minmax_filter->min_val().boolval();
+            max_val = minmax_filter->max_val().boolval();
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_TINYINT: {
+            int8_t min_val;
+            int8_t max_val;
+            min_val = static_cast<int8_t>(minmax_filter->min_val().intval());
+            max_val = static_cast<int8_t>(minmax_filter->max_val().intval());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_SMALLINT: {
+            int16_t min_val;
+            int16_t max_val;
+            min_val = static_cast<int16_t>(minmax_filter->min_val().intval());
+            max_val = static_cast<int16_t>(minmax_filter->max_val().intval());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_INT: {
+            int32_t min_val;
+            int32_t max_val;
+            min_val = minmax_filter->min_val().intval();
+            max_val = minmax_filter->max_val().intval();
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_BIGINT: {
+            int64_t min_val;
+            int64_t max_val;
+            min_val = minmax_filter->min_val().longval();
+            max_val = minmax_filter->max_val().longval();
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_LARGEINT: {
+            // 128-bit values travel as decimal strings
+            int128_t min_val;
+            int128_t max_val;
+            auto min_string_val = minmax_filter->min_val().stringval();
+            auto max_string_val = minmax_filter->max_val().stringval();
+            StringParser::ParseResult result;
+            min_val = StringParser::string_to_int<int128_t>(min_string_val.c_str(),
+                                                            min_string_val.length(), &result);
+            DCHECK(result == StringParser::PARSE_SUCCESS);
+            max_val = StringParser::string_to_int<int128_t>(max_string_val.c_str(),
+                                                            max_string_val.length(), &result);
+            DCHECK(result == StringParser::PARSE_SUCCESS);
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_FLOAT: {
+            float min_val;
+            float max_val;
+            min_val = static_cast<float>(minmax_filter->min_val().doubleval());
+            max_val = static_cast<float>(minmax_filter->max_val().doubleval());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_DOUBLE: {
+            double min_val;
+            double max_val;
+            min_val = static_cast<double>(minmax_filter->min_val().doubleval());
+            max_val = static_cast<double>(minmax_filter->max_val().doubleval());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_DATETIME:
+        case TYPE_DATE: {
+            auto& min_val_ref = minmax_filter->min_val().stringval();
+            auto& max_val_ref = minmax_filter->max_val().stringval();
+            DateTimeValue min_val;
+            DateTimeValue max_val;
+            min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
+            max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_VARCHAR:
+        case TYPE_CHAR: {
+            // copy the bytes into strings added to _pool so the StringValues
+            // do not point into the protobuf message
+            auto& min_val_ref = minmax_filter->min_val().stringval();
+            auto& max_val_ref = minmax_filter->max_val().stringval();
+            auto min_val_ptr = _pool->add(new std::string(min_val_ref));
+            auto max_val_ptr = _pool->add(new std::string(max_val_ref));
+            StringValue min_val(const_cast<char*>(min_val_ptr->c_str()), min_val_ptr->length());
+            StringValue max_val(const_cast<char*>(max_val_ptr->c_str()), max_val_ptr->length());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        default:
+            DCHECK(false) << "unknown type";
+            break;
+        }
+        return Status::InvalidArgument("not support!");
+    }
+
+    Status get_bloom_filter_desc(char** data, int* filter_length) {
+        return _bloomfilter_func->get_data(data, filter_length);
+    }
+
+    // Exposes pointers to the stored min / max values.
+    Status get_minmax_filter_desc(void** min_data, void** max_data) {
+        *min_data = _minmax_func->get_min();
+        *max_data = _minmax_func->get_max();
+        return Status::OK();
+    }
+
+    PrimitiveType column_type() { return _column_return_type; }
+
+    // For string min/max filters: re-point both bounds at copies added to
+    // _pool. All other filters are left untouched.
+    void ready_for_publish() {
+        if (_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+            switch (_column_return_type) {
+            case TYPE_VARCHAR:
+            case TYPE_CHAR: {
+                StringValue* min_value = static_cast<StringValue*>(_minmax_func->get_min());
+                StringValue* max_value = static_cast<StringValue*>(_minmax_func->get_max());
+                // StringValue is (ptr, len); copy exactly len bytes rather than
+                // treating ptr as a NUL-terminated C string
+                auto min_val_ptr =
+                        _pool->add(new std::string(min_value->ptr, min_value->len));
+                auto max_val_ptr =
+                        _pool->add(new std::string(max_value->ptr, max_value->len));
+                StringValue min_val(const_cast<char*>(min_val_ptr->c_str()), min_val_ptr->length());
+                StringValue max_val(const_cast<char*>(max_val_ptr->c_str()), max_val_ptr->length());
+                _minmax_func->assign(&min_val, &max_val);
+                break;
+            }
+            default:
+                break;
+            }
+        }
+    }
+
+private:
+    MemTracker* _tracker;
+    ObjectPool* _pool;
+    PrimitiveType _column_return_type; // column type
+    RuntimeFilterType _filter_type;
+    std::unique_ptr<MinMaxFuncBase> _minmax_func;
+    std::unique_ptr<HybridSetBase> _hybrid_set;
+    std::unique_ptr<BloomFilterFuncBase> _bloomfilter_func;
+};
+
+// Allocates a filter in `pool`, stores it in *res, assigns its role and then
+// initializes it from `desc`. *res is set even when initialization fails.
+Status IRuntimeFilter::create(RuntimeState* state, MemTracker* tracker, ObjectPool* pool,
+                              const TRuntimeFilterDesc* desc, const RuntimeFilterRole role,
+                              int node_id, IRuntimeFilter** res) {
+    IRuntimeFilter* filter = pool->add(new IRuntimeFilter(state, tracker, pool));
+    *res = filter;
+    filter->set_role(role);
+    return filter->init_with_desc(desc, node_id);
+}
+
+void IRuntimeFilter::insert(void* data) { // forwards one value to the wrapped filter
+    DCHECK(is_producer()); // debug-only check that this filter has the producer role
+    _wrapper->insert(data);
+}
+
+// Producer side: makes the built filter available to its consumer.
+// With a local target the wrapper is swapped into the consumer filter found
+// through the runtime filter manager and the consumer is signalled; otherwise
+// the filter is pushed to the merge address. `hash_join_node` and `probe_ctx`
+// are not used here.
+Status IRuntimeFilter::publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx) {
+    DCHECK(is_producer());
+    if (_has_local_target) {
+        IRuntimeFilter* consumer_filter = nullptr;
+        // Return the lookup error instead of only DCHECKing it: in a release
+        // build a failed lookup would otherwise dereference a null pointer.
+        RETURN_IF_ERROR(
+                _state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter));
+        if (consumer_filter == nullptr) {
+            return Status::InternalError("consumer runtime filter not found");
+        }
+        // push down
+        std::swap(this->_wrapper, consumer_filter->_wrapper);
+        consumer_filter->signal();
+        return Status::OK();
+    } else {
+        TNetworkAddress addr;
+        RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr));
+        return push_to_remote(_state, &addr);
+    }
+}
+
+void IRuntimeFilter::publish_finally() { // last publish step: only calls join_rpc()
+    DCHECK(is_producer()); // debug-only check that this filter has the producer role
+    join_rpc(); // presumably waits for the outstanding push RPC - confirm against join_rpc()
+}
+
+// Consumer side: appends the push-down contexts built from the stored probe
+// expr to `push_expr_ctxs`. An ignored filter contributes nothing.
+Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs) {
+    DCHECK(is_consumer());
+    if (_is_ignored) {
+        return Status::OK();
+    }
+    return _wrapper->get_push_context(push_expr_ctxs, _state, _probe_ctx);
+}
+
+Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs, // producer-side overload
+                                          ExprContext* probe_ctx) { // caller supplies the probe context
+    DCHECK(is_producer());
+    return _wrapper->get_push_context(push_expr_ctxs, _state, probe_ctx); // _is_ignored is not consulted here
+}
+
+// Appends this consumer's prepared and opened push-down contexts to
+// `push_expr_ctxs`.
+//
+// The contexts are generated from the wrapper, prepared against `desc` and
+// opened only while the cache (_push_down_ctxs) is empty; afterwards the cached
+// contexts are handed out. Everything runs under _inner_mutex, so the function
+// can be called multiple times.
+//
+// Returns the first error reported by context generation, prepare or open; the
+// output vector is left untouched in that case.
+Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>* push_expr_ctxs,
+                                            const RowDescriptor& desc,
+                                            const std::shared_ptr<MemTracker>& tracker) {
+    DCHECK(_is_ready);
+    DCHECK(is_consumer());
+    std::lock_guard<std::mutex> guard(_inner_mutex);
+
+    if (_push_down_ctxs.empty()) {
+        // cache miss: build the push-down exprs and make them usable
+        RETURN_IF_ERROR(_wrapper->get_push_context(&_push_down_ctxs, _state, _probe_ctx));
+        RETURN_IF_ERROR(Expr::prepare(_push_down_ctxs, _state, desc, tracker));
+        RETURN_IF_ERROR(Expr::open(_push_down_ctxs, _state));
+    }
+
+    // Hand the contexts out on every successful call. Previously the call that
+    // populated the cache returned without filling the output, so the first
+    // caller never received the filter.
+    push_expr_ctxs->insert(push_expr_ctxs->end(), _push_down_ctxs.begin(),
+                           _push_down_ctxs.end());
+    return Status::OK();
+}
+
+// Consumer-only: blocks until the filter has been signalled or until
+// RuntimeState::runtime_filter_wait_time_ms() milliseconds have elapsed.
+// Returns true when the filter is ready, false on timeout. The time spent here
+// is accounted to _await_time_cost.
+bool IRuntimeFilter::await() {
+    DCHECK(is_consumer());
+    SCOPED_TIMER(_await_time_cost);
+    const int64_t wait_times_ms = _state->runtime_filter_wait_time_ms();
+    // _is_ready is a plain bool shared with the signalling thread, so it is only
+    // read while holding _inner_mutex. wait_for() evaluates the predicate before
+    // blocking, hence an already-ready filter returns immediately.
+    std::unique_lock<std::mutex> lock(_inner_mutex);
+    return _inner_cv.wait_for(lock, std::chrono::milliseconds(wait_times_ms),
+                              [this] { return this->_is_ready; });
+}
+
+// Consumer-only: marks the filter as ready, wakes every thread blocked in
+// await() and releases the effect timer.
+void IRuntimeFilter::signal() {
+    DCHECK(is_consumer());
+    {
+        // Publish the flag under the mutex that await() waits on. Without it a
+        // waiter could test the predicate, miss this notification and then
+        // sleep for the whole timeout even though the filter is ready.
+        std::lock_guard<std::mutex> guard(_inner_mutex);
+        _is_ready = true;
+    }
+    _inner_cv.notify_all();
+    _effect_timer.reset();
+}
+
+// Initializes this filter from its thrift descriptor.
+//  - maps the thrift filter type onto RuntimeFilterType (InvalidArgument if unknown)
+//  - copies the join/target flags, expr order and filter id
+//  - builds the source (build side) expr tree to learn the column type
+//  - when node_id >= 0, builds the probe expr registered for that plan node
+//    (InternalError if the descriptor has no entry for it)
+//  - creates and initializes the predicate wrapper in _pool
+Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, int node_id) {
+    // if node_id == -1 , it shouldn't be a consumer
+    DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
+
+    switch (desc->type) {
+    case TRuntimeFilterType::BLOOM:
+        _runtime_filter_type = RuntimeFilterType::BLOOM_FILTER;
+        break;
+    case TRuntimeFilterType::MIN_MAX:
+        _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
+        break;
+    case TRuntimeFilterType::IN:
+        _runtime_filter_type = RuntimeFilterType::IN_FILTER;
+        break;
+    default:
+        return Status::InvalidArgument("unknown filter type");
+    }
+
+    _is_broadcast_join = desc->is_broadcast_join;
+    _has_local_target = desc->has_local_targets;
+    _has_remote_target = desc->has_remote_targets;
+    _expr_order = desc->expr_order;
+    _filter_id = desc->filter_id;
+
+    // the build expr is only needed for its return type
+    ExprContext* src_expr_ctx = nullptr;
+    RETURN_IF_ERROR(Expr::create_expr_tree(_pool, desc->src_expr, &src_expr_ctx));
+
+    RuntimeFilterParams wrapper_params;
+    wrapper_params.filter_type = _runtime_filter_type;
+    wrapper_params.column_return_type = src_expr_ctx->root()->type().type;
+    if (desc->__isset.bloom_filter_size_bytes) {
+        wrapper_params.bloom_filter_size = desc->bloom_filter_size_bytes;
+    }
+
+    if (node_id >= 0) {
+        DCHECK(is_consumer());
+        const auto target = desc->planId_to_target_expr.find(node_id);
+        if (target == desc->planId_to_target_expr.end()) {
+            DCHECK(false) << "runtime filter not found node_id:" << node_id;
+            return Status::InternalError("not found a node id");
+        }
+        RETURN_IF_ERROR(Expr::create_expr_tree(_pool, target->second, &_probe_ctx));
+    }
+
+    _wrapper =
+            _pool->add(new RuntimePredicateWrapper(_state, _mem_tracker, _pool, &wrapper_params));
+    return _wrapper->init(&wrapper_params);
+}
+
+Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) { // merge-request overload
+    return _serialize(request, data, len); // shared template implementation
+}
+
+Status IRuntimeFilter::serialize(PPublishFilterRequest* request, void** data, int* len) { // publish-request overload
+    return _serialize(request, data, len); // shared template implementation
+}
+
+Status IRuntimeFilter::create_wrapper(const MergeRuntimeFilterParams* param, MemTracker* tracker, // merge-request overload
+                                      ObjectPool* pool,
+                                      std::unique_ptr<RuntimePredicateWrapper>* wrapper) { // out: wrapper rebuilt from the request
+    return _create_wrapper(param, tracker, pool, wrapper); // shared template implementation
+}
+
+Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParams* param, MemTracker* tracker, // publish-request overload
+                                      ObjectPool* pool,
+                                      std::unique_ptr<RuntimePredicateWrapper>* wrapper) { // out: wrapper rebuilt from the request
+    return _create_wrapper(param, tracker, pool, wrapper); // shared template implementation
+}
+
+// Rebuilds a predicate wrapper from a serialized filter request.
+// T is one of the *RuntimeFilterParams structs (a request plus its attachment).
+// `*wrapper` always receives a freshly allocated wrapper; the request payload is
+// then loaded into it. Only bloom and min/max payloads are understood, anything
+// else yields InvalidArgument.
+template <class T>
+Status IRuntimeFilter::_create_wrapper(const T* param, MemTracker* tracker, ObjectPool* pool,
+                                       std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
+    int filter_type = param->request->filter_type();
+    wrapper->reset(new RuntimePredicateWrapper(tracker, pool, get_type(filter_type)));
+    RuntimePredicateWrapper* rebuilt = wrapper->get();
+
+    if (filter_type == PFilterType::BLOOM_FILTER) {
+        DCHECK(param->request->has_bloom_filter());
+        // bloom filter bits travel in the attachment (param->data)
+        return rebuilt->assign(&param->request->bloom_filter(), param->data);
+    }
+    if (filter_type == PFilterType::MINMAX_FILTER) {
+        DCHECK(param->request->has_minmax_filter());
+        return rebuilt->assign(&param->request->minmax_filter());
+    }
+    return Status::InvalidArgument("unknow filter type");
+}
+
+// Creates this filter's profile ("RuntimeFilter:<type>"), attaches it to
+// `parent_profile`, registers the two timers and starts the effect timer.
+// The effect timer keeps running until signal() resets it.
+void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
+    DCHECK(parent_profile != nullptr);
+    const std::string profile_name =
+            "RuntimeFilter:" + ::doris::to_string(_runtime_filter_type);
+    _profile.reset(new RuntimeProfile(profile_name));
+    parent_profile->add_child(_profile.get(), true, nullptr);
+
+    _effect_time_cost = ADD_TIMER(_profile, "EffectTimeCost");
+    _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
+
+    auto* effect_timer = new ScopedTimer<MonotonicStopWatch>(_effect_time_cost);
+    _effect_timer.reset(effect_timer);
+    _effect_timer->start();
+}
+
+void IRuntimeFilter::set_push_down_profile() { // requires init_profile() to have created _profile
+    _profile->add_info_string("HasPushDownToEngine", "true");
+}
+
+void IRuntimeFilter::ready_for_publish() { // forwards to the wrapper's own ready_for_publish()
+    _wrapper->ready_for_publish();
+}
+
+Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { // merges `wrapper` into this filter's wrapper
+    return _wrapper->merge(wrapper);
+}
+
+// Serializes this filter into `request` (a PMergeFilterRequest or
+// PPublishFilterRequest).
+//  - bloom filter: the filter bytes are NOT copied into the request; `*data` and
+//    `*len` receive the buffer description from the wrapper so the caller can
+//    ship it separately. Both out-pointers are therefore mandatory for bloom
+//    filters.
+//  - min/max filter: the bounds are written into the request itself; `data` and
+//    `len` are not touched.
+//  - any other type: InvalidArgument.
+template <class T>
+Status IRuntimeFilter::_serialize(T* request, void** data, int* len) {
+    request->set_filter_type(get_type(_runtime_filter_type));
+
+    if (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) {
+        // One public serialize() overload defaults both pointers to nullptr;
+        // reject that up front instead of dereferencing a null `len` below.
+        if (data == nullptr || len == nullptr) {
+            return Status::InvalidArgument("bloom filter serialize needs data and len");
+        }
+        RETURN_IF_ERROR(_wrapper->get_bloom_filter_desc(reinterpret_cast<char**>(data), len));
+        request->mutable_bloom_filter()->set_filter_length(*len);
+        request->mutable_bloom_filter()->set_always_true(false);
+    } else if (_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+        auto minmax_filter = request->mutable_minmax_filter();
+        to_protobuf(minmax_filter);
+    } else {
+        return Status::InvalidArgument("not implemented !");
+    }
+    return Status::OK();
+}
+
+// Writes the wrapper's min/max bounds into `filter`.
+// The wrapper exposes both bounds as untyped pointers; each case below reads
+// them with the native type of the column and stores them in the matching
+// protobuf field (ints widened to intval/longval, float widened to doubleval,
+// large ints / dates / decimals / strings as stringval).
+// Unsupported column types trip a DCHECK and leave the bounds unset.
+void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
+    void* min_data = nullptr;
+    void* max_data = nullptr;
+    _wrapper->get_minmax_filter_desc(&min_data, &max_data);
+    DCHECK(min_data != nullptr);
+    DCHECK(max_data != nullptr);
+    filter->set_column_type(to_proto(_wrapper->column_type()));
+
+    switch (_wrapper->column_type()) {
+    case TYPE_BOOLEAN: {
+        // Read exactly one bool. Reading through int32_t* (as before) touches
+        // three bytes beyond the value and can turn `false` into `true`.
+        // NOTE(review): assumes boolean bounds are stored as `bool`, in line
+        // with the native-width reads of the other cases - confirm.
+        filter->mutable_min_val()->set_boolval(*reinterpret_cast<const bool*>(min_data));
+        filter->mutable_max_val()->set_boolval(*reinterpret_cast<const bool*>(max_data));
+        return;
+    }
+    case TYPE_TINYINT: {
+        filter->mutable_min_val()->set_intval(*reinterpret_cast<const int8_t*>(min_data));
+        filter->mutable_max_val()->set_intval(*reinterpret_cast<const int8_t*>(max_data));
+        return;
+    }
+    case TYPE_SMALLINT: {
+        filter->mutable_min_val()->set_intval(*reinterpret_cast<const int16_t*>(min_data));
+        filter->mutable_max_val()->set_intval(*reinterpret_cast<const int16_t*>(max_data));
+        return;
+    }
+    case TYPE_INT: {
+        filter->mutable_min_val()->set_intval(*reinterpret_cast<const int32_t*>(min_data));
+        filter->mutable_max_val()->set_intval(*reinterpret_cast<const int32_t*>(max_data));
+        return;
+    }
+    case TYPE_BIGINT: {
+        filter->mutable_min_val()->set_longval(*reinterpret_cast<const int64_t*>(min_data));
+        filter->mutable_max_val()->set_longval(*reinterpret_cast<const int64_t*>(max_data));
+        return;
+    }
+    case TYPE_LARGEINT: {
+        // 128-bit values have no protobuf scalar, so they travel as text
+        filter->mutable_min_val()->set_stringval(
+                LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(min_data)));
+        filter->mutable_max_val()->set_stringval(
+                LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(max_data)));
+        return;
+    }
+    case TYPE_FLOAT: {
+        filter->mutable_min_val()->set_doubleval(*reinterpret_cast<const float*>(min_data));
+        filter->mutable_max_val()->set_doubleval(*reinterpret_cast<const float*>(max_data));
+        return;
+    }
+    case TYPE_DOUBLE: {
+        filter->mutable_min_val()->set_doubleval(*reinterpret_cast<const double*>(min_data));
+        filter->mutable_max_val()->set_doubleval(*reinterpret_cast<const double*>(max_data));
+        return;
+    }
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        // one scratch buffer is reused for both bounds
+        char convert_buffer[30];
+        reinterpret_cast<const DateTimeValue*>(min_data)->to_string(convert_buffer);
+        filter->mutable_min_val()->set_stringval(convert_buffer);
+        reinterpret_cast<const DateTimeValue*>(max_data)->to_string(convert_buffer);
+        filter->mutable_max_val()->set_stringval(convert_buffer);
+        return;
+    }
+    case TYPE_DECIMAL: {
+        filter->mutable_min_val()->set_stringval(
+                reinterpret_cast<const DecimalValue*>(min_data)->to_string());
+        filter->mutable_max_val()->set_stringval(
+                reinterpret_cast<const DecimalValue*>(max_data)->to_string());
+        return;
+    }
+    case TYPE_DECIMALV2: {
+        filter->mutable_min_val()->set_stringval(
+                reinterpret_cast<const DecimalV2Value*>(min_data)->to_string());
+        filter->mutable_max_val()->set_stringval(
+                reinterpret_cast<const DecimalV2Value*>(max_data)->to_string());
+        return;
+    }
+    case TYPE_CHAR:
+    case TYPE_VARCHAR: {
+        // StringValue is (ptr, len); copy the bytes into an owning std::string
+        const StringValue* min_string_value = reinterpret_cast<const StringValue*>(min_data);
+        filter->mutable_min_val()->set_stringval(
+                std::string(min_string_value->ptr, min_string_value->len));
+        const StringValue* max_string_value = reinterpret_cast<const StringValue*>(max_data);
+        filter->mutable_max_val()->set_stringval(
+                std::string(max_string_value->ptr, max_string_value->len));
+        return;
+    }
+    default: {
+        DCHECK(false) << "unknown type";
+        break;
+    }
+    }
+}
+
+// Applies a filter received in a publish request: the payload is decoded into
+// a temporary wrapper, merged into this filter's wrapper, and waiting
+// consumers are then signalled. Any decode/merge error is returned and no
+// signal is sent.
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
+    std::unique_ptr<RuntimePredicateWrapper> received;
+    RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _mem_tracker, _pool, &received));
+
+    RuntimePredicateWrapper* incoming = received.get();
+    RETURN_IF_ERROR(_wrapper->merge(incoming));
+
+    signal();
+    return Status::OK();
+}
+
+Status IRuntimeFilter::consumer_close() { // consumer-only: closes the cached push-down contexts
+    DCHECK(is_consumer());
+    Expr::close(_push_down_ctxs, _state); // these are the contexts cached by get_prepared_context()
+    return Status::OK(); // always OK
+}
+
+RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default; // defined in this .cpp because the header only forward-declares RuntimePredicateWrapper
+RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default; // the unique_ptr member needs the complete type to be destroyed
+
+// Selects which producer filters of this join will actually be built, based on
+// the size of the hash table.
+//
+// runtime filter effect strategy
+// 1. an IN filter is ignored when hash_table_size reaches
+//    RuntimeState::runtime_filter_max_in_num()
+// 2. a BLOOM or MinMax filter without remote targets is ignored when the hash
+//    table is below that limit and the previously kept filter on the same
+//    expr is an IN filter
+//
+// Ignoring a filter marks its consumer as ignored and signals it, so scanners
+// do not keep waiting for it. Kept filters are stored in _runtime_filters by
+// expr order. `pool` and `tracker` are not used by this implementation.
+//
+// Returns an error when a producer or consumer filter cannot be looked up.
+Status RuntimeFilterSlots::init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker,
+                                int64_t hash_table_size) {
+    DCHECK(_probe_expr_context.size() == _build_expr_context.size());
+
+    // expr order -> whether the last filter kept for that expr is an IN filter
+    std::map<int, bool> has_in_filter;
+
+    // Marks the consumer of `filter_id` as ignored and wakes its waiters.
+    // The lookup status is propagated: relying on a DCHECK alone would let a
+    // release build dereference a null consumer.
+    auto ignore_filter = [state](int filter_id) -> Status {
+        IRuntimeFilter* consumer_filter = nullptr;
+        RETURN_IF_ERROR(
+                state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter));
+        if (consumer_filter == nullptr) {
+            return Status::InternalError("runtime filter consumer not found");
+        }
+        consumer_filter->set_ignored();
+        consumer_filter->signal();
+        return Status::OK();
+    };
+
+    for (auto& filter_desc : _runtime_filter_descs) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
+                                                                         &runtime_filter));
+        DCHECK(runtime_filter != nullptr);
+        DCHECK(runtime_filter->expr_order() >= 0);
+        DCHECK(runtime_filter->expr_order() < _probe_expr_context.size());
+
+        const bool is_in_filter = runtime_filter->type() == RuntimeFilterType::IN_FILTER;
+        const bool hash_table_is_small = hash_table_size < state->runtime_filter_max_in_num();
+
+        // rule 1: too many distinct values for an IN list
+        if (is_in_filter && !hash_table_is_small) {
+            RETURN_IF_ERROR(ignore_filter(filter_desc.filter_id));
+            continue;
+        }
+        // rule 2: the IN filter already covers this expr
+        if (has_in_filter[runtime_filter->expr_order()] && !runtime_filter->has_remote_target() &&
+            !is_in_filter && hash_table_is_small) {
+            RETURN_IF_ERROR(ignore_filter(filter_desc.filter_id));
+            continue;
+        }
+        has_in_filter[runtime_filter->expr_order()] = is_in_filter;
+        _runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter);
+    }
+
+    return Status::OK();
+}
+
+// Tells every kept filter, in expr order, that insertion has finished.
+void RuntimeFilterSlots::ready_for_publish() {
+    for (auto slot = _runtime_filters.begin(); slot != _runtime_filters.end(); ++slot) {
+        std::list<IRuntimeFilter*>& filters = slot->second;
+        for (IRuntimeFilter* runtime_filter : filters) {
+            runtime_filter->ready_for_publish();
+        }
+    }
+}
+
+// Publishes every kept filter and then waits for the outstanding RPCs.
+// Phase 1 starts the publication of each filter (local swap or remote push),
+// phase 2 calls publish_finally() on each of them so that all pushes are
+// started before any of them is waited for.
+// A failed publication is logged and does not stop the remaining filters.
+void RuntimeFilterSlots::publish(HashJoinNode* hash_join_node) {
+    for (int i = 0; i < _probe_expr_context.size(); ++i) {
+        auto iter = _runtime_filters.find(i);
+        if (iter == _runtime_filters.end()) {
+            continue;
+        }
+        for (auto filter : iter->second) {
+            // this function returns void, so at least make the failure visible
+            // instead of silently dropping the Status
+            Status status = filter->publish(hash_join_node, _probe_expr_context[i]);
+            if (!status.ok()) {
+                LOG(WARNING) << "publish runtime filter failed: " << status.get_error_msg();
+            }
+        }
+    }
+    for (auto& pair : _runtime_filters) {
+        for (auto filter : pair.second) {
+            filter->publish_finally();
+        }
+    }
+}
+
+} // namespace doris
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
new file mode 100644
index 0000000..61ef65d
--- /dev/null
+++ b/be/src/exprs/runtime_filter.h
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_QUERY_EXPRS_RUNTIME_PREDICATE_H
+#define DORIS_BE_SRC_QUERY_EXPRS_RUNTIME_PREDICATE_H
+
+#include <condition_variable>
+#include <list>
+#include <map>
+#include <mutex>
+
+#include "exprs/expr_context.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/types.h"
+#include "util/runtime_profile.h"
+#include "util/uid_util.h"
+
+namespace doris {
+class Predicate;
+class ObjectPool;
+class ExprContext;
+class RuntimeState;
+class RuntimePredicateWrapper;
+class MemTracker;
+class TupleRow;
+class PPublishFilterRequest;
+class PMergeFilterRequest;
+class TRuntimeFilterDesc;
+class RowDescriptor;
+class PMinMaxFilter;
+class HashJoinNode;
+class RuntimeProfile;
+
+enum class RuntimeFilterType { // kind of predicate a runtime filter carries
+    UNKNOWN_FILTER = -1, // initial value used by the IRuntimeFilter and RuntimeFilterParams constructors
+    IN_FILTER = 0,
+    MINMAX_FILTER = 1,
+    BLOOM_FILTER = 2
+};
+
+// Human readable name of a runtime filter type ("in", "bloomfilter", "minmax");
+// every other value maps to "UNKNOWN".
+inline std::string to_string(RuntimeFilterType type) {
+    if (type == RuntimeFilterType::IN_FILTER) {
+        return std::string("in");
+    }
+    if (type == RuntimeFilterType::BLOOM_FILTER) {
+        return std::string("bloomfilter");
+    }
+    if (type == RuntimeFilterType::MINMAX_FILTER) {
+        return std::string("minmax");
+    }
+    return std::string("UNKNOWN");
+}
+
+enum class RuntimeFilterRole { PRODUCER = 0, CONSUMER = 1 }; // which side of the filter an IRuntimeFilter instance acts as
+
+// Parameters used to construct and initialize a RuntimePredicateWrapper.
+struct RuntimeFilterParams {
+    // Every member gets a defined value: column_return_type used to be left
+    // uninitialized, so reading it before assignment was undefined behaviour.
+    RuntimeFilterParams()
+            : filter_type(RuntimeFilterType::UNKNOWN_FILTER),
+              column_return_type(INVALID_TYPE),
+              bloom_filter_size(-1) {}
+
+    // kind of filter to build
+    RuntimeFilterType filter_type;
+    // return type of the build side expression
+    PrimitiveType column_return_type;
+    // used in bloom filter; stays -1 when the descriptor carries no size
+    int64_t bloom_filter_size;
+};
+
+struct UpdateRuntimeFilterParams { // input of IRuntimeFilter::update_filter / create_wrapper
+    const PPublishFilterRequest* request; // publish request describing the filter
+    const char* data; // passed to the wrapper together with the request's bloom filter
+};
+
+struct MergeRuntimeFilterParams { // input of IRuntimeFilter::create_wrapper for merge requests
+    const PMergeFilterRequest* request; // merge request describing the filter
+    const char* data; // passed to the wrapper together with the request's bloom filter
+};
+
+/// A runtime filter is built in the join node.
+/// Its main purpose is to reduce the amount of left table data that has to be
+/// scanned, based on the rows the right table produced during the join.
+/// The runtime filter builds filter conditions from the right table results
+/// that can be pushed down to the scan node.
+///
+/// An instance acts either as PRODUCER (join side, builds and publishes the
+/// filter) or as CONSUMER (scan side, waits for the filter and applies it).
+class IRuntimeFilter {
+public:
+    IRuntimeFilter(RuntimeState* state, MemTracker* mem_tracker, ObjectPool* pool)
+            : _state(state),
+              _mem_tracker(mem_tracker),
+              _pool(pool),
+              // stays null until init_with_desc() creates the wrapper
+              _wrapper(nullptr),
+              _runtime_filter_type(RuntimeFilterType::UNKNOWN_FILTER),
+              _filter_id(-1),
+              _is_broadcast_join(true),
+              _has_remote_target(false),
+              _has_local_target(false),
+              _is_ready(false),
+              _role(RuntimeFilterRole::PRODUCER),
+              _expr_order(-1),
+              _always_true(false),
+              _probe_ctx(nullptr),
+              _is_ignored(false) {}
+
+    ~IRuntimeFilter() = default;
+
+    // create a filter in `pool`, set its role and initialize it from `desc`
+    static Status create(RuntimeState* state, MemTracker* tracker, ObjectPool* pool,
+                         const TRuntimeFilterDesc* desc, const RuntimeFilterRole role, int node_id,
+                         IRuntimeFilter** res);
+
+    // insert data to build filter
+    // only used for producer
+    void insert(void* data);
+
+    // publish filter
+    // push filter to remote node or push down it to scan_node
+    Status publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx);
+
+    // wait for the rpc started by publish(), if any
+    void publish_finally();
+
+    RuntimeFilterType type() const { return _runtime_filter_type; }
+
+    // get push down expr context
+    // This function can only be called once
+    // _wrapper's function will be clear
+    // only consumer could call this
+    Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs);
+
+    // This function is used by UT and producer
+    Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs, ExprContext* probe_ctx);
+
+    // This function can be called multiple times
+    Status get_prepared_context(std::vector<ExprContext*>* push_expr_ctxs,
+                                const RowDescriptor& desc,
+                                const std::shared_ptr<MemTracker>& tracker);
+
+    bool is_broadcast_join() const { return _is_broadcast_join; }
+
+    bool has_remote_target() const { return _has_remote_target; }
+
+    bool is_ready() const { return _is_ready; }
+
+    bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; }
+    bool is_consumer() const { return _role == RuntimeFilterRole::CONSUMER; }
+    void set_role(const RuntimeFilterRole role) { _role = role; }
+    int expr_order() const { return _expr_order; }
+
+    // only used for consumer
+    // if the filter is not ready yet, the scan_node waits
+    // until it becomes ready or the wait times out
+    // This function waits at most RuntimeState::runtime_filter_wait_time_ms()
+    // if return true , filter is ready to use
+    bool await();
+    // this function will be called if a runtime filter sent by rpc
+    // it will notify all waiting threads
+    void signal();
+
+    // init filter with desc
+    Status init_with_desc(const TRuntimeFilterDesc* desc, int node_id = -1);
+
+    // serialize _wrapper to protobuf
+    Status serialize(PMergeFilterRequest* request, void** data, int* len);
+    Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
+
+    Status merge_from(const RuntimePredicateWrapper* wrapper);
+
+    // rebuild a wrapper from a serialized request
+    static Status create_wrapper(const MergeRuntimeFilterParams* param, MemTracker* tracker,
+                                 ObjectPool* pool,
+                                 std::unique_ptr<RuntimePredicateWrapper>* wrapper);
+    static Status create_wrapper(const UpdateRuntimeFilterParams* param, MemTracker* tracker,
+                                 ObjectPool* pool,
+                                 std::unique_ptr<RuntimePredicateWrapper>* wrapper);
+
+    // merge a published filter into this one and signal waiters
+    Status update_filter(const UpdateRuntimeFilterParams* param);
+
+    void set_ignored() { _is_ignored = true; }
+
+    // consumer should call before released
+    Status consumer_close();
+
+    // async push runtimefilter to remote node
+    Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
+    Status join_rpc();
+
+    void init_profile(RuntimeProfile* parent_profile);
+
+    void set_push_down_profile();
+
+    void ready_for_publish();
+
+protected:
+    // serialize _wrapper to protobuf
+    void to_protobuf(PMinMaxFilter* filter);
+
+    template <class T>
+    Status _serialize(T* request, void** data, int* len);
+
+    template <class T>
+    static Status _create_wrapper(const T* param, MemTracker* tracker, ObjectPool* pool,
+                                  std::unique_ptr<RuntimePredicateWrapper>* wrapper);
+
+protected:
+    RuntimeState* _state;
+    MemTracker* _mem_tracker;
+    ObjectPool* _pool;
+    // _wrapper is a runtime filter function wrapper
+    // _wrapper should alloc from _pool
+    RuntimePredicateWrapper* _wrapper;
+    // runtime filter type
+    RuntimeFilterType _runtime_filter_type;
+    // runtime filter id
+    int _filter_id;
+    // join distribution: broadcast or shuffle
+    bool _is_broadcast_join;
+    // will apply to remote node
+    bool _has_remote_target;
+    // will apply to local node
+    bool _has_local_target;
+    // filter is ready for consumer
+    bool _is_ready;
+    // role consumer or producer
+    RuntimeFilterRole _role;
+    // expr index
+    int _expr_order;
+    // used for await or signal
+    std::mutex _inner_mutex;
+    std::condition_variable _inner_cv;
+
+    // if set always_true = true
+    // this filter won't filter any data
+    bool _always_true;
+
+    // build expr_context
+    // ExprContext* _build_ctx;
+    // probe expr_context
+    // it only used in consumer to generate runtime_filter expr_context
+    // we don't have to prepare it or close it
+    ExprContext* _probe_ctx;
+
+    // Indicate whether runtime filter expr has been ignored
+    bool _is_ignored;
+
+    // some runtime filters generate
+    // multiple contexts, such as the minmax filter
+    // these contexts are prepared by this filter,
+    // consumer_close should be called before release
+    std::vector<ExprContext*> _push_down_ctxs;
+
+    // state of the merge rpc started by push_to_remote (defined in the rpc .cpp)
+    struct rpc_context;
+    std::shared_ptr<rpc_context> _rpc_context;
+
+    // parent profile
+    // only effect on consumer
+    std::unique_ptr<RuntimeProfile> _profile;
+    // timers registered by init_profile()
+    RuntimeProfile::Counter* _await_time_cost = nullptr;
+    RuntimeProfile::Counter* _effect_time_cost = nullptr;
+    std::unique_ptr<ScopedTimer<MonotonicStopWatch>> _effect_timer;
+};
+
+// avoid expose RuntimePredicateWrapper
+class RuntimeFilterWrapperHolder { // owns a wrapper without requiring its definition in this header
+public:
+    using WrapperPtr = std::unique_ptr<RuntimePredicateWrapper>;
+    RuntimeFilterWrapperHolder(); // declared only here; RuntimePredicateWrapper is just forward-declared in this header
+    ~RuntimeFilterWrapperHolder();
+    WrapperPtr* getHandle() { return &_wrapper; } // address of the owned pointer, matching the out-parameter of IRuntimeFilter::create_wrapper
+
+private:
+    WrapperPtr _wrapper;
+};
+
+/// Helper owned by a hash join node.
+/// It groups the producer runtime filters of the join by build/probe expr
+/// index and offers a single interface to feed, finish and publish them.
+class RuntimeFilterSlots {
+public:
+    // The three vectors are stored by reference and must outlive this object.
+    RuntimeFilterSlots(const std::vector<ExprContext*>& prob_expr_ctxs,
+                       const std::vector<ExprContext*>& build_expr_ctxs,
+                       const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
+            : _probe_expr_context(prob_expr_ctxs),
+              _build_expr_context(build_expr_ctxs),
+              _runtime_filter_descs(runtime_filter_descs) {}
+
+    // decide which filters are built, see the definition for the strategy
+    Status init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker,
+                int64_t hash_table_size);
+
+    // Feeds one build row into every kept filter: for each build expr that has
+    // filters, the expr is evaluated once and the value is given to all of them.
+    void insert(TupleRow* row) {
+        for (int expr_idx = 0; expr_idx < _build_expr_context.size(); ++expr_idx) {
+            auto slot = _runtime_filters.find(expr_idx);
+            if (slot == _runtime_filters.end()) {
+                continue;
+            }
+            void* build_value = _build_expr_context[expr_idx]->get_value(row);
+            for (IRuntimeFilter* runtime_filter : slot->second) {
+                runtime_filter->insert(build_value);
+            }
+        }
+    }
+
+    // should call this method after insert
+    void ready_for_publish();
+    // publish runtime filter
+    void publish(HashJoinNode* hash_join_node);
+
+private:
+    const std::vector<ExprContext*>& _probe_expr_context;
+    const std::vector<ExprContext*>& _build_expr_context;
+    const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
+    // probe condition index -> [IRuntimeFilter]
+    std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
+};
+
+} // namespace doris
+
+#endif
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
new file mode 100644
index 0000000..87bb579
--- /dev/null
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/config.h"
+#include "common/status.h"
+#include "exprs/runtime_filter.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+
+// for rpc
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "service/brpc.h"
+#include "util/brpc_stub_cache.h"
+
+namespace doris {
+
+struct IRuntimeFilter::rpc_context { // everything one merge_filter rpc needs; kept until the rpc is joined or dropped
+    PMergeFilterRequest request;
+    PMergeFilterResponse response;
+    brpc::Controller cntl; // carries timeout, attachment and the failure state of the call
+    brpc::CallId cid; // taken from cntl before the call starts; join_rpc() waits on it
+};
+
+// Producer-only: serializes this filter and sends it to the merge node at
+// `addr` through the merge_filter rpc (1000 ms timeout).
+//  - config::runtime_filter_use_async_rpc == true: the call is started and
+//    left pending in _rpc_context; join_rpc() waits for it later.
+//  - otherwise the call is made synchronously and the context is released
+//    right away.
+// Returns an error when no stub is available or serialization fails; an rpc
+// failure itself is only logged.
+Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) {
+    DCHECK(is_producer());
+    DCHECK(_rpc_context == nullptr);
+    PBackendService_Stub* stub = state->exec_env()->brpc_stub_cache()->get_stub(*addr);
+    if (stub == nullptr) {
+        // NOTE(review): presumably get_stub() yields nullptr when the address
+        // cannot be resolved - confirm. Fail here instead of calling through null.
+        return Status::InternalError("get rpc stub failed when pushing runtime filter");
+    }
+    _rpc_context = std::make_shared<IRuntimeFilter::rpc_context>();
+    void* data = nullptr;
+    int len = 0;
+
+    auto pquery_id = _rpc_context->request.mutable_query_id();
+    pquery_id->set_hi(_state->query_id().hi);
+    pquery_id->set_lo(_state->query_id().lo);
+
+    auto pfragment_instance_id = _rpc_context->request.mutable_fragment_id();
+    pfragment_instance_id->set_hi(state->fragment_instance_id().hi);
+    pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
+
+    _rpc_context->request.set_filter_id(_filter_id);
+    _rpc_context->cntl.set_timeout_ms(1000);
+    // remember the call id before the call starts so join_rpc() can wait on it
+    _rpc_context->cid = _rpc_context->cntl.call_id();
+
+    Status serialize_status = serialize(&_rpc_context->request, &data, &len);
+    if (!serialize_status.ok()) {
+        // we should reset context
+        _rpc_context.reset();
+        return serialize_status;
+    }
+
+    LOG(INFO) << "Producer:" << _rpc_context->request.ShortDebugString() << addr->hostname
+              << ":" << addr->port;
+    if (len > 0) {
+        DCHECK(data != nullptr);
+        // bloom filter bytes are shipped as rpc attachment
+        _rpc_context->cntl.request_attachment().append(data, len);
+    }
+    if (config::runtime_filter_use_async_rpc) {
+        stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
+                           brpc::DoNothing());
+    } else {
+        stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
+                           nullptr);
+        // the context is dropped below, so this is the only chance to report
+        // a failed synchronous call (join_rpc() does the same for async calls)
+        if (_rpc_context->cntl.Failed()) {
+            LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText();
+        }
+        _rpc_context.reset();
+    }
+    return serialize_status;
+}
+
+// Producer-only: waits for the merge rpc started by push_to_remote(), if one
+// is still pending, and logs its failure. Always returns OK.
+Status IRuntimeFilter::join_rpc() {
+    DCHECK(is_producer());
+    if (_rpc_context == nullptr) {
+        // nothing was sent, or the call was made synchronously
+        return Status::OK();
+    }
+
+    brpc::Join(_rpc_context->cid);
+    const bool rpc_failed = _rpc_context->cntl.Failed();
+    if (rpc_failed) {
+        LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText();
+    }
+    return Status::OK();
+}
+} // namespace doris
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index ffa4112..7da0983 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -48,6 +48,7 @@ add_library(Olap STATIC
     generic_iterators.cpp
     hll.cpp
     in_list_predicate.cpp
+    bloom_filter_predicate.cpp
     in_stream.cpp
     key_coder.cpp
     lru_cache.cpp
diff --git a/be/src/olap/bloom_filter_predicate.cpp b/be/src/olap/bloom_filter_predicate.cpp
new file mode 100644
index 0000000..06e4947
--- /dev/null
+++ b/be/src/olap/bloom_filter_predicate.cpp
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/bloom_filter_predicate.h"
+
+#include "olap/field.h"
+#include "runtime/string_value.hpp"
+#include "runtime/vectorized_row_batch.h"
+
+namespace doris {
+
+BloomFilterColumnPredicate::BloomFilterColumnPredicate( // binds a bloom filter to one column
+        uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& filter)
+        : ColumnPredicate(column_id), _filter(filter) {} // keeps its own copy of the shared_ptr
+
+// The bloom filter column predicate is not supported in segment v1, so this
+// overload filters nothing: when the batch has no selection vector in use, the
+// vector is filled with the identity mapping 0..size-1; otherwise the batch is
+// left as it is.
+void BloomFilterColumnPredicate::evaluate(VectorizedRowBatch* batch) const {
+    const uint16_t row_count = batch->size();
+    uint16_t* selection = batch->selected();
+    if (batch->selected_in_use()) {
+        return;
+    }
+    for (uint16_t row = 0; row != row_count; ++row) {
+        selection[row] = row;
+    }
+}
+
+// Filters the selection vector in place.
+// `sel` holds `*size` row indexes of `block`; an index is kept when its cell
+// passes the bloom filter and, for nullable blocks, is also not null.
+// On return `*size` is the number of kept indexes, stored at the front of `sel`.
+void BloomFilterColumnPredicate::evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const {
+    const bool nullable = block->is_nullable();
+    uint16_t kept = 0;
+    for (uint16_t pos = 0; pos < *size; ++pos) {
+        const uint16_t row = sel[pos];
+        // write the candidate first; it only stays if `kept` advances below
+        sel[kept] = row;
+        const auto* cell_value = reinterpret_cast<const void*>(block->cell(row).cell_ptr());
+        if (nullable) {
+            kept += (!block->cell(row).is_null() && _filter->find_olap_engine(cell_value));
+        } else {
+            kept += _filter->find_olap_engine(cell_value);
+        }
+    }
+    *size = kept;
+}
+
+Status BloomFilterColumnPredicate::evaluate(const Schema& schema, // bitmap-index overload
+                                            const std::vector<BitmapIndexIterator*>& iterators,
+                                            uint32_t num_rows, Roaring* result) const {
+    return Status::OK(); // no-op: `result` is left unchanged and none of the arguments are read
+}
+
+} //namespace doris
diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h
new file mode 100644
index 0000000..191bfae
--- /dev/null
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_BLOOM_FILTER_PREDICATE_H
+#define DORIS_BE_SRC_OLAP_BLOOM_FILTER_PREDICATE_H
+
+#include <stdint.h>
+
+#include <roaring/roaring.hh>
+
+#include "exprs/bloomfilter_predicate.h"
+#include "olap/column_predicate.h"
+
+namespace doris {
+
+class VectorizedRowBatch;
+
+// Only used by runtime filters and segment v2.
+class BloomFilterColumnPredicate : public ColumnPredicate {
+public:
+    BloomFilterColumnPredicate(uint32_t column_id,
+                               const std::shared_ptr<BloomFilterFuncBase>& filter);
+    ~BloomFilterColumnPredicate() override = default;
+
+    void evaluate(VectorizedRowBatch* batch) const override;
+
+    void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const override;
+
+    // BloomFilter cannot yet act as a sub column predicate, so OR and AND are not supported.
+    // It should be supported in the future.
+    void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size,
+                     bool* flags) const override {};
+    void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size,
+                      bool* flags) const override {};
+
+    Status evaluate(const Schema& schema, const vector<BitmapIndexIterator*>& iterators,
+                    uint32_t num_rows, Roaring* roaring) const override;
+
+private:
+    std::shared_ptr<BloomFilterFuncBase> _filter;
+};
+
+} //namespace doris
+
+#endif //DORIS_BE_SRC_OLAP_BLOOM_FILTER_PREDICATE_H
diff --git a/be/src/olap/in_list_predicate.cpp b/be/src/olap/in_list_predicate.cpp
index f4d2d98..35b7b34 100644
--- a/be/src/olap/in_list_predicate.cpp
+++ b/be/src/olap/in_list_predicate.cpp
@@ -23,9 +23,9 @@
 
 namespace doris {
 
-#define IN_LIST_PRED_CONSTRUCTOR(CLASS)                             \
-    template <class type>                                           \
-    CLASS<type>::CLASS(uint32_t column_id, std::set<type>&& values,  bool opposite) \
+#define IN_LIST_PRED_CONSTRUCTOR(CLASS)                                                      \
+    template <class type>                                                                    \
+    CLASS<type>::CLASS(uint32_t column_id, std::unordered_set<type>&& values, bool opposite) \
             : ColumnPredicate(column_id, opposite), _values(std::move(values)) {}
 
 IN_LIST_PRED_CONSTRUCTOR(InListPredicate)
@@ -96,8 +96,8 @@ IN_LIST_PRED_EVALUATE(NotInListPredicate, ==)
                 const type* cell_value =                                                  \
                         reinterpret_cast<const type*>(block->cell(idx).cell_ptr());       \
                 auto result = (!block->cell(idx).is_null() && _values.find(*cell_value)   \
-                                                                    OP _values.end());    \
-                new_size += _opposite ? !result : result;                              \
+                                                                      OP _values.end());  \
+                new_size += _opposite ? !result : result;                                 \
             }                                                                             \
         } else {                                                                          \
             for (uint16_t i = 0; i < *size; ++i) {                                        \
@@ -115,57 +115,59 @@ IN_LIST_PRED_EVALUATE(NotInListPredicate, ==)
 IN_LIST_PRED_COLUMN_BLOCK_EVALUATE(InListPredicate, !=)
 IN_LIST_PRED_COLUMN_BLOCK_EVALUATE(NotInListPredicate, ==)
 
-#define IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_OR(CLASS, OP)                                     \
-    template <class type>                                                                 \
-    void CLASS<type>::evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) const { \
-        if (block->is_nullable()) {                                                       \
-            for (uint16_t i = 0; i < size; ++i) {                                        \
-                if (flags[i]) continue;                                                   \
-                uint16_t idx = sel[i];                                                    \
-                const type* cell_value =                                                  \
-                        reinterpret_cast<const type*>(block->cell(idx).cell_ptr());       \
-                auto result = (!block->cell(idx).is_null() && _values.find(*cell_value)   \
-                                                                    OP _values.end());    \
-                flags[i] |= _opposite ? !result : result;                                 \
-            }                                                                             \
-        } else {                                                                          \
-            for (uint16_t i = 0; i < size; ++i) {                                         \
-                if (flags[i]) continue;                                                   \
-                uint16_t idx = sel[i];                                                    \
-                const type* cell_value =                                                  \
-                        reinterpret_cast<const type*>(block->cell(idx).cell_ptr());       \
-                auto result = (_values.find(*cell_value) OP _values.end());               \
-                flags[i] |= _opposite ? !result : result;                                 \
-            }                                                                             \
-        }                                                                                 \
+#define IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_OR(CLASS, OP)                                         \
+    template <class type>                                                                        \
+    void CLASS<type>::evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) \
+            const {                                                                              \
+        if (block->is_nullable()) {                                                              \
+            for (uint16_t i = 0; i < size; ++i) {                                                \
+                if (flags[i]) continue;                                                          \
+                uint16_t idx = sel[i];                                                           \
+                const type* cell_value =                                                         \
+                        reinterpret_cast<const type*>(block->cell(idx).cell_ptr());              \
+                auto result = (!block->cell(idx).is_null() && _values.find(*cell_value)          \
+                                                                      OP _values.end());         \
+                flags[i] |= _opposite ? !result : result;                                        \
+            }                                                                                    \
+        } else {                                                                                 \
+            for (uint16_t i = 0; i < size; ++i) {                                                \
+                if (flags[i]) continue;                                                          \
+                uint16_t idx = sel[i];                                                           \
+                const type* cell_value =                                                         \
+                        reinterpret_cast<const type*>(block->cell(idx).cell_ptr());              \
+                auto result = (_values.find(*cell_value) OP _values.end());                      \
+                flags[i] |= _opposite ? !result : result;                                        \
+            }                                                                                    \
+        }                                                                                        \
     }
 
 IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_OR(InListPredicate, !=)
 IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_OR(NotInListPredicate, ==)
 
-#define IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(CLASS, OP)                                     \
-    template <class type>                                                                 \
-    void CLASS<type>::evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) const { \
-        if (block->is_nullable()) {                                                       \
-            for (uint16_t i = 0; i < size; ++i) {                                        \
-                if (!flags[i]) continue;                                                   \
-                uint16_t idx = sel[i];                                                    \
-                const type* cell_value =                                                  \
-                        reinterpret_cast<const type*>(block->cell(idx).cell_ptr());       \
-                auto result = (!block->cell(idx).is_null() && _values.find(*cell_value)   \
-                                                                    OP _values.end());    \
-                flags[i] &= _opposite ? !result : result;                                 \
-            }                                                                             \
-        } else {                                                                          \
-            for (uint16_t i = 0; i < size; ++i) {                                        \
-                if (!flags[i]) continue;                                                   \
-                uint16_t idx = sel[i];                                                    \
-                const type* cell_value =                                                  \
-                        reinterpret_cast<const type*>(block->cell(idx).cell_ptr());       \
-                auto result = (_values.find(*cell_value) OP _values.end());               \
-                flags[i] &= _opposite ? !result : result;                                 \
-            }                                                                             \
-        }                                                                                 \
+#define IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(CLASS, OP)                                         \
+    template <class type>                                                                         \
+    void CLASS<type>::evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) \
+            const {                                                                               \
+        if (block->is_nullable()) {                                                               \
+            for (uint16_t i = 0; i < size; ++i) {                                                 \
+                if (!flags[i]) continue;                                                          \
+                uint16_t idx = sel[i];                                                            \
+                const type* cell_value =                                                          \
+                        reinterpret_cast<const type*>(block->cell(idx).cell_ptr());               \
+                auto result = (!block->cell(idx).is_null() && _values.find(*cell_value)           \
+                                                                      OP _values.end());          \
+                flags[i] &= _opposite ? !result : result;                                         \
+            }                                                                                     \
+        } else {                                                                                  \
+            for (uint16_t i = 0; i < size; ++i) {                                                 \
+                if (!flags[i]) continue;                                                          \
+                uint16_t idx = sel[i];                                                            \
+                const type* cell_value =                                                          \
+                        reinterpret_cast<const type*>(block->cell(idx).cell_ptr());               \
+                auto result = (_values.find(*cell_value) OP _values.end());                       \
+                flags[i] &= _opposite ? !result : result;                                         \
+            }                                                                                     \
+        }                                                                                         \
     }
 
 IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(InListPredicate, !=)
@@ -208,18 +210,29 @@ IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(NotInListPredicate, ==)
 IN_LIST_PRED_BITMAP_EVALUATE(InListPredicate, &=)
 IN_LIST_PRED_BITMAP_EVALUATE(NotInListPredicate, -=)
 
-#define IN_LIST_PRED_CONSTRUCTOR_DECLARATION(CLASS)                                                        \
-    template CLASS<int8_t>::CLASS(uint32_t column_id, std::set<int8_t>&& values, bool opposite);           \
-    template CLASS<int16_t>::CLASS(uint32_t column_id, std::set<int16_t>&& values, bool opposite);         \
-    template CLASS<int32_t>::CLASS(uint32_t column_id, std::set<int32_t>&& values, bool opposite);         \
-    template CLASS<int64_t>::CLASS(uint32_t column_id, std::set<int64_t>&& values, bool opposite);         \
-    template CLASS<int128_t>::CLASS(uint32_t column_id, std::set<int128_t>&& values, bool opposite);       \
-    template CLASS<float>::CLASS(uint32_t column_id, std::set<float>&& values, bool opposite);             \
-    template CLASS<double>::CLASS(uint32_t column_id, std::set<double>&& values, bool opposite);           \
-    template CLASS<decimal12_t>::CLASS(uint32_t column_id, std::set<decimal12_t>&& values, bool opposite); \
-    template CLASS<StringValue>::CLASS(uint32_t column_id, std::set<StringValue>&& values, bool opposite); \
-    template CLASS<uint24_t>::CLASS(uint32_t column_id, std::set<uint24_t>&& values, bool opposite);       \
-    template CLASS<uint64_t>::CLASS(uint32_t column_id, std::set<uint64_t>&& values, bool opposite);
+#define IN_LIST_PRED_CONSTRUCTOR_DECLARATION(CLASS)                                              \
+    template CLASS<int8_t>::CLASS(uint32_t column_id, std::unordered_set<int8_t>&& values,       \
+                                  bool opposite);                                                \
+    template CLASS<int16_t>::CLASS(uint32_t column_id, std::unordered_set<int16_t>&& values,     \
+                                   bool opposite);                                               \
+    template CLASS<int32_t>::CLASS(uint32_t column_id, std::unordered_set<int32_t>&& values,     \
+                                   bool opposite);                                               \
+    template CLASS<int64_t>::CLASS(uint32_t column_id, std::unordered_set<int64_t>&& values,     \
+                                   bool opposite);                                               \
+    template CLASS<int128_t>::CLASS(uint32_t column_id, std::unordered_set<int128_t>&& values,   \
+                                    bool opposite);                                              \
+    template CLASS<float>::CLASS(uint32_t column_id, std::unordered_set<float>&& values,         \
+                                 bool opposite);                                                 \
+    template CLASS<double>::CLASS(uint32_t column_id, std::unordered_set<double>&& values,       \
+                                  bool opposite);                                                \
+    template CLASS<decimal12_t>::CLASS(uint32_t column_id,                                       \
+                                       std::unordered_set<decimal12_t>&& values, bool opposite); \
+    template CLASS<StringValue>::CLASS(uint32_t column_id,                                       \
+                                       std::unordered_set<StringValue>&& values, bool opposite); \
+    template CLASS<uint24_t>::CLASS(uint32_t column_id, std::unordered_set<uint24_t>&& values,   \
+                                    bool opposite);                                              \
+    template CLASS<uint64_t>::CLASS(uint32_t column_id, std::unordered_set<uint64_t>&& values,   \
+                                    bool opposite);
 
 IN_LIST_PRED_CONSTRUCTOR_DECLARATION(InListPredicate)
 IN_LIST_PRED_CONSTRUCTOR_DECLARATION(NotInListPredicate)
diff --git a/be/src/olap/in_list_predicate.h b/be/src/olap/in_list_predicate.h
index 8f96a04..b5bc73e 100644
--- a/be/src/olap/in_list_predicate.h
+++ b/be/src/olap/in_list_predicate.h
@@ -22,28 +22,78 @@
 
 #include <roaring/roaring.hh>
 #include <set>
+#include <unordered_set>
 
+#include "decimal12.h"
 #include "olap/column_predicate.h"
+#include "uint24.h"
+#include "util/murmur_hash3.h"
+
+namespace std {
+// for string value
+template <>
+struct hash<doris::StringValue> {
+    uint64_t operator()(const doris::StringValue& rhs) const { return hash_value(rhs); }
+};
+
+template <>
+struct equal_to<doris::StringValue> {
+    bool operator()(const doris::StringValue& lhs, const doris::StringValue& rhs) const {
+        return lhs == rhs;
+    }
+};
+// for decimal12_t
+template <>
+struct hash<doris::decimal12_t> {
+    int64_t operator()(const doris::decimal12_t& rhs) const {
+        return hash<int64_t>()(rhs.integer) ^ hash<int32_t>()(rhs.fraction);
+    }
+};
+
+template <>
+struct equal_to<doris::decimal12_t> {
+    bool operator()(const doris::decimal12_t& lhs, const doris::decimal12_t& rhs) const {
+        return lhs == rhs;
+    }
+};
+// for uint24_t
+template <>
+struct hash<doris::uint24_t> {
+    size_t operator()(const doris::uint24_t& rhs) const {
+        uint32_t val(rhs);
+        return hash<int>()(val);
+    }
+};
+
+template <>
+struct equal_to<doris::uint24_t> {
+    bool operator()(const doris::uint24_t& lhs, const doris::uint24_t& rhs) const {
+        return lhs == rhs;
+    }
+};
+} // namespace std
 
 namespace doris {
 
 class VectorizedRowBatch;
 
-#define IN_LIST_PRED_CLASS_DEFINE(CLASS)                                                 \
-    template <class type>                                                                \
-    class CLASS : public ColumnPredicate {                                               \
-    public:                                                                              \
-        CLASS(uint32_t column_id, std::set<type>&& values, bool is_opposite = false);    \
-        virtual void evaluate(VectorizedRowBatch* batch) const override;                 \
-        void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const override; \
-        void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) const override;\
-        void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) const override;\
-        virtual Status evaluate(const Schema& schema,                                    \
-                                const std::vector<BitmapIndexIterator*>& iterators,      \
-                                uint32_t num_rows, Roaring* bitmap) const override;      \
-                                                                                         \
-    private:                                                                             \
-        std::set<type> _values;                                                          \
+#define IN_LIST_PRED_CLASS_DEFINE(CLASS)                                                        \
+    template <class type>                                                                       \
+    class CLASS : public ColumnPredicate {                                                      \
+    public:                                                                                     \
+        CLASS(uint32_t column_id, std::unordered_set<type>&& values, bool is_opposite = false); \
+        virtual void evaluate(VectorizedRowBatch* batch) const override;                        \
+        void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const override;        \
+        void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size,                      \
+                         bool* flags) const override;                                           \
+        void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size,                     \
+                          bool* flags) const override;                                          \
+        virtual Status evaluate(const Schema& schema,                                           \
+                                const std::vector<BitmapIndexIterator*>& iterators,             \
+                                uint32_t num_rows, Roaring* bitmap) const override;             \
+                                                                                                \
+    private:                                                                                    \
+        std::unordered_set<type> _values;                                                       \
     };
 
 IN_LIST_PRED_CLASS_DEFINE(InListPredicate)
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index ce88f23..23ce803 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -19,7 +19,9 @@
 
 #include <boost/algorithm/string/case_conv.hpp>
 #include <sstream>
+#include <unordered_set>
 
+#include "olap/bloom_filter_predicate.h"
 #include "olap/collect_iterator.h"
 #include "olap/comparison_predicate.h"
 #include "olap/in_list_predicate.h"
@@ -27,8 +29,8 @@
 #include "olap/row.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
-#include "olap/rowset/column_data.h"
 #include "olap/rowset/beta_rowset_reader.h"
+#include "olap/rowset/column_data.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "runtime/mem_pool.h"
@@ -94,8 +96,7 @@ std::string Reader::KeysParam::to_string() const {
     return ss.str();
 }
 
-Reader::Reader() : _collect_iter(new CollectIterator()) {
-}
+Reader::Reader() : _collect_iter(new CollectIterator()) {}
 
 Reader::~Reader() {
     close();
@@ -339,8 +340,8 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params,
         if (_keys_param.range == "gt") {
             if (end_key != nullptr && compare_row_key(*start_key, *end_key) >= 0) {
                 VLOG_NOTICE << "return EOF when range=" << _keys_param.range
-                        << ", start_key=" << start_key->to_string()
-                        << ", end_key=" << end_key->to_string();
+                            << ", start_key=" << start_key->to_string()
+                            << ", end_key=" << end_key->to_string();
                 eof = true;
                 break;
             }
@@ -348,8 +349,8 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params,
         } else if (_keys_param.range == "ge") {
             if (end_key != nullptr && compare_row_key(*start_key, *end_key) > 0) {
                 VLOG_NOTICE << "return EOF when range=" << _keys_param.range
-                        << ", start_key=" << start_key->to_string()
-                        << ", end_key=" << end_key->to_string();
+                            << ", start_key=" << start_key->to_string()
+                            << ", end_key=" << end_key->to_string();
                 eof = true;
                 break;
             }
@@ -610,6 +611,11 @@ void Reader::_init_conditions_param(const ReaderParams& read_params) {
             }
         }
     }
+
+    // Only bloom filters on key columns are pushed down to the storage engine.
+    for (const auto& filter : read_params.bloom_filters) {
+        _col_predicates.emplace_back(_parse_to_predicate(filter));
+    }
 }
 
 #define COMPARISON_PREDICATE_CONDITION_VALUE(NAME, PREDICATE)                                   \
@@ -710,6 +716,35 @@ COMPARISON_PREDICATE_CONDITION_VALUE(le, LessEqualPredicate)
 COMPARISON_PREDICATE_CONDITION_VALUE(gt, GreaterPredicate)
 COMPARISON_PREDICATE_CONDITION_VALUE(ge, GreaterEqualPredicate)
 
+ColumnPredicate* Reader::_parse_to_predicate(
+        const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter) {
+    int32_t index = _tablet->field_index(bloom_filter.first);
+    if (index < 0) {
+        return nullptr;
+    }
+    const TabletColumn& column = _tablet->tablet_schema().column(index);
+    // FE regards CHAR as VARCHAR and DATE as DATETIME during query planning, so using the
+    // filter directly would produce incorrect results because the data structures differ.
+    // We need to convert to the data structure corresponding to the storage engine.
+    std::shared_ptr<BloomFilterFuncBase> filter;
+    switch (column.type()) {
+    case OLAP_FIELD_TYPE_CHAR: {
+        filter.reset(BloomFilterFuncBase::create_bloom_filter(bloom_filter.second->tracker(),
+                                                              TYPE_CHAR));
+        filter->light_copy(bloom_filter.second.get());
+        return new BloomFilterColumnPredicate(index, filter);
+    }
+    case OLAP_FIELD_TYPE_DATE: {
+        filter.reset(BloomFilterFuncBase::create_bloom_filter(bloom_filter.second->tracker(),
+                                                              TYPE_DATE));
+        filter->light_copy(bloom_filter.second.get());
+        return new BloomFilterColumnPredicate(index, filter);
+    }
+    default:
+        return new BloomFilterColumnPredicate(index, bloom_filter.second);
+    }
+}
+
 ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool opposite) const {
     // TODO: not equal and not in predicate is not pushed down
     int32_t index = _tablet->field_index(condition.column_name);
@@ -720,9 +755,12 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
     const TabletColumn& column = _tablet->tablet_schema().column(index);
     ColumnPredicate* predicate = nullptr;
 
-    if ((condition.condition_op == "*=" || condition.condition_op == "!*=" || condition.condition_op == "=" || condition.condition_op == "!=") && condition.condition_values.size() == 1) {
-        predicate = condition.condition_op == "*=" || condition.condition_op == "=" ? _new_eq_pred(column, index, condition.condition_values[0], opposite) :
-                _new_ne_pred(column, index, condition.condition_values[0], opposite);
+    if ((condition.condition_op == "*=" || condition.condition_op == "!*=" ||
+         condition.condition_op == "=" || condition.condition_op == "!=") &&
+        condition.condition_values.size() == 1) {
+        predicate = condition.condition_op == "*=" || condition.condition_op == "="
+                            ? _new_eq_pred(column, index, condition.condition_values[0], opposite)
+                            : _new_ne_pred(column, index, condition.condition_values[0], opposite);
     } else if (condition.condition_op == "<<") {
         predicate = _new_lt_pred(column, index, condition.condition_values[0], opposite);
     } else if (condition.condition_op == "<=") {
@@ -731,10 +769,11 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
         predicate = _new_gt_pred(column, index, condition.condition_values[0], opposite);
     } else if (condition.condition_op == ">=") {
         predicate = _new_ge_pred(column, index, condition.condition_values[0], opposite);
-    } else if ((condition.condition_op == "*=" || condition.condition_op == "!*=") && condition.condition_values.size() > 1) {
+    } else if ((condition.condition_op == "*=" || condition.condition_op == "!*=") &&
+               condition.condition_values.size() > 1) {
         switch (column.type()) {
         case OLAP_FIELD_TYPE_TINYINT: {
-            std::set<int8_t> values;
+            std::unordered_set<int8_t> values;
             for (auto& cond_val : condition.condition_values) {
                 int32_t value = 0;
                 std::stringstream ss(cond_val);
@@ -744,12 +783,12 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             if (condition.condition_op == "*=") {
                 predicate = new InListPredicate<int8_t>(index, std::move(values), opposite);
             } else {
-                predicate = new NotInListPredicate<int8_t>(index, std::move(values),opposite);
+                predicate = new NotInListPredicate<int8_t>(index, std::move(values), opposite);
             }
             break;
         }
         case OLAP_FIELD_TYPE_SMALLINT: {
-            std::set<int16_t> values;
+            std::unordered_set<int16_t> values;
             for (auto& cond_val : condition.condition_values) {
                 int16_t value = 0;
                 std::stringstream ss(cond_val);
@@ -764,7 +803,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             break;
         }
         case OLAP_FIELD_TYPE_INT: {
-            std::set<int32_t> values;
+            std::unordered_set<int32_t> values;
             for (auto& cond_val : condition.condition_values) {
                 int32_t value = 0;
                 std::stringstream ss(cond_val);
@@ -779,7 +818,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             break;
         }
         case OLAP_FIELD_TYPE_BIGINT: {
-            std::set<int64_t> values;
+            std::unordered_set<int64_t> values;
             for (auto& cond_val : condition.condition_values) {
                 int64_t value = 0;
                 std::stringstream ss(cond_val);
@@ -794,7 +833,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             break;
         }
         case OLAP_FIELD_TYPE_LARGEINT: {
-            std::set<int128_t> values;
+            std::unordered_set<int128_t> values;
             for (auto& cond_val : condition.condition_values) {
                 int128_t value = 0;
                 std::stringstream ss(cond_val);
@@ -809,7 +848,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             break;
         }
         case OLAP_FIELD_TYPE_DECIMAL: {
-            std::set<decimal12_t> values;
+            std::unordered_set<decimal12_t> values;
             for (auto& cond_val : condition.condition_values) {
                 decimal12_t value = {0, 0};
                 value.from_string(cond_val);
@@ -823,7 +862,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             break;
         }
         case OLAP_FIELD_TYPE_CHAR: {
-            std::set<StringValue> values;
+            std::unordered_set<StringValue> values;
             for (auto& cond_val : condition.condition_values) {
                 StringValue value;
                 size_t length = std::max(static_cast<size_t>(column.length()), cond_val.length());
@@ -842,7 +881,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             break;
         }
         case OLAP_FIELD_TYPE_VARCHAR: {
-            std::set<StringValue> values;
+            std::unordered_set<StringValue> values;
             for (auto& cond_val : condition.condition_values) {
                 StringValue value;
                 int32_t length = cond_val.length();
@@ -860,7 +899,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             break;
         }
         case OLAP_FIELD_TYPE_DATE: {
-            std::set<uint24_t> values;
+            std::unordered_set<uint24_t> values;
             for (auto& cond_val : condition.condition_values) {
                 uint24_t value = timestamp_from_date(cond_val);
                 values.insert(value);
@@ -873,7 +912,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             break;
         }
         case OLAP_FIELD_TYPE_DATETIME: {
-            std::set<uint64_t> values;
+            std::unordered_set<uint64_t> values;
             for (auto& cond_val : condition.condition_values) {
                 uint64_t value = timestamp_from_datetime(cond_val);
                 values.insert(value);
@@ -887,10 +926,11 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
         }
         // OLAP_FIELD_TYPE_BOOL is not valid in this case.
         default:
-           break;
+            break;
         }
     } else if (boost::to_lower_copy(condition.condition_op) == "is") {
-        predicate = new NullPredicate(index, boost::to_lower_copy(condition.condition_values[0]) == "null", opposite);
+        predicate = new NullPredicate(
+                index, boost::to_lower_copy(condition.condition_values[0]) == "null", opposite);
     }
     return predicate;
 }
@@ -953,8 +993,8 @@ OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) {
     }
 
     _tablet->obtain_header_rdlock();
-    OLAPStatus ret = _delete_handler.init(
-            _tablet->tablet_schema(), _tablet->delete_predicates(), read_params.version.second, this);
+    OLAPStatus ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
+                                          read_params.version.second, this);
     _tablet->release_header_lock();
 
     if (read_params.reader_type == READER_BASE_COMPACTION) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 766f8ec..72d5588 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -30,6 +30,7 @@
 #include <utility>
 #include <vector>
 
+#include "exprs/bloomfilter_predicate.h"
 #include "olap/column_predicate.h"
 #include "olap/delete_handler.h"
 #include "olap/olap_cond.h"
@@ -67,7 +68,10 @@ struct ReaderParams {
     std::string end_range;
     std::vector<OlapTuple> start_key;
     std::vector<OlapTuple> end_key;
+
     std::vector<TCondition> conditions;
+    std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters;
+
     // The ColumnData will be set when using Merger, eg Cumulative, BE.
     std::vector<RowsetReaderSharedPtr> rs_readers;
     std::vector<uint32_t> return_columns;
@@ -101,7 +105,8 @@ public:
     uint64_t merged_rows() const { return _merged_rows; }
 
     uint64_t filtered_rows() const {
-        return _stats.rows_del_filtered + _stats.rows_conditions_filtered + _stats.rows_vec_del_cond_filtered;
+        return _stats.rows_del_filtered + _stats.rows_conditions_filtered +
+               _stats.rows_vec_del_cond_filtered;
     }
 
     const OlapReaderStatistics& stats() const { return _stats; }
@@ -124,7 +129,8 @@ private:
 
     OLAPStatus _init_params(const ReaderParams& read_params);
 
-    OLAPStatus _capture_rs_readers(const ReaderParams& read_params, std::vector<RowsetReaderSharedPtr>* valid_rs_readers);
+    OLAPStatus _capture_rs_readers(const ReaderParams& read_params,
+                                   std::vector<RowsetReaderSharedPtr>* valid_rs_readers);
 
     bool _optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers);
 
@@ -132,15 +138,24 @@ private:
 
     void _init_conditions_param(const ReaderParams& read_params);
 
-    ColumnPredicate* _new_eq_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
-    ColumnPredicate* _new_ne_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
-    ColumnPredicate* _new_lt_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
-    ColumnPredicate* _new_le_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
-    ColumnPredicate* _new_gt_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
-    ColumnPredicate* _new_ge_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
+    ColumnPredicate* _new_eq_pred(const TabletColumn& column, int index, const std::string& cond,
+                                  bool opposite) const;
+    ColumnPredicate* _new_ne_pred(const TabletColumn& column, int index, const std::string& cond,
+                                  bool opposite) const;
+    ColumnPredicate* _new_lt_pred(const TabletColumn& column, int index, const std::string& cond,
+                                  bool opposite) const;
+    ColumnPredicate* _new_le_pred(const TabletColumn& column, int index, const std::string& cond,
+                                  bool opposite) const;
+    ColumnPredicate* _new_gt_pred(const TabletColumn& column, int index, const std::string& cond,
+                                  bool opposite) const;
+    ColumnPredicate* _new_ge_pred(const TabletColumn& column, int index, const std::string& cond,
+                                  bool opposite) const;
 
     ColumnPredicate* _parse_to_predicate(const TCondition& condition, bool opposite = false) const;
 
+    ColumnPredicate* _parse_to_predicate(
+            const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter);
+
     OLAPStatus _init_delete_condition(const ReaderParams& read_params);
 
     OLAPStatus _init_return_columns(const ReaderParams& read_params);
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter.cpp b/be/src/olap/rowset/segment_v2/bloom_filter.cpp
index f12e3e3..d7396b4 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter.cpp
+++ b/be/src/olap/rowset/segment_v2/bloom_filter.cpp
@@ -37,7 +37,7 @@ Status BloomFilter::create(BloomFilterAlgorithmPB algorithm, std::unique_ptr<Blo
     return Status::OK();
 }
 
-uint32_t BloomFilter::_optimal_bit_num(uint64_t n, double fpp) {
+uint32_t BloomFilter::optimal_bit_num(uint64_t n, double fpp) {
     // ref parquet bloom_filter branch(BlockSplitBloomFilter.java)
     uint32_t num_bits = -8 * (double)n / log(1 - pow(fpp, 1.0 / 8));
     uint32_t max_bits = MAXIMUM_BYTES << 3;
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter.h b/be/src/olap/rowset/segment_v2/bloom_filter.h
index 4da15f1..33726aa 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter.h
+++ b/be/src/olap/rowset/segment_v2/bloom_filter.h
@@ -59,13 +59,16 @@ public:
 
     // for write
     Status init(uint64_t n, double fpp, HashStrategyPB strategy) {
+        return this->init(optimal_bit_num(n, fpp) / 8, strategy); // bit count / 8 = filter size in bytes, then delegate to the size-based init
+    }
+
+    Status init(uint64_t filter_size, HashStrategyPB strategy) {
         if (strategy == HASH_MURMUR3_X64_64) {
             _hash_func = murmur_hash3_x64_64;
         } else {
             return Status::InvalidArgument(strings::Substitute("invalid strategy:$0", strategy));
         }
-        _num_bytes = _optimal_bit_num(n, fpp) / 8;
-        // make sure _num_bytes is power of 2
+        _num_bytes = filter_size;
         DCHECK((_num_bytes & (_num_bytes - 1)) == 0);
         _size = _num_bytes + 1;
         // reserve last byte for null flag
@@ -78,7 +81,7 @@ public:
 
     // for read
     // use deep copy to acquire the data
-    Status init(char* buf, uint32_t size, HashStrategyPB strategy) {
+    Status init(const char* buf, uint32_t size, HashStrategyPB strategy) {
         DCHECK(size > 1);
         if (strategy == HASH_MURMUR3_X64_64) {
             _hash_func = murmur_hash3_x64_64;
@@ -92,6 +95,7 @@ public:
         memcpy(_data, buf, size);
         _size = size;
         _num_bytes = _size - 1;
+        DCHECK((_num_bytes & (_num_bytes - 1)) == 0);
         _has_null = (bool*)(_data + _num_bytes);
         return Status::OK();
     }
@@ -134,13 +138,20 @@ public:
     virtual void add_hash(uint64_t hash) = 0;
     virtual bool test_hash(uint64_t hash) const = 0;
 
-private:
+    // Merges `other` into this filter by OR-ing the underlying bytes one by one.
+    // The loop covers the whole buffer (`_size` bytes), i.e. it also ORs the trailing
+    // byte that init() reserves for the null flag.
+    // Returns InvalidArgument when `other` is null or its size differs from this filter's:
+    // OR-ing buffers of different sizes would read or write out of bounds, and a DCHECK
+    // alone does not guard against that in release builds.
+    Status merge(const BloomFilter* other) {
+        if (other == nullptr || other->size() != _size) {
+            return Status::InvalidArgument("bloom filter to merge must have the same size");
+        }
+        for (uint32_t i = 0; i < _size; i++) {
+            _data[i] |= other->_data[i];
+        }
+        return Status::OK();
+    }
+
     // Compute the optimal bit number according to the following rule:
     //     m = -n * ln(fpp) / (ln(2) ^ 2)
     // n: expected distinct record number
     // fpp: false positive probability
     // the result will be power of 2
-    uint32_t _optimal_bit_num(uint64_t n, double fpp);
+    static uint32_t optimal_bit_num(uint64_t n, double fpp);
 
 protected:
     // bloom filter data
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 47bea84..8f7bd9e 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -44,6 +44,7 @@ set(RUNTIME_FILES
     result_writer.cpp
     row_batch.cpp
     runtime_state.cpp
+    runtime_filter_mgr.cpp
     string_value.cpp
     thread_resource_mgr.cpp
     #  timestamp_value.cpp
diff --git a/be/src/runtime/decimal_value.h b/be/src/runtime/decimal_value.h
index 54bbb51..1eb6f3a 100644
--- a/be/src/runtime/decimal_value.h
+++ b/be/src/runtime/decimal_value.h
@@ -23,6 +23,7 @@
 #include <cstdlib>
 #include <cstring>
 #include <iostream>
+#include <limits>
 #include <sstream>
 #include <string>
 #include <string_view>
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index da676aa..a7832f8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -39,6 +39,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/plan_fragment_executor.h"
+#include "runtime/runtime_filter_mgr.h"
 #include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
@@ -95,10 +96,17 @@ public:
 
     TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
 
+    TUniqueId query_id() const { return _query_id; } // returns a copy of the stored query id
+
     PlanFragmentExecutor* executor() { return &_executor; }
 
     const DateTimeValue& start_time() const { return _start_time; }
 
+    // Stores a shared reference to the runtime-filter merge entity of this fragment's query.
+    // The member copy keeps the entity alive at least as long as this FragmentExecState.
+    // Taken by const reference: the argument is only copied, never modified.
+    void set_merge_controller_handler(
+            const std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
+        _merge_controller_handler = handler;
+    }
+
     // Update status of this fragment execute
     Status update_status(Status status) {
         std::lock_guard<std::mutex> l(_status_lock);
@@ -156,6 +164,8 @@ private:
 
     // This context is shared by all fragments of this host in a query
     std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
+
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
 };
 
 FragmentExecState::FragmentExecState(const TUniqueId& query_id,
@@ -520,6 +530,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
                                                params.backend_num, _exec_env, fragments_ctx));
     }
 
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
+    _runtimefilter_controller.add_entity(params, &handler);
+    exec_state->set_merge_controller_handler(handler);
+
     RETURN_IF_ERROR(exec_state->prepare(params));
     {
         std::lock_guard<std::mutex> lock(_lock);
@@ -718,4 +732,34 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
     return exec_plan_fragment(exec_fragment_params);
 }
 
+// Routes a published runtime filter to the fragment instance named in the request.
+// Returns InvalidArgument if that instance is not present in _fragment_map; otherwise
+// returns whatever the instance's RuntimeFilterMgr::update_filter reports.
+Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const char* data) {
+    UniqueId fragment_instance_id = request->fragment_id();
+    TUniqueId instance_key = fragment_instance_id.to_thrift();
+
+    std::shared_ptr<FragmentExecState> fragment_state;
+    {
+        // _lock is only held for the map lookup, not while the filter is applied.
+        std::lock_guard<std::mutex> lock(_lock);
+        auto found = _fragment_map.find(instance_key);
+        if (found == _fragment_map.end()) {
+            LOG(WARNING) << "unknown.... fragment-id:" << fragment_instance_id;
+            return Status::InvalidArgument("fragment-id");
+        }
+        fragment_state = found->second;
+    }
+    DCHECK(fragment_state != nullptr);
+
+    RuntimeFilterMgr* runtime_filter_mgr =
+            fragment_state->executor()->runtime_state()->runtime_filter_mgr();
+    return runtime_filter_mgr->update_filter(request, data);
+}
+
+// Forwards a merge-filter RPC to the merge entity registered for the request's query id.
+// Fails with the status from acquire() when no live entity exists for that query.
+Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* attach_data) {
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> entity;
+    RETURN_IF_ERROR(_runtimefilter_controller.acquire(UniqueId(request->query_id()), &entity));
+    return entity->merge(request, attach_data);
+}
+
 } // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 7d16074..a6329cb 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -31,6 +31,7 @@
 #include "gen_cpp/internal_service.pb.h"
 #include "gutil/ref_counted.h"
 #include "http/rest_monitor_iface.h"
+#include "runtime_filter_mgr.h"
 #include "util/countdown_latch.h"
 #include "util/hash_util.hpp"
 #include "util/metrics.h"
@@ -46,6 +47,7 @@ class ThreadPool;
 class TExecPlanFragmentParams;
 class TExecPlanFragmentParamsList;
 class TUniqueId;
+class RuntimeFilterMergeController;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
@@ -80,6 +82,12 @@ public:
                                        const TUniqueId& fragment_instance_id,
                                        std::vector<TScanColumnDesc>* selected_columns);
 
+    RuntimeFilterMergeController& runtimefilter_controller() { return _runtimefilter_controller; } // mutable access to the controller member of this FragmentMgr
+
+    Status apply_filter(const PPublishFilterRequest* request, const char* attach_data);
+
+    Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
+
 private:
     void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb);
 
@@ -100,6 +108,8 @@ private:
 
     std::shared_ptr<MetricEntity> _entity = nullptr;
     UIntGauge* timeout_canceled_fragment_count = nullptr;
+
+    RuntimeFilterMergeController _runtimefilter_controller;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
new file mode 100644
index 0000000..58cec23
--- /dev/null
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -0,0 +1,304 @@
+#include "runtime/runtime_filter_mgr.h"
+
+#include <string>
+
+#include "client_cache.h"
+#include "exprs/runtime_filter.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_state.h"
+#include "service/brpc.h"
+#include "util/brpc_stub_cache.h"
+#include "util/time.h"
+
+namespace doris {
+
+template <class RPCRequest, class RPCResponse>
+struct async_rpc_context { // bundles everything one RPC call needs so it can be kept alive in a container
+    RPCRequest request; // request message passed to the stub
+    RPCResponse response; // response message filled in by the stub
+    brpc::Controller cntl; // per-call controller; also carries the request attachment
+    brpc::CallId cid; // not read or written by the code in this file
+};
+
+// Only the owning RuntimeState is remembered; query_id is currently unused.
+RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state)
+        : _state(state) {}
+
+// Nothing to release explicitly.
+RuntimeFilterMgr::~RuntimeFilterMgr() = default;
+
+// Caches the raw pointer of the owning state's instance memory tracker in _tracker.
+// Always returns OK; a missing tracker is only caught by the DCHECK.
+Status RuntimeFilterMgr::init() {
+    DCHECK(_state->instance_mem_tracker().get() != nullptr);
+    MemTracker* instance_tracker = _state->instance_mem_tracker().get();
+    _tracker = instance_tracker;
+    return Status::OK();
+}
+
+// Looks up `filter_id` in the map matching `role` (CONSUMER -> _consumer_map, any other
+// role -> _producer_map). On success the filter pointer is stored in *target; otherwise a
+// warning is logged and InvalidArgument is returned with *target left untouched.
+Status RuntimeFilterMgr::get_filter_by_role(const int filter_id, const RuntimeFilterRole role,
+                                            IRuntimeFilter** target) {
+    const int32_t key = filter_id;
+    std::map<int32_t, RuntimeFilterMgrVal>& filters =
+            (role == RuntimeFilterRole::CONSUMER) ? _consumer_map : _producer_map;
+
+    auto found = filters.find(key);
+    if (found != filters.end()) {
+        *target = found->second.filter;
+        return Status::OK();
+    }
+
+    LOG(WARNING) << "unknown filter...:" << key << ",role:" << (int)role;
+    return Status::InvalidArgument("unknown filter");
+}
+
+// Convenience wrapper: role-based lookup fixed to the CONSUMER role.
+Status RuntimeFilterMgr::get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter) {
+    const RuntimeFilterRole wanted_role = RuntimeFilterRole::CONSUMER;
+    return get_filter_by_role(filter_id, wanted_role, consumer_filter);
+}
+
+// Convenience wrapper: role-based lookup fixed to the PRODUCER role.
+Status RuntimeFilterMgr::get_producer_filter(const int filter_id,
+                                             IRuntimeFilter** producer_filter) {
+    const RuntimeFilterRole wanted_role = RuntimeFilterRole::PRODUCER;
+    return get_filter_by_role(filter_id, wanted_role, producer_filter);
+}
+
+// Creates the runtime filter described by `desc` and registers it under desc.filter_id in
+// the map for `role`. A consumer must pass a non-negative node_id (debug-checked only).
+// Returns InvalidArgument if the id is already registered for that role, or the error
+// from IRuntimeFilter::create; in both cases the map is left unchanged.
+Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
+                                       int node_id) {
+    DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) ||
+           role != RuntimeFilterRole::CONSUMER);
+    const int32_t key = desc.filter_id;
+    std::map<int32_t, RuntimeFilterMgrVal>& filters =
+            (role == RuntimeFilterRole::CONSUMER) ? _consumer_map : _producer_map;
+
+    if (filters.count(key) != 0) {
+        return Status::InvalidArgument("filter has registed");
+    }
+
+    RuntimeFilterMgrVal entry;
+    entry.role = role;
+    RETURN_IF_ERROR(IRuntimeFilter::create(_state, _tracker, &_pool, &desc, role, node_id,
+                                           &entry.filter));
+    filters.emplace(key, entry);
+    return Status::OK();
+}
+
+// Applies a filter published by a remote node to the local consumer filter with the same
+// id. Returns the lookup error if no such consumer filter is registered.
+Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, const char* data) {
+    IRuntimeFilter* consumer = nullptr;
+    RETURN_IF_ERROR(get_consume_filter(request->filter_id(), &consumer));
+
+    UpdateRuntimeFilterParams update_params;
+    update_params.request = request;
+    update_params.data = data;
+    return consumer->update_filter(&update_params);
+}
+
+// Records the merge address carried by the fragment parameters and marks it available.
+void RuntimeFilterMgr::set_runtime_filter_params(
+        const TRuntimeFilterParams& runtime_filter_params) {
+    _merge_addr = runtime_filter_params.runtime_filter_merge_addr;
+    _has_merge_addr = true;
+}
+
+// Copies the recorded merge address into *addr.
+// Returns InternalError (after tripping the DCHECK in debug builds) if no address was set.
+Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) {
+    DCHECK(_has_merge_addr);
+    if (!_has_merge_addr) {
+        return Status::InternalError("not found merge addr");
+    }
+    *addr = _merge_addr;
+    return Status::OK();
+}
+
+// Builds the merge bookkeeping for one runtime filter and stores it in _filter_map under
+// the decimal string form of its filter id. The whole function runs under _filter_map_mutex.
+Status RuntimeFilterMergeControllerEntity::_init_with_desc(
+        const TRuntimeFilterDesc* runtime_filter_desc,
+        const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
+        const int producer_size) {
+    std::lock_guard<std::mutex> guard(_filter_map_mutex);
+
+    auto entry = std::make_shared<RuntimeFilterCntlVal>();
+    // The caller's descriptor and target list will be released, so keep private copies.
+    // TODO: tracker should add a name
+    entry->producer_size = producer_size;
+    entry->runtime_filter_desc = *runtime_filter_desc;
+    entry->target_info = *target_info;
+    entry->pool = std::make_shared<ObjectPool>();
+    entry->tracker = MemTracker::CreateTracker();
+
+    // The filter object is owned by the entry's pool and described by the entry's own copy
+    // of the descriptor.
+    entry->filter = entry->pool->add(
+            new IRuntimeFilter(nullptr, entry->tracker.get(), entry->pool.get()));
+    entry->filter->init_with_desc(&entry->runtime_filter_desc);
+
+    _filter_map.emplace(std::to_string(runtime_filter_desc->filter_id), entry);
+    return Status::OK();
+}
+
+// Registers one merge entry for every filter listed in
+// runtime_filter_params.rid_to_runtime_filter.
+//
+// query_id: stored in _query_id.
+// runtime_filter_params: must carry, for every filter id, both its target list
+//     (rid_to_target_param) and its producer count (runtime_filter_builder_num).
+// Returns InternalError when either lookup misses, or the first error reported by
+// _init_with_desc (previously that status was silently dropped).
+Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id,
+                                                const TRuntimeFilterParams& runtime_filter_params) {
+    _query_id = query_id;
+    for (const auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
+        const int filter_id = filterid_to_desc.first;
+        const auto target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
+        if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
+            return Status::InternalError("runtime filter params meet error");
+        }
+        const auto build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+        if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
+            return Status::InternalError("runtime filter params meet error");
+        }
+        RETURN_IF_ERROR(_init_with_desc(&filterid_to_desc.second, &target_iter->second,
+                                        build_iter->second));
+    }
+    return Status::OK();
+}
+
+// Merges one producer's partial runtime filter into the accumulated filter registered for
+// request->filter_id(). Once the number of distinct reporting fragments equals the expected
+// producer count, the merged filter is serialized and sent to every target fragment
+// instance through apply_filter RPCs.
+//
+// request: merge request; filter_id, fragment_id and query_id are read from it.
+// data: attachment bytes handed to IRuntimeFilter::create_wrapper together with the request.
+// Returns InvalidArgument for an unknown filter id, or the first error from building,
+// merging or serializing the filter. A missing stub or a failed RPC for one target is
+// logged and does not stop the remaining targets.
+Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
+                                                 const char* data) {
+    std::shared_ptr<RuntimeFilterCntlVal> cntVal;
+    int merged_size = 0;
+    {
+        std::lock_guard<std::mutex> guard(_filter_map_mutex);
+        auto iter = _filter_map.find(std::to_string(request->filter_id()));
+        VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString();
+        if (iter == _filter_map.end()) {
+            LOG(WARNING) << "unknown filter id:" << std::to_string(request->filter_id());
+            return Status::InvalidArgument("unknown filter id");
+        }
+        cntVal = iter->second;
+        MergeRuntimeFilterParams params;
+        params.data = data;
+        params.request = request;
+        RuntimeFilterWrapperHolder holder;
+        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, cntVal->tracker.get(),
+                                                       cntVal->pool.get(), holder.getHandle()));
+        RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get()));
+        // arrive_id is a set, so a fragment that reports twice is only counted once.
+        cntVal->arrive_id.insert(UniqueId(request->fragment_id()).to_string());
+        merged_size = cntVal->arrive_id.size();
+        // TODO: avoid log when we had acquired a lock
+        VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size;
+        DCHECK_LE(merged_size, cntVal->producer_size);
+        if (merged_size < cntVal->producer_size) {
+            return Status::OK();
+        }
+    }
+
+    if (merged_size != cntVal->producer_size) {
+        return Status::OK();
+    }
+
+    // Every producer has reported: serialize the merged filter once and reuse it per target.
+    using PPublishFilterRpcContext =
+            async_rpc_context<PPublishFilterRequest, PPublishFilterResponse>;
+    std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
+    rpc_contexts.reserve(cntVal->target_info.size());
+
+    PPublishFilterRequest apply_request;
+    // Deliberately not named `data`, which would shadow the function parameter.
+    void* serialized_data = nullptr;
+    int serialized_len = 0;
+    RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &serialized_data, &serialized_len));
+
+    butil::IOBuf request_attachment;
+    const bool has_attachment = serialized_data != nullptr && serialized_len > 0;
+    if (has_attachment) {
+        request_attachment.append(serialized_data, serialized_len);
+    }
+
+    // TODO: async send publish rpc
+    for (auto& target : cntVal->target_info) {
+        PBackendService_Stub* stub = ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
+                target.target_fragment_instance_addr);
+        if (stub == nullptr) {
+            // Skip this target before a context is created for it. The previous code
+            // popped the context and kept indexing rpc_contexts by the loop counter,
+            // which ran past the end of the vector for every later target.
+            LOG(WARNING) << "no brpc stub, skip sending filter " << request->filter_id()
+                         << " to:" << target.target_fragment_instance_addr.hostname << ":"
+                         << target.target_fragment_instance_addr.port;
+            continue;
+        }
+
+        rpc_contexts.emplace_back(new PPublishFilterRpcContext);
+        PPublishFilterRpcContext& context = *rpc_contexts.back();
+        context.request = apply_request;
+        context.request.set_filter_id(request->filter_id());
+        *context.request.mutable_query_id() = request->query_id();
+        if (has_attachment) {
+            context.cntl.request_attachment().append(request_attachment);
+        }
+
+        // set fragment-id
+        auto request_fragment_id = context.request.mutable_fragment_id();
+        request_fragment_id->set_hi(target.target_fragment_instance_id.hi);
+        request_fragment_id->set_lo(target.target_fragment_instance_id.lo);
+
+        LOG(INFO) << "send filter " << context.request.filter_id()
+                  << " to:" << target.target_fragment_instance_addr.hostname << ":"
+                  << target.target_fragment_instance_addr.port << " "
+                  << context.request.ShortDebugString();
+        // A null `done` closure makes brpc run the call synchronously, so the
+        // controller state can be inspected right after the call returns.
+        stub->apply_filter(&context.cntl, &context.request, &context.response, nullptr);
+        if (context.cntl.Failed()) {
+            LOG(WARNING) << "send filter " << context.request.filter_id()
+                         << " failed:" << context.cntl.ErrorText();
+        }
+    }
+    /// TODO: use async and join rpc
+    return Status::OK();
+}
+
+// Returns, through *handle, the merge entity shared by all fragments of
+// params.params.query_id, creating and registering it on first use.
+//
+// A newly created entity is initialized from params.params.runtime_filter_params when
+// that field is set; an init error is returned with *handle still pointing at the entity.
+// On an OK return *handle is never null: if the map still holds an expired weak pointer
+// for the query, a fresh entity replaces it instead of handing back an empty pointer.
+// The mapped value is also read through the iterator rather than a second lookup.
+//
+// NOTE(review): *handle is expected to be empty on entry. Overwriting the last reference
+// to an entity here would run its deleter, which calls remove_entity and locks
+// _controller_mutex again while this function holds it.
+// NOTE(review): when an expired record is replaced, the pending deleter of the old entity
+// still erases the record by query id afterwards; closing that window needs a change in
+// remove_entity as well.
+Status RuntimeFilterMergeController::add_entity(
+        const TExecPlanFragmentParams& params,
+        std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
+    runtime_filter_merge_entity_closer entity_closer =
+            std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1);
+
+    std::lock_guard<std::mutex> guard(_controller_mutex);
+    UniqueId query_id(params.params.query_id);
+    std::string query_id_str = query_id.to_string();
+    auto iter = _filter_controller_map.find(query_id_str);
+
+    if (iter != _filter_controller_map.end()) {
+        std::shared_ptr<RuntimeFilterMergeControllerEntity> existing = iter->second.lock();
+        if (existing != nullptr) {
+            *handle = existing;
+            return Status::OK();
+        }
+        // The record is stale (its entity is being destroyed): fall through and replace it.
+    }
+
+    *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
+            new RuntimeFilterMergeControllerEntity(), entity_closer);
+    _filter_controller_map[query_id_str] = *handle;
+    if (params.params.__isset.runtime_filter_params) {
+        RETURN_IF_ERROR((*handle)->init(query_id, params.params.runtime_filter_params));
+    }
+    return Status::OK();
+}
+
+// Hands out a strong reference to the entity registered for `query_id`.
+// Returns InvalidArgument when the id is unknown (*handle untouched) or when the
+// registered entity has already been released (*handle set to null).
+Status RuntimeFilterMergeController::acquire(
+        UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
+    std::lock_guard<std::mutex> guard(_controller_mutex);
+    const std::string query_id_str = query_id.to_string();
+
+    auto found = _filter_controller_map.find(query_id_str);
+    if (found == _filter_controller_map.end()) {
+        LOG(WARNING) << "not found entity, query-id:" << query_id_str;
+        return Status::InvalidArgument("not found entity");
+    }
+
+    *handle = found->second.lock();
+    if (*handle == nullptr) {
+        return Status::InvalidArgument("entity is closed");
+    }
+    return Status::OK();
+}
+
+// Drops the record for `queryId`, if any. Always returns OK.
+Status RuntimeFilterMergeController::remove_entity(UniqueId queryId) {
+    const std::string key = queryId.to_string();
+    std::lock_guard<std::mutex> guard(_controller_mutex);
+    _filter_controller_map.erase(key);
+    return Status::OK();
+}
+
+// Custom deleter installed on the shared_ptr created in add_entity: it runs when the last
+// strong reference goes away, unregisters the entity from the controller, then frees it.
+void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller,
+                                       RuntimeFilterMergeControllerEntity* entity) {
+    const UniqueId closing_query_id = entity->query_id();
+    controller->remove_entity(closing_query_id);
+    delete entity;
+}
+
+} // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
new file mode 100644
index 0000000..cfcbbec
--- /dev/null
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "common/object_pool.h"
+#include "common/status.h"
+#include "exprs/runtime_filter.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+// definition of TRuntimeFilterDesc
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+
+namespace doris {
+class TUniqueId;
+class RuntimeFilter;
+class FragmentExecState;
+class PlanFragmentExecutor;
+class PPublishFilterRequest;
+class PMergeFilterRequest;
+
+/// producer:
+/// Filter filter;
+/// get_filter(filter_id, &filter);
+/// filter->merge(origin_filter)
+
+/// consumer:
+/// get_filter(filter_id, &filter)
+/// filter->wait
+/// if filter->ready().ok(), use filter
+
+// owned by RuntimeState
+// RuntimeFilterMgr will be destroyed when RuntimeState is destroyed
+// Registry of the runtime filters used by one RuntimeState. Consumer and producer filters
+// live in separate maps, both keyed by filter id.
+class RuntimeFilterMgr {
+public:
+    // `state` is kept as a raw pointer; the current constructor does not store query_id.
+    RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state);
+
+    ~RuntimeFilterMgr();
+
+    // Caches the instance memory tracker of the owning state. Always returns OK.
+    Status init();
+
+    // Gets a consumer filter by filter id; InvalidArgument if it is not registered.
+    Status get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter);
+
+    // Gets a producer filter by filter id; InvalidArgument if it is not registered.
+    Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter);
+
+    // Creates and registers the filter described by `desc` for `role`.
+    // A consumer must pass a non-negative node_id. Registering the same filter id twice
+    // for one role returns InvalidArgument.
+    Status regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
+                         int node_id = -1);
+
+    // Updates the local consumer filter named in `request` with remotely published data.
+    Status update_filter(const PPublishFilterRequest* request, const char* data);
+
+    // Records the merge address from the fragment parameters.
+    void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params);
+
+    // Copies the recorded merge address into *addr; InternalError if none was set.
+    Status get_merge_addr(TNetworkAddress* addr);
+
+private:
+    Status get_filter_by_role(const int filter_id, const RuntimeFilterRole role,
+                              IRuntimeFilter** target);
+
+    struct RuntimeFilterMgrVal {
+        RuntimeFilterRole role; // consumer or producer
+        IRuntimeFilter* filter = nullptr;
+    };
+    // RuntimeFilterMgr is owned by RuntimeState, so we only
+    // use filter_id as key
+    // key: "filter-id"
+    /// TODO: does it need to be protected by a mutex?
+    std::map<int32_t, RuntimeFilterMgrVal> _consumer_map;
+    std::map<int32_t, RuntimeFilterMgrVal> _producer_map;
+
+    RuntimeState* _state;
+    // Set by init(); null until then.
+    MemTracker* _tracker = nullptr;
+    ObjectPool _pool;
+
+    TNetworkAddress _merge_addr;
+
+    // Must start out false: get_merge_addr() reads it even when
+    // set_runtime_filter_params() was never called, and the constructor does not set it.
+    bool _has_merge_addr = false;
+};
+
+// controller -> <query-id, entity>
+// RuntimeFilterMergeControllerEntity is the context used by runtimefilter for merging
+// During a query, only the last sink node owns this class, with the end of the query,
+// the class is destroyed with the last fragment_exec.
+class RuntimeFilterMergeControllerEntity {
+public:
+    RuntimeFilterMergeControllerEntity() : _query_id(0, 0) {}
+    ~RuntimeFilterMergeControllerEntity() = default;
+
+    // Stores query_id and registers one merge entry per filter in runtime_filter_params.
+    Status init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params);
+
+    // Handles a merge rpc: folds the producer's filter into the accumulated one and
+    // publishes the result once every producer has reported.
+    Status merge(const PMergeFilterRequest* request, const char* data);
+
+    // Read-only accessor, so it is const-qualified.
+    UniqueId query_id() const { return _query_id; }
+
+private:
+    Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
+                           const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
+                           const int producer_size);
+
+    // Merge state of a single runtime filter.
+    struct RuntimeFilterCntlVal {
+        // Not assigned anywhere in runtime_filter_mgr.cpp.
+        int64_t create_time = 0;
+        // Number of producers expected to report before the filter is published.
+        int producer_size = 0;
+        TRuntimeFilterDesc runtime_filter_desc;
+        // Fragment instances the merged filter is sent to.
+        std::vector<doris::TRuntimeFilterTargetParams> target_info;
+        // Accumulated filter; allocated from `pool`.
+        IRuntimeFilter* filter = nullptr;
+        // String form of the fragment id of every producer that has reported so far.
+        std::unordered_set<std::string> arrive_id;
+        std::shared_ptr<MemTracker> tracker;
+        std::shared_ptr<ObjectPool> pool;
+    };
+    UniqueId _query_id;
+    // protect _filter_map
+    std::mutex _filter_map_mutex;
+    // TODO: convert filter id to i32
+    // filter-id -> val
+    std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
+};
+
+// RuntimeFilterMergeController has a map query-id -> entity
+class RuntimeFilterMergeController {
+public:
+    RuntimeFilterMergeController() = default;
+    ~RuntimeFilterMergeController() = default;
+
+    // Thread safe (serialized by _controller_mutex).
+    // Registers a query-id -> entity record for the query in `params`.
+    // If a record for that query id already exists,
+    // add_entity returns the existing entity through *handle instead.
+    Status add_entity(const TExecPlanFragmentParams& params,
+                      std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle);
+    // Thread safe (serialized by _controller_mutex).
+    // Takes an additional strong reference to the entity of `query_id`.
+    // If the query id is unknown or its entity is already released,
+    // a non-OK status is returned and *handle holds no usable entity.
+    Status acquire(UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle);
+
+    // Thread safe (serialized by _controller_mutex).
+    // Removes the record of an entity by query id.
+    // Called automatically from the entity's shared_ptr deleter when the entity is destroyed.
+    Status remove_entity(UniqueId queryId);
+
+private:
+    std::mutex _controller_mutex;
+    // Only weak pointers are stored here, so the map never keeps an entity alive.
+    // When the last external owner releases the entity, its record must be cleared.
+    using FilterControllerMap =
+            std::map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
+    // str(query-id) -> entity
+    FilterControllerMap _filter_controller_map;
+};
+
+using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMergeControllerEntity*)>;
+
+void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller,
+                                       RuntimeFilterMergeControllerEntity* entity);
+
+} // namespace doris
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 5237646..441046e 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -35,6 +35,7 @@
 #include "runtime/initial_reservations.h"
 #include "runtime/load_path_mgr.h"
 #include "runtime/mem_tracker.h"
+#include "runtime/runtime_filter_mgr.h"
 #include "util/cpu_info.h"
 #include "util/disk_info.h"
 #include "util/file_utils.h"
@@ -53,6 +54,7 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id,
         : _fragment_mem_tracker(nullptr),
           _profile("Fragment " + print_id(fragment_instance_id)),
           _obj_pool(new ObjectPool()),
+          _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
           _is_cancelled(false),
@@ -78,6 +80,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
         : _fragment_mem_tracker(nullptr),
           _profile("Fragment " + print_id(fragment_exec_params.fragment_instance_id)),
           _obj_pool(new ObjectPool()),
+          _runtime_filter_mgr(new RuntimeFilterMgr(fragment_exec_params.query_id, this)),
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
           _query_id(fragment_exec_params.query_id),
@@ -93,6 +96,9 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
           _error_log_file_path(""),
           _error_log_file(nullptr),
           _instance_buffer_reservation(new ReservationTracker) {
+    if (fragment_exec_params.__isset.runtime_filter_params) {
+        _runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
+    }
     Status status =
             init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env);
     DCHECK(status.ok());
@@ -216,8 +222,8 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
     _query_mem_tracker =
             MemTracker::CreateTracker(bytes_limit, "RuntimeState:query:" + print_id(query_id),
                                       _exec_env->process_mem_tracker(), true, false);
-    _instance_mem_tracker = MemTracker::CreateTracker(
-            &_profile, -1, "RuntimeState:instance:", _query_mem_tracker);
+    _instance_mem_tracker =
+            MemTracker::CreateTracker(&_profile, -1, "RuntimeState:instance:", _query_mem_tracker);
 
     /*
     // TODO: this is a stopgap until we implement ExprContext
@@ -241,6 +247,8 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
                                                        std::numeric_limits<int64_t>::max());
     }
 
+    // the filter manager depends on _instance_mem_tracker
+    _runtime_filter_mgr->init();
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f62e63f..2152dfd 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -56,6 +56,7 @@ class LoadErrorHub;
 class ReservationTracker;
 class InitialReservations;
 class RowDescriptor;
+class RuntimeFilterMgr;
 
 // A collection of items that are part of the global state of a
 // query and shared across all execution nodes of that query.
@@ -341,6 +342,10 @@ public:
 
     bool enable_spill() const { return _query_options.enable_spilling; }
 
+    int32_t runtime_filter_wait_time_ms() { return _query_options.runtime_filter_wait_time_ms; }
+
+    int32_t runtime_filter_max_in_num() { return _query_options.runtime_filter_max_in_num; }
+
     bool enable_exchange_node_parallel_merge() const {
         return _query_options.enable_enable_exchange_node_parallel_merge;
     }
@@ -363,6 +368,8 @@ public:
     // if load mem limit is not set, or is zero, using query mem limit instead.
     int64_t get_load_mem_limit();
 
+    RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
+
 private:
     // Use a custom block manager for the query for testing purposes.
     void set_block_mgr2(const boost::shared_ptr<BufferedBlockMgr2>& block_mgr) {
@@ -393,6 +400,9 @@ private:
     DescriptorTbl* _desc_tbl;
     std::shared_ptr<ObjectPool> _obj_pool;
 
+    // runtime filter
+    std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
+
     // Protects _data_stream_recvrs_pool
     std::mutex _data_stream_recvrs_lock;
 
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 8023751..1141d7d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -105,8 +105,8 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
-            auto st = _exec_env->load_channel_mgr()->add_batch(
-                    *request, response->mutable_tablet_vec());
+            auto st = _exec_env->load_channel_mgr()->add_batch(*request,
+                                                               response->mutable_tablet_vec());
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
                              << ", id=" << request->id() << ", index_id=" << request->index_id()
@@ -177,7 +177,8 @@ void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_b
                                          const PFetchDataRequest* request, PFetchDataResult* result,
                                          google::protobuf::Closure* done) {
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-    bool resp_in_attachment = request->has_resp_in_attachment() ? request->resp_in_attachment() : true;
+    bool resp_in_attachment =
+            request->has_resp_in_attachment() ? request->resp_in_attachment() : true;
     GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, resp_in_attachment, result, done);
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
 }
@@ -196,8 +197,9 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
         if (!kafka_request.offset_times().empty()) {
             // if offset_times() has elements, which means this request is to get offset by timestamp.
             std::vector<PIntegerPair> partition_offsets;
-            Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
-                    request->kafka_meta_request(), &partition_offsets);
+            Status st =
+                    _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
+                            request->kafka_meta_request(), &partition_offsets);
             if (st.ok()) {
                 PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
                 for (const auto& entry : partition_offsets) {
@@ -222,7 +224,7 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
             st.to_protobuf(response->mutable_status());
             return;
         }
-    } 
+    }
     Status::OK().to_protobuf(response->mutable_status());
 }
 
@@ -253,6 +255,36 @@ void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* contr
     _exec_env->result_cache()->clear(request, response);
 }
 
+template <typename T>
+void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* controller,
+                                           const ::doris::PMergeFilterRequest* request,
+                                           ::doris::PMergeFilterResponse* response,
+                                           ::google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done); // runs done->Run() when this scope exits
+    const auto& buf = static_cast<brpc::Controller*>(controller)->request_attachment();
+    Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data());
+    if (!st.ok()) {
+        LOG(WARNING) << "merge meet error: " << st.to_string();
+    }
+    st.to_protobuf(response->mutable_status()); // the merge result is returned to the caller
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* controller,
+                                           const ::doris::PPublishFilterRequest* request,
+                                           ::doris::PPublishFilterResponse* response,
+                                           ::google::protobuf::Closure* done) {
+    // RPC handler: hands the published filter to the fragment manager, replies with its Status.
+    brpc::ClosureGuard closure_guard(done);
+    const auto& attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+    // TODO: avoid copying the attachment into a temporary std::string
+    LOG(INFO) << "rpc apply_filter recv";
+    Status st = _exec_env->fragment_mgr()->apply_filter(request, attachment.to_string().data());
+    if (!st.ok()) {
+        LOG(WARNING) << "apply filter meet error: " << st.to_string();
+    }
+    st.to_protobuf(response->mutable_status());
+}
 template class PInternalServiceImpl<PBackendService>;
 template class PInternalServiceImpl<palo::PInternalService>;
 
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 1d794dc..247f985 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -83,6 +83,15 @@ public:
     void clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request,
                      PCacheResponse* response, google::protobuf::Closure* done) override;
 
+    void merge_filter(::google::protobuf::RpcController* controller,
+                      const ::doris::PMergeFilterRequest* request,
+                      ::doris::PMergeFilterResponse* response,
+                      ::google::protobuf::Closure* done) override;
+    void apply_filter(::google::protobuf::RpcController* controller,
+                      const ::doris::PPublishFilterRequest* request,
+                      ::doris::PPublishFilterResponse* response,
+                      ::google::protobuf::Closure* done) override;
+
 private:
     Status _exec_plan_fragment(const std::string& s_request);
 
diff --git a/be/test/exec/hash_table_test.cpp b/be/test/exec/hash_table_test.cpp
index b2c1f72..fae151b 100644
--- a/be/test/exec/hash_table_test.cpp
+++ b/be/test/exec/hash_table_test.cpp
@@ -37,10 +37,10 @@
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
 #include "runtime/test_env.h"
+#include "test_util/test_util.h"
 #include "util/cpu_info.h"
 #include "util/runtime_profile.h"
 #include "util/time.h"
-#include "test_util/test_util.h"
 
 namespace doris {
 
@@ -383,6 +383,12 @@ TEST_F(HashTableTest, GrowTableTest2) {
     }
 
     LOG(INFO) << time(NULL);
+
+    size_t counter = 0;
+    auto func = [&](TupleRow* row) { counter++; };
+    hash_table.for_each_row(func);
+    ASSERT_EQ(counter, hash_table.size());
+
     hash_table.close();
 }
 
diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt
index 8c393aa..b24cd17 100644
--- a/be/test/exprs/CMakeLists.txt
+++ b/be/test/exprs/CMakeLists.txt
@@ -35,4 +35,5 @@ ADD_BE_TEST(encryption_functions_test)
 #ADD_BE_TEST(in-predicate-test)
 ADD_BE_TEST(math_functions_test)
 ADD_BE_TEST(topn_function_test)
-
+ADD_BE_TEST(runtime_filter_test)
+ADD_BE_TEST(bloom_filter_predicate_test)
diff --git a/be/test/exprs/bloom_filter_predicate_test.cpp b/be/test/exprs/bloom_filter_predicate_test.cpp
new file mode 100644
index 0000000..ea54a90
--- /dev/null
+++ b/be/test/exprs/bloom_filter_predicate_test.cpp
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "exprs/bloomfilter_predicate.h"
+#include "gtest/gtest.h"
+#include "runtime/string_value.h"
+
+namespace doris {
+class BloomFilterPredicateTest : public testing::Test {
+public:
+    BloomFilterPredicateTest() = default;
+    void SetUp() override {}    // no per-test setup is needed
+    void TearDown() override {} // no per-test cleanup is needed
+};
+
+TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) {
+    auto tracker = MemTracker::CreateTracker();
+    std::unique_ptr<BloomFilterFuncBase> bloom(
+            BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_INT));
+    ASSERT_TRUE(bloom->init().ok());
+    const int value_count = 1024;
+    int values[value_count];
+    for (int idx = 0; idx < value_count; ++idx) {
+        values[idx] = idx;
+        bloom->insert(static_cast<const void*>(&values[idx]));
+    }
+    for (const int& inserted : values) {
+        ASSERT_TRUE(bloom->find(static_cast<const void*>(&inserted)));
+    }
+    // a value that was never inserted is expected to be reported as absent
+    int absent_value = 0x3355ff;
+    ASSERT_FALSE(bloom->find(static_cast<const void*>(&absent_value)));
+}
+
+TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) {
+    auto tracker = MemTracker::CreateTracker();
+    std::unique_ptr<BloomFilterFuncBase> bloom(
+            BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_VARCHAR));
+    ASSERT_TRUE(bloom->init().ok());
+    ObjectPool string_pool;
+    const int value_count = 1024;
+    StringValue values[value_count];
+    for (int idx = 0; idx < value_count; ++idx) {
+        auto text = string_pool.add(new std::string(std::to_string(idx)));
+        values[idx] = StringValue(*text);
+        bloom->insert(static_cast<const void*>(&values[idx]));
+    }
+    for (const StringValue& inserted : values) {
+        ASSERT_TRUE(bloom->find(static_cast<const void*>(&inserted)));
+    }
+    // a string that was never inserted is expected to be reported as absent
+    std::string absent_text = "0x3355ff";
+    StringValue absent_value(absent_text);
+    ASSERT_FALSE(bloom->find(static_cast<const void*>(&absent_value)));
+
+    // CHAR filter: insert plain strings, then probe with zero-padded values of length 10
+    bloom.reset(BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_CHAR));
+    ASSERT_TRUE(bloom->init().ok());
+
+    auto true_text = string_pool.add(new std::string("true"));
+    StringValue inserted_true(*true_text);
+    bloom->insert(static_cast<const void*>(&inserted_true));
+
+    auto false_text = string_pool.add(new std::string("false"));
+    StringValue inserted_false(*false_text);
+    bloom->insert(static_cast<const void*>(&inserted_false));
+
+    char padded_true_buf[100] = "true";
+    memset(padded_true_buf + strlen(padded_true_buf), 0, 100 - strlen(padded_true_buf));
+    StringValue padded_true;
+    padded_true.ptr = padded_true_buf;
+    padded_true.len = 10;
+
+    char padded_false_buf[100] = "false";
+    memset(padded_false_buf + strlen(padded_false_buf), 0, 100 - strlen(padded_false_buf));
+    StringValue padded_false;
+    padded_false.ptr = padded_false_buf;
+    padded_false.len = 10;
+
+    ASSERT_TRUE(bloom->find_olap_engine(static_cast<const void*>(&padded_true)));
+    ASSERT_TRUE(bloom->find_olap_engine(static_cast<const void*>(&padded_false)));
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv); // gtest parses its own command-line flags here
+    return RUN_ALL_TESTS();                 // the gtest result becomes the process exit code
+}
\ No newline at end of file
diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp
new file mode 100644
index 0000000..3cd858e
--- /dev/null
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/runtime_filter.h"
+
+#include <array>
+#include <memory>
+
+#include "exprs/expr_context.h"
+#include "exprs/slot_ref.h"
+#include "gen_cpp/Planner_types.h"
+#include "gen_cpp/Types_types.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_state.h"
+
+namespace doris {
+TTypeDesc create_type_desc(PrimitiveType type);
+
+class RuntimeFilterTest : public testing::Test {
+public:
+    RuntimeFilterTest() {}
+    void SetUp() override {
+        ExecEnv* exec_env = ExecEnv::GetInstance();
+        exec_env = nullptr; // the RuntimeState below is built with a null ExecEnv
+        _runtime_stat.reset(
+                new RuntimeState(_fragment_id, _query_options, _query_globals, exec_env));
+        _runtime_stat->init_instance_mem_tracker();
+    }
+    void TearDown() override { _obj_pool.clear(); }
+
+protected:
+    // Fixture state: protected (not private) because TEST_F bodies are derived classes.
+    ObjectPool _obj_pool;
+    TUniqueId _fragment_id;
+    TQueryOptions _query_options;
+    TQueryGlobals _query_globals;
+
+    std::unique_ptr<RuntimeState> _runtime_stat;
+};
+
+TEST_F(RuntimeFilterTest, runtime_filter_basic_test) {
+    TRuntimeFilterDesc desc;
+    desc.__set_filter_id(0);
+    desc.__set_expr_order(0);
+    desc.__set_has_local_targets(true);
+    desc.__set_has_remote_targets(false);
+    desc.__set_is_broadcast_join(true);
+    desc.__set_type(TRuntimeFilterType::BLOOM);
+    desc.__set_bloom_filter_size_bytes(4096);
+
+    // source (build-side) expression of the descriptor: a single INT slot reference
+
+    {
+        TExpr build_expr;
+        TExprNode expr_node;
+        expr_node.__set_node_type(TExprNodeType::SLOT_REF);
+        expr_node.__set_type(create_type_desc(TYPE_INT));
+        expr_node.__set_num_children(0);
+        expr_node.__isset.slot_ref = true;
+        TSlotRef slot_ref;
+        slot_ref.__set_slot_id(0);
+        slot_ref.__set_tuple_id(0);
+        expr_node.__set_slot_ref(slot_ref);
+        expr_node.__isset.output_column = true;
+        expr_node.__set_output_column(0);
+        build_expr.nodes.push_back(expr_node);
+        desc.__set_src_expr(build_expr);
+    }
+    // target (probe-side) expression, registered for plan node id 0
+    {
+        TExpr target_expr;
+        TExprNode expr_node;
+        expr_node.__set_node_type(TExprNodeType::SLOT_REF);
+        expr_node.__set_type(create_type_desc(TYPE_INT));
+        expr_node.__set_num_children(0);
+        expr_node.__isset.slot_ref = true;
+        TSlotRef slot_ref;
+        slot_ref.__set_slot_id(0);
+        slot_ref.__set_tuple_id(0);
+        expr_node.__set_slot_ref(slot_ref);
+        expr_node.__isset.output_column = true;
+        expr_node.__set_output_column(0);
+        target_expr.nodes.push_back(expr_node);
+        std::map<int, TExpr> planid_to_target_expr = {{0, target_expr}};
+        desc.__set_planId_to_target_expr(planid_to_target_expr);
+    }
+
+    // one INT SlotRef is shared by the probe-side and the build-side expression context
+    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+    IRuntimeFilter* runtime_filter = nullptr;
+
+    Status status = IRuntimeFilter::create(_runtime_stat.get(),
+                                           _runtime_stat->instance_mem_tracker().get(), &_obj_pool,
+                                           &desc, RuntimeFilterRole::PRODUCER, -1, &runtime_filter);
+
+    ASSERT_TRUE(status.ok());
+
+    // rows to insert: each call of the generator yields a row holding the next counter value
+    std::array<TupleRow, 1024> tuple_rows;
+    int generator_index = 0;
+    auto generator = [&]() {
+        std::array<int, 2>* data = _obj_pool.add(new std::array<int, 2>());
+        data->at(0) = data->at(1) = generator_index++;
+        TupleRow row;
+        row._tuples[0] = (Tuple*)data->data();
+        return row;
+    };
+    std::generate(tuple_rows.begin(), tuple_rows.end(), generator);
+
+    std::array<TupleRow, 1024> not_exist_data;
+    // the counter keeps running, so these rows hold values that are never inserted
+    std::generate(not_exist_data.begin(), not_exist_data.end(), generator);
+
+    // fill the runtime filter with the build-side values
+    for (TupleRow& row : tuple_rows) {
+        void* val = build_expr_ctx->get_value(&row);
+        runtime_filter->insert(val);
+    }
+    // ask the filter for the predicate contexts it would push down to the probe side
+
+    std::list<ExprContext*> expr_context_list;
+    ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, prob_expr_ctx).ok());
+    ASSERT_TRUE(!expr_context_list.empty());
+
+    // every inserted row has to pass every pushed-down predicate
+    for (TupleRow& row : tuple_rows) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+    // rows that were never inserted are expected to be rejected
+    for (TupleRow& row : not_exist_data) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+        }
+    }
+
+    // test null
+    for (ExprContext* ctx : expr_context_list) {
+        // Probe with a row whose only tuple pointer is null;
+        // the pushed-down predicate is expected to evaluate to false for it.
+        TupleRow row;
+        row._tuples[0] = nullptr;
+        ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+    }
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv); // gtest parses its own command-line flags here
+    return RUN_ALL_TESTS();                 // the gtest result becomes the process exit code
+}
\ No newline at end of file
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 5256cb3..442314c 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -31,6 +31,7 @@ ADD_BE_TEST(run_length_integer_test)
 ADD_BE_TEST(stream_index_test)
 ADD_BE_TEST(lru_cache_test)
 ADD_BE_TEST(bloom_filter_test)
+ADD_BE_TEST(bloom_filter_column_predicate_test)
 ADD_BE_TEST(bloom_filter_index_test)
 ADD_BE_TEST(comparison_predicate_test)
 ADD_BE_TEST(in_list_predicate_test)
diff --git a/be/test/olap/bloom_filter_column_predicate_test.cpp b/be/test/olap/bloom_filter_column_predicate_test.cpp
new file mode 100644
index 0000000..feafafe
--- /dev/null
+++ b/be/test/olap/bloom_filter_column_predicate_test.cpp
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <google/protobuf/stubs/common.h>
+#include <gtest/gtest.h>
+#include <time.h>
+
+#include "olap/bloom_filter_predicate.h"
+#include "olap/column_predicate.h"
+#include "olap/field.h"
+#include "olap/row_block2.h"
+#include "runtime/mem_pool.h"
+#include "runtime/string_value.hpp"
+#include "runtime/vectorized_row_batch.h"
+#include "util/logging.h"
+
+namespace doris {
+
+// Fixture with helpers to build a one-column TabletSchema, a VectorizedRowBatch and a RowBlockV2.
+class TestBloomFilterColumnPredicate : public testing::Test {
+public:
+    TestBloomFilterColumnPredicate() : _vectorized_batch(nullptr), _row_block(nullptr) {
+        _mem_tracker.reset(new MemTracker(-1));
+        _mem_pool.reset(new MemPool(_mem_tracker.get()));
+    }
+
+    ~TestBloomFilterColumnPredicate() {
+        delete _vectorized_batch; // deleting a null pointer is a no-op, no guard needed
+    }
+
+    void SetTabletSchema(const std::string& name, const std::string& type,
+                         const std::string& aggregation, uint32_t length, bool is_allow_null,
+                         bool is_key, TabletSchema* tablet_schema) {
+        TabletSchemaPB tablet_schema_pb;
+        static int id = 0; // shared across calls so every column gets a fresh unique id
+        ColumnPB* column = tablet_schema_pb.add_column();
+        column->set_unique_id(++id);
+        column->set_name(name);
+        column->set_type(type);
+        column->set_is_key(is_key);
+        column->set_is_nullable(is_allow_null);
+        column->set_length(length);
+        column->set_aggregation(aggregation);
+        column->set_precision(1000);
+        column->set_frac(1000);
+        column->set_is_bf_column(false);
+        // initialise the caller's schema from the single-column description built above
+        tablet_schema->init_from_pb(tablet_schema_pb);
+    }
+
+    void InitVectorizedBatch(const TabletSchema* tablet_schema, const std::vector<uint32_t>& ids,
+                             int size) {
+        delete _vectorized_batch; // do not leak a batch created by an earlier call
+        _vectorized_batch = new VectorizedRowBatch(tablet_schema, ids, size);
+        _vectorized_batch->set_size(size);
+    }
+
+    void init_row_block(const TabletSchema* tablet_schema, int size) {
+        Schema schema(*tablet_schema);
+        _row_block.reset(new RowBlockV2(schema, size));
+    }
+
+    std::shared_ptr<MemTracker> _mem_tracker;
+    std::unique_ptr<MemPool> _mem_pool;
+    VectorizedRowBatch* _vectorized_batch; // owned; released in the destructor
+    std::unique_ptr<RowBlockV2> _row_block;
+};
+
+TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) {
+    TabletSchema tablet_schema;
+    SetTabletSchema(std::string("FLOAT_COLUMN"), "FLOAT", "REPLACE", 1, true, true, &tablet_schema);
+    const int size = 10;
+    std::vector<uint32_t> return_columns;
+    for (int i = 0; i < tablet_schema.num_columns(); ++i) {
+        return_columns.push_back(i);
+    }
+
+    // Build a bloom filter that contains exactly the values 4.1, 5.1 and 6.1.
+    std::shared_ptr<BloomFilterFuncBase> bloom_filter =
+            std::make_shared<BloomFilterFunc<float>>(_mem_tracker.get());
+    ASSERT_TRUE(bloom_filter->init().ok());
+    float value = 4.1;
+    bloom_filter->insert(reinterpret_cast<void*>(&value));
+    value = 5.1;
+    bloom_filter->insert(reinterpret_cast<void*>(&value));
+    value = 6.1;
+    bloom_filter->insert(reinterpret_cast<void*>(&value));
+    std::unique_ptr<ColumnPredicate> pred(new BloomFilterColumnPredicate(0, bloom_filter));
+
+    // VectorizedRowBatch without nulls: the batch is expected to come back unfiltered
+    InitVectorizedBatch(&tablet_schema, return_columns, size);
+    ColumnVector* col_vector = _vectorized_batch->column(0);
+    col_vector->set_no_nulls(true);
+    auto* col_data = reinterpret_cast<float*>(_mem_pool->allocate(size * sizeof(float)));
+    col_vector->set_col_data(col_data);
+    for (int i = 0; i < size; ++i) {
+        *(col_data + i) = i + 0.1f;
+    }
+    pred->evaluate(_vectorized_batch);
+    ASSERT_EQ(_vectorized_batch->size(), 10);
+    uint16_t* sel = _vectorized_batch->selected();
+    ASSERT_FLOAT_EQ(*(col_data + sel[0]), 0.1);
+    ASSERT_FLOAT_EQ(*(col_data + sel[1]), 1.1);
+    ASSERT_FLOAT_EQ(*(col_data + sel[2]), 2.1);
+
+    // ColumnBlock without nulls: only the rows holding 4.1, 5.1 and 6.1 are expected to survive
+    init_row_block(&tablet_schema, size);
+    ColumnBlock col_block = _row_block->column_block(0);
+    auto select_size = _row_block->selected_size();
+    ColumnBlockView col_block_view(&col_block);
+    for (int i = 0; i < size; ++i, col_block_view.advance(1)) {
+        col_block_view.set_null_bits(1, false);
+        *reinterpret_cast<float*>(col_block_view.data()) = i + 0.1f;
+    }
+    pred->evaluate(&col_block, _row_block->selection_vector(), &select_size);
+    ASSERT_EQ(select_size, 3);
+    ASSERT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[0]).cell_ptr(), 4.1);
+    ASSERT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[1]).cell_ptr(), 5.1);
+    ASSERT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[2]).cell_ptr(), 6.1);
+
+    // VectorizedRowBatch with every even row null: again expected to come back unfiltered
+    col_vector->set_no_nulls(false);
+    bool* is_null = reinterpret_cast<bool*>(_mem_pool->allocate(size));
+    memset(is_null, 0, size);
+    col_vector->set_is_null(is_null);
+    for (int i = 0; i < size; ++i) {
+        if (i % 2 == 0) {
+            is_null[i] = true;
+        } else {
+            *(col_data + i) = i + 0.1;
+        }
+    }
+    _vectorized_batch->set_size(size);
+    _vectorized_batch->set_selected_in_use(false);
+    pred->evaluate(_vectorized_batch);
+    ASSERT_EQ(_vectorized_batch->size(), 10);
+    sel = _vectorized_batch->selected();
+    ASSERT_FLOAT_EQ(*(col_data + sel[0]), 0.1);
+    ASSERT_FLOAT_EQ(*(col_data + sel[1]), 1.1);
+    ASSERT_FLOAT_EQ(*(col_data + sel[2]), 2.1);
+
+    // ColumnBlock with every even row null: only the row holding 5.1 is expected to survive
+    col_block_view = ColumnBlockView(&col_block);
+    for (int i = 0; i < size; ++i, col_block_view.advance(1)) {
+        if (i % 2 == 0) {
+            col_block_view.set_null_bits(1, true);
+        } else {
+            col_block_view.set_null_bits(1, false);
+            *reinterpret_cast<float*>(col_block_view.data()) = i + 0.1;
+        }
+    }
+    _row_block->clear();
+    select_size = _row_block->selected_size();
+    pred->evaluate(&col_block, _row_block->selection_vector(), &select_size);
+    ASSERT_EQ(select_size, 1);
+    ASSERT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[0]).cell_ptr(), 5.1);
+    // pred is owned by a unique_ptr, so it is also released when an ASSERT_* above
+    // makes the test body return early.
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    const char* doris_home = getenv("DORIS_HOME"); // nullptr if unset; std::string(nullptr) is UB
+    const std::string conffile = std::string(doris_home ? doris_home : "") + "/conf/be.conf";
+    if (doris_home == nullptr || !doris::config::init(conffile.c_str(), false)) {
+        fprintf(stderr, "error read config file. \n");
+        return -1;
+    }
+    doris::init_glog("be-test");
+    testing::InitGoogleTest(&argc, argv);
+    doris::CpuInfo::init();
+    const int ret = RUN_ALL_TESTS();
+    google::protobuf::ShutdownProtobufLibrary(); // called after the tests have finished
+    return ret;
+}
diff --git a/be/test/olap/in_list_predicate_test.cpp b/be/test/olap/in_list_predicate_test.cpp
index 1f314a9..2d6a38a 100644
--- a/be/test/olap/in_list_predicate_test.cpp
+++ b/be/test/olap/in_list_predicate_test.cpp
@@ -156,7 +156,7 @@ public:
             *(col_data + i) = i;                                                                  \
         }                                                                                         \
                                                                                                   \
-        std::set<TYPE> values;                                                                    \
+        std::unordered_set<TYPE> values;                                                          \
         values.insert(4);                                                                         \
         values.insert(5);                                                                         \
         values.insert(6);                                                                         \
@@ -203,7 +203,7 @@ TEST_IN_LIST_PREDICATE(int128_t, LARGEINT, "LARGEINT")
         int size = 10;                                                                            \
         Schema schema(tablet_schema);                                                             \
         RowBlockV2 block(schema, size);                                                           \
-        std::set<TYPE> values;                                                                    \
+        std::unordered_set<TYPE> values;                                                          \
         values.insert(4);                                                                         \
         values.insert(5);                                                                         \
         values.insert(6);                                                                         \
@@ -268,7 +268,7 @@ TEST_F(TestInListPredicate, FLOAT_COLUMN) {
     for (int i = 0; i < tablet_schema.num_columns(); ++i) {
         return_columns.push_back(i);
     }
-    std::set<float> values;
+    std::unordered_set<float> values;
     values.insert(4.1);
     values.insert(5.1);
     values.insert(6.1);
@@ -352,7 +352,7 @@ TEST_F(TestInListPredicate, DOUBLE_COLUMN) {
     for (int i = 0; i < tablet_schema.num_columns(); ++i) {
         return_columns.push_back(i);
     }
-    std::set<double> values;
+    std::unordered_set<double> values;
     values.insert(4.1);
     values.insert(5.1);
     values.insert(6.1);
@@ -437,7 +437,7 @@ TEST_F(TestInListPredicate, DECIMAL_COLUMN) {
     for (int i = 0; i < tablet_schema.num_columns(); ++i) {
         return_columns.push_back(i);
     }
-    std::set<decimal12_t> values;
+    std::unordered_set<decimal12_t> values;
 
     decimal12_t value1 = {4, 4};
     values.insert(value1);
@@ -530,7 +530,7 @@ TEST_F(TestInListPredicate, CHAR_COLUMN) {
     for (int i = 0; i < tablet_schema.num_columns(); ++i) {
         return_columns.push_back(i);
     }
-    std::set<StringValue> values;
+    std::unordered_set<StringValue> values;
     StringValue value1;
     const char* value1_buffer = "aaaaa";
     value1.ptr = const_cast<char*>(value1_buffer);
@@ -658,7 +658,7 @@ TEST_F(TestInListPredicate, VARCHAR_COLUMN) {
     for (int i = 0; i < tablet_schema.num_columns(); ++i) {
         return_columns.push_back(i);
     }
-    std::set<StringValue> values;
+    std::unordered_set<StringValue> values;
     StringValue value1;
     const char* value1_buffer = "a";
     value1.ptr = const_cast<char*>(value1_buffer);
@@ -783,7 +783,7 @@ TEST_F(TestInListPredicate, DATE_COLUMN) {
     for (int i = 0; i < tablet_schema.num_columns(); ++i) {
         return_columns.push_back(i);
     }
-    std::set<uint24_t> values;
+    std::unordered_set<uint24_t> values;
     uint24_t value1 = datetime::timestamp_from_date("2017-09-09");
     values.insert(value1);
 
@@ -892,7 +892,7 @@ TEST_F(TestInListPredicate, DATETIME_COLUMN) {
     for (int i = 0; i < tablet_schema.num_columns(); ++i) {
         return_columns.push_back(i);
     }
-    std::set<uint64_t> values;
+    std::unordered_set<uint64_t> values;
     uint64_t value1 = datetime::timestamp_from_datetime("2017-09-09 00:00:01");
     values.insert(value1);
 
diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp
index cbb3ac8..cb4be94 100644
--- a/be/test/olap/rowset/segment_v2/segment_test.cpp
+++ b/be/test/olap/rowset/segment_v2/segment_test.cpp
@@ -40,8 +40,8 @@
 #include "olap/types.h"
 #include "runtime/mem_pool.h"
 #include "runtime/mem_tracker.h"
-#include "util/file_utils.h"
 #include "test_util/test_util.h"
+#include "util/file_utils.h"
 
 namespace doris {
 namespace segment_v2 {
@@ -1087,7 +1087,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
         // test where v1 in (10,20,1)
         {
             std::vector<ColumnPredicate*> column_predicates;
-            std::set<int32_t> values;
+            std::unordered_set<int32_t> values;
             values.insert(10);
             values.insert(20);
             values.insert(1);
@@ -1111,7 +1111,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
         // test where v1 not in (10,20)
         {
             std::vector<ColumnPredicate*> column_predicates;
-            std::set<int32_t> values;
+            std::unordered_set<int32_t> values;
             values.insert(10);
             values.insert(20);
             std::unique_ptr<ColumnPredicate> predicate(
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 6aaf40d..95b1be4 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -256,6 +256,75 @@ message PProxyResult {
 	optional PKafkaPartitionOffsets partition_offsets = 3;
 };
 
+message PBloomFilter {
+     required bool always_true = 2;
+     required int32 filter_length = 1;
+};
+
+message PColumnValue {
+    optional bool boolVal = 1;
+    optional int32 intVal = 2;
+    optional int64 longVal = 3;
+    optional double doubleVal = 4;
+    optional bytes stringVal = 5;
+}
+
+// TODO: CHECK ALL TYPE
+enum PColumnType {
+    COLUMN_TYPE_BOOL = 0;
+    COLUMN_TYPE_INT = 1;
+    COLUMN_TYPE_TINY_INT = 2;
+    COLUMN_TYPE_SMALL_INT = 3;
+    COLUMN_TYPE_BIGINT = 4;
+    COLUMN_TYPE_LARGEINT = 5;
+    COLUMN_TYPE_VARCHAR = 6;
+    COLUMN_TYPE_CHAR = 7;
+    COLUMN_TYPE_DATE = 8;
+    COLUMN_TYPE_DATETIME = 9;
+    COLUMN_TYPE_DOUBLE = 10;
+    COLUMN_TYPE_FLOAT = 11;
+    COLUMN_TYPE_DECIMAL = 12;
+    COLUMN_TYPE_DECIMALV2 = 13;
+}
+
+message PMinMaxFilter {
+    required PColumnType column_type = 1;
+    required PColumnValue min_val = 2;
+    required PColumnValue max_val = 3;
+};
+
+enum PFilterType {
+    UNKNOW_FILTER = 0;
+    BLOOM_FILTER = 1;
+    MINMAX_FILTER = 2;
+};
+
+message PMergeFilterRequest {
+    required int32 filter_id = 1;
+    required PUniqueId query_id = 2;
+    required PUniqueId fragment_id = 3;
+    required PFilterType filter_type = 4;
+    optional PMinMaxFilter minmax_filter = 5;
+    optional PBloomFilter bloom_filter = 6;
+};
+
+message PMergeFilterResponse {
+    required PStatus status = 1;
+};
+
+message PPublishFilterRequest {
+    required int32 filter_id = 1;
+    required PUniqueId query_id = 2;
+    required PUniqueId fragment_id = 3;
+    required PFilterType filter_type = 4;
+    optional PMinMaxFilter minmax_filter = 5;
+    optional PBloomFilter bloom_filter = 6;
+};
+
+message PPublishFilterResponse {
+    required PStatus status = 1;
+};
+
 // NOTE(zc): If you want to add new method here,
 // you MUST add same method to palo_internal_service.proto
 service PBackendService {
@@ -270,5 +339,7 @@ service PBackendService {
     rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
     rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult);
     rpc clear_cache(PClearCacheRequest) returns (PCacheResponse);
+    rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
+    rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
 };
 
diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto
index c57ca24..592fe58 100644
--- a/gensrc/proto/palo_internal_service.proto
+++ b/gensrc/proto/palo_internal_service.proto
@@ -38,4 +38,6 @@ service PInternalService {
     rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse);
     rpc fetch_cache(doris.PFetchCacheRequest) returns (doris.PFetchCacheResult);
     rpc clear_cache(doris.PClearCacheRequest) returns (doris.PCacheResponse);
+    rpc merge_filter(doris.PMergeFilterRequest) returns (doris.PMergeFilterResponse);
+    rpc apply_filter(doris.PPublishFilterRequest) returns (doris.PPublishFilterResponse);
 };
diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift
index 281e37a..e91573a 100644
--- a/gensrc/thrift/Exprs.thrift
+++ b/gensrc/thrift/Exprs.thrift
@@ -47,6 +47,9 @@ enum TExprNodeType {
   // TODO: old style compute functions. this will be deprecated
   COMPUTE_FUNCTION_CALL,
   LARGE_INT_LITERAL,
+
+  // only used in runtime filter
+  BLOOM_PRED,
 }
 
 //enum TAggregationOp {
diff --git a/gensrc/thrift/Opcodes.thrift b/gensrc/thrift/Opcodes.thrift
index 09db137..d55989f 100644
--- a/gensrc/thrift/Opcodes.thrift
+++ b/gensrc/thrift/Opcodes.thrift
@@ -83,4 +83,5 @@ enum TExprOpcode {
     FACTORIAL,
     LAST_OPCODE,
     EQ_FOR_NULL,
+    RT_FILTER,
 }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index c320a98..888af72 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -140,6 +140,30 @@ struct TQueryOptions {
   31: optional bool enable_spilling = false;
   // whether enable parallel merge in exchange node
   32: optional bool enable_enable_exchange_node_parallel_merge = false;
+
+  // runtime filter run mode
+  33: optional string runtime_filter_mode = "GLOBAL";
+
+  // Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will
+  // be rounded up to the nearest power of two.
+  34: optional i32 runtime_bloom_filter_size = 1048576
+
+  // Minimum runtime bloom filter size, in bytes
+  35: optional i32 runtime_bloom_filter_min_size = 1048576
+
+  // Maximum runtime bloom filter size, in bytes
+  36: optional i32 runtime_bloom_filter_max_size = 16777216
+
+  // Time in ms to wait until runtime filters are delivered.
+  37: optional i32 runtime_filter_wait_time_ms = 1000
+
+  // Maximum number of bloom runtime filters allowed per query
+  38: optional i32 runtime_filters_max_num = 10
+
+  // Runtime filter type used (for testing); corresponds to TRuntimeFilterType.
+  39: optional i32 runtime_filter_type = 1;
+
+  40: optional i32 runtime_filter_max_in_num = 1024;
 }
     
 
@@ -159,6 +183,27 @@ struct TPlanFragmentDestination {
   3: optional Types.TNetworkAddress brpc_server
 }
 
+struct TRuntimeFilterTargetParams {
+  1: required Types.TUniqueId target_fragment_instance_id
+  // The address of the instance where the fragment is expected to run
+  2: required Types.TNetworkAddress target_fragment_instance_addr
+}
+
+struct TRuntimeFilterParams {
+  // Runtime filter merge instance address
+  1: optional Types.TNetworkAddress runtime_filter_merge_addr
+
+  // Runtime filter ID to the instance address of the fragment,
+  // that is expected to use this runtime filter
+  2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param
+
+  // Runtime filter ID to the runtime filter desc
+  3: optional map<i32, PlanNodes.TRuntimeFilterDesc> rid_to_runtime_filter
+
+  // Number of Runtime filter producers
+  4: optional map<i32, i32> runtime_filter_builder_num
+}
+
 // Parameters for a single execution instance of a particular TPlanFragment
 // TODO: for range partitioning, we also need to specify the range boundaries
 struct TPlanFragmentExecParams {
@@ -191,6 +236,8 @@ struct TPlanFragmentExecParams {
   9: optional i32 sender_id
   10: optional i32 num_senders
   11: optional bool send_query_statistics_with_every_batch
+  // Used to merge and send runtime filter
+  12: optional TRuntimeFilterParams runtime_filter_params
 }
 
 // Global query parameters assigned by the coordinator.
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ba1c8c6..320b9e0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -687,6 +687,47 @@ struct TAssertNumRowsNode {
     3: optional TAssertion assertion;
 }
 
+enum TRuntimeFilterType {
+  IN = 1
+  BLOOM = 2
+  MIN_MAX = 4
+}
+
+// Specification of a runtime filter.
+struct TRuntimeFilterDesc {
+  // Filter unique id (within a query)
+  1: required i32 filter_id
+
+  // Expr on which the filter is built on a hash join.
+  2: required Exprs.TExpr src_expr
+
+  // The order of Expr in join predicate
+  3: required i32 expr_order
+
+  // Map of target node id to the target expr
+  4: required map<Types.TPlanNodeId, Exprs.TExpr> planId_to_target_expr
+
+  // Indicates if the source join node of this filter is a broadcast or
+  // a partitioned join.
+  5: required bool is_broadcast_join
+
+  // Indicates if there is at least one target scan node that is in the
+  // same fragment as the broadcast join that produced the runtime filter
+  6: required bool has_local_targets
+
+  // Indicates if there is at least one target scan node that is not in the same
+  // fragment as the broadcast join that produced the runtime filter
+  7: required bool has_remote_targets
+
+  // The type of runtime filter to build.
+  8: required TRuntimeFilterType type
+
+  // The size of the filter based on the ndv estimate and the min/max limit specified in
+  // the query options. Should be greater than zero for bloom filters, zero otherwise.
+  9: optional i64 bloom_filter_size_bytes
+}
+
+
 // This is essentially a union of all messages corresponding to subclasses
 // of PlanNode.
 struct TPlanNode {
@@ -728,6 +769,8 @@ struct TPlanNode {
   33: optional TIntersectNode intersect_node
   34: optional TExceptNode except_node
   35: optional TOdbcScanNode odbc_scan_node
+  // Runtime filters assigned to this plan node, exist in HashJoinNode and ScanNode
+  36: optional list<TRuntimeFilterDesc> runtime_filters
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 34ef5a2..544e29b 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -194,6 +194,21 @@ enum TExplainLevel {
   VERBOSE
 }
 
+enum TRuntimeFilterMode {
+  // No filters are computed in the FE or the BE.
+  OFF = 0
+
+  // Only broadcast filters are computed in the BE, and are only published to the local
+  // fragment.
+  LOCAL = 1
+
+  // Only shuffle filters are computed in the BE, and are only published globally.
+  REMOTE = 2
+
+  // All filters are computed in the BE, and are published globally.
+  GLOBAL = 3
+}
+
 struct TColumnType {
   1: required TPrimitiveType type
   // Only set if type == CHAR_ARRAY

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org