You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/07 09:40:53 UTC

[incubator-doris] branch master updated: [Refactor] Refactor part of RuntimeFilter's code (#6998)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 29ca776  [Refactor] Refactor part of RuntimeFilter's code (#6998)
29ca776 is described below

commit 29ca77622fe492ec767221b7d87b6dfcfff1aa8c
Author: Pxl <95...@qq.com>
AuthorDate: Sun Nov 7 17:40:45 2021 +0800

    [Refactor] Refactor part of RuntimeFilter's code (#6998)
    
    #6997
---
 be/src/exec/hash_join_node.cpp         |   5 +-
 be/src/exec/hash_join_node.h           |   1 +
 be/src/exprs/bloomfilter_predicate.cpp |  22 ++++--
 be/src/exprs/hybrid_set.cpp            |  19 +++--
 be/src/exprs/hybrid_set.h              |  16 ++--
 be/src/exprs/runtime_filter.cpp        | 108 +++++---------------------
 be/src/exprs/runtime_filter.h          |  46 +----------
 be/src/exprs/runtime_filter_slots.h    | 137 +++++++++++++++++++++++++++++++++
 be/src/runtime/primitive_type.h        |   4 +
 9 files changed, 197 insertions(+), 161 deletions(-)

diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 3fc0dfe..b1b9356 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -242,8 +242,7 @@ Status HashJoinNode::open(RuntimeState* state) {
                                                 _runtime_filter_descs);
 
         RETURN_IF_ERROR(thread_status.get_future().get());
-        RETURN_IF_ERROR(runtime_filter_slots.init(state, _pool, expr_mem_tracker().get(),
-                                                  _hash_tbl->size()));
+        RETURN_IF_ERROR(runtime_filter_slots.init(state, _hash_tbl->size()));
         {
             SCOPED_TIMER(_push_compute_timer);
             auto func = [&](TupleRow* row) { runtime_filter_slots.insert(row); };
@@ -252,7 +251,7 @@ Status HashJoinNode::open(RuntimeState* state) {
         COUNTER_UPDATE(_build_timer, _push_compute_timer->value());
         {
             SCOPED_TIMER(_push_down_timer);
-            runtime_filter_slots.publish(this);
+            runtime_filter_slots.publish();
         }
         Status open_status = child(0)->open(state);
         RETURN_IF_ERROR(open_status);
diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h
index 2379f77..f097a2c 100644
--- a/be/src/exec/hash_join_node.h
+++ b/be/src/exec/hash_join_node.h
@@ -25,6 +25,7 @@
 
 #include "exec/exec_node.h"
 #include "exec/hash_table.h"
+#include "exprs/runtime_filter_slots.h"
 #include "gen_cpp/PlanNodes_types.h"
 
 namespace doris {
diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp
index 44906fd..4bc7584 100644
--- a/be/src/exprs/bloomfilter_predicate.cpp
+++ b/be/src/exprs/bloomfilter_predicate.cpp
@@ -40,31 +40,37 @@ IBloomFilterFuncBase* IBloomFilterFuncBase::create_bloom_filter(MemTracker* trac
         return new BloomFilterFunc<TYPE_INT, CurrentBloomFilterAdaptor>(tracker);
     case TYPE_BIGINT:
         return new BloomFilterFunc<TYPE_BIGINT, CurrentBloomFilterAdaptor>(tracker);
+    case TYPE_LARGEINT:
+        return new BloomFilterFunc<TYPE_LARGEINT, CurrentBloomFilterAdaptor>(tracker);
+
     case TYPE_FLOAT:
         return new BloomFilterFunc<TYPE_FLOAT, CurrentBloomFilterAdaptor>(tracker);
     case TYPE_DOUBLE:
         return new BloomFilterFunc<TYPE_DOUBLE, CurrentBloomFilterAdaptor>(tracker);
+
+    case TYPE_DECIMALV2:
+        return new BloomFilterFunc<TYPE_DECIMALV2, CurrentBloomFilterAdaptor>(tracker);
+
+    case TYPE_TIME:
+        return new BloomFilterFunc<TYPE_TIME, CurrentBloomFilterAdaptor>(tracker);
     case TYPE_DATE:
         return new BloomFilterFunc<TYPE_DATE, CurrentBloomFilterAdaptor>(tracker);
     case TYPE_DATETIME:
         return new BloomFilterFunc<TYPE_DATETIME, CurrentBloomFilterAdaptor>(tracker);
-    case TYPE_DECIMALV2:
-        return new BloomFilterFunc<TYPE_DECIMALV2, CurrentBloomFilterAdaptor>(tracker);
-    case TYPE_LARGEINT:
-        return new BloomFilterFunc<TYPE_LARGEINT, CurrentBloomFilterAdaptor>(tracker);
+
     case TYPE_CHAR:
         return new BloomFilterFunc<TYPE_CHAR, CurrentBloomFilterAdaptor>(tracker);
     case TYPE_VARCHAR:
         return new BloomFilterFunc<TYPE_VARCHAR, CurrentBloomFilterAdaptor>(tracker);
     case TYPE_STRING:
-        return new BloomFilterFunc<TYPE_STRING, CurrentBloomFilterAdaptor>(tracker); 
+        return new BloomFilterFunc<TYPE_STRING, CurrentBloomFilterAdaptor>(tracker);
+
     default:
-        return nullptr;
+        DCHECK(false) << "Invalid type.";
     }
 
     return nullptr;
 }
-
 BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node)
         : Predicate(node),
           _is_prepare(false),
@@ -74,7 +80,7 @@ BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node)
 
 BloomFilterPredicate::~BloomFilterPredicate() {
     VLOG_NOTICE << "bloom filter rows:" << _filtered_rows << ",scan_rows:" << _scan_rows
-              << ",rate:" << (double)_filtered_rows / _scan_rows;
+                << ",rate:" << (double)_filtered_rows / _scan_rows;
 }
 
 BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other)
diff --git a/be/src/exprs/hybrid_set.cpp b/be/src/exprs/hybrid_set.cpp
index 4ebdabc..1b3fd37 100644
--- a/be/src/exprs/hybrid_set.cpp
+++ b/be/src/exprs/hybrid_set.cpp
@@ -36,21 +36,22 @@ HybridSetBase* HybridSetBase::create_set(PrimitiveType type) {
     case TYPE_BIGINT:
         return new (std::nothrow) HybridSet<int64_t>();
 
+    case TYPE_LARGEINT:
+        return new (std::nothrow) HybridSet<__int128>();
+
     case TYPE_FLOAT:
         return new (std::nothrow) HybridSet<float>();
 
+    case TYPE_TIME:
     case TYPE_DOUBLE:
         return new (std::nothrow) HybridSet<double>();
 
-    case TYPE_DATE:
-    case TYPE_DATETIME:
-        return new (std::nothrow) HybridSet<DateTimeValue>();
-
     case TYPE_DECIMALV2:
         return new (std::nothrow) HybridSet<DecimalV2Value>();
 
-    case TYPE_LARGEINT:
-        return new (std::nothrow) HybridSet<__int128>();
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+        return new (std::nothrow) HybridSet<DateTimeValue>();
 
     case TYPE_CHAR:
     case TYPE_VARCHAR:
@@ -58,12 +59,10 @@ HybridSetBase* HybridSetBase::create_set(PrimitiveType type) {
         return new (std::nothrow) StringValueSet();
 
     default:
-        return NULL;
+        DCHECK(false) << "Invalid type.";
     }
 
-    return NULL;
+    return nullptr;
 }
 
 } // namespace doris
-
-/* vim: set ts=4 sw=4 sts=4 tw=100 */
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 47c02ea..6947f09 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -35,7 +35,7 @@ class HybridSetBase {
 public:
     HybridSetBase() = default;
     virtual ~HybridSetBase() = default;
-    virtual void insert(void* data) = 0;
+    virtual void insert(const void* data) = 0;
     // use in vectorize execute engine
     virtual void insert(void* data, size_t) = 0;
 
@@ -66,17 +66,18 @@ public:
 
     ~HybridSet() override = default;
 
-    void insert(void* data) override {
+    void insert(const void* data) override {
+        if (data == nullptr) return;
+
         if (sizeof(T) >= 16) {
             // for largeint, it will core dump with no memcpy
             T value;
             memcpy(&value, data, sizeof(T));
             _set.insert(value);
         } else {
-            _set.insert(*reinterpret_cast<T*>(data));
+            _set.insert(*reinterpret_cast<const T*>(data));
         }
     }
-
     void insert(void* data, size_t) override { insert(data); }
 
     void insert(HybridSetBase* set) override {
@@ -124,12 +125,13 @@ public:
 
     ~StringValueSet() override = default;
 
-    void insert(void* data) override {
-        auto* value = reinterpret_cast<StringValue*>(data);
+    void insert(const void* data) override {
+        if (data == nullptr) return;
+
+        const auto* value = reinterpret_cast<const StringValue*>(data);
         std::string str_value(value->ptr, value->len);
         _set.insert(str_value);
     }
-
     void insert(void* data, size_t size) override {
         std::string str_value(reinterpret_cast<char*>(data), size);
         _set.insert(str_value);
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 446eb10..17e7f3b 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -43,7 +43,7 @@ namespace doris {
 // only used in Runtime Filter
 class MinMaxFuncBase {
 public:
-    virtual void insert(void* data) = 0;
+    virtual void insert(const void* data) = 0;
     virtual bool find(void* data) = 0;
     virtual bool is_empty() = 0;
     virtual void* get_max() = 0;
@@ -61,9 +61,9 @@ class MinMaxNumFunc : public MinMaxFuncBase {
 public:
     MinMaxNumFunc() = default;
     ~MinMaxNumFunc() = default;
-    virtual void insert(void* data) {
+    virtual void insert(const void* data) {
         if (data == nullptr) return;
-        T val_data = *reinterpret_cast<T*>(data);
+        const T val_data = *reinterpret_cast<const T*>(data);
         if (_empty) {
             _min = val_data;
             _max = val_data;
@@ -101,7 +101,6 @@ public:
                 _max.ptr = str->data();
                 _max.len = str->length();
             }
-
         } else {
             MinMaxNumFunc<T>* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func);
             if (other_minmax->_min < _min) {
@@ -151,30 +150,33 @@ MinMaxFuncBase* MinMaxFuncBase::create_minmax_filter(PrimitiveType type) {
     case TYPE_BIGINT:
         return new (std::nothrow) MinMaxNumFunc<int64_t>();
 
+    case TYPE_LARGEINT:
+        return new (std::nothrow) MinMaxNumFunc<__int128>();
+
     case TYPE_FLOAT:
         return new (std::nothrow) MinMaxNumFunc<float>();
 
+    case TYPE_TIME:
     case TYPE_DOUBLE:
         return new (std::nothrow) MinMaxNumFunc<double>();
 
-    case TYPE_DATE:
-    case TYPE_DATETIME:
-        return new (std::nothrow) MinMaxNumFunc<DateTimeValue>();
-
     case TYPE_DECIMALV2:
         return new (std::nothrow) MinMaxNumFunc<DecimalV2Value>();
 
-    case TYPE_LARGEINT:
-        return new (std::nothrow) MinMaxNumFunc<__int128>();
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+        return new (std::nothrow) MinMaxNumFunc<DateTimeValue>();
 
     case TYPE_CHAR:
     case TYPE_VARCHAR:
     case TYPE_STRING:
         return new (std::nothrow) MinMaxNumFunc<StringValue>();
+
     default:
         DCHECK(false) << "Invalid type.";
     }
-    return NULL;
+
+    return nullptr;
 }
 
 // PrimitiveType->TExprNodeType
@@ -331,6 +333,8 @@ TTypeDesc create_type_desc(PrimitiveType type) {
     TScalarType scalarType;
     scalarType.__set_type(to_thrift(type));
     scalarType.__set_len(-1);
+    scalarType.__set_precision(-1);
+    scalarType.__set_scale(-1);
     node_type.back().__set_scalar_type(scalarType);
     type_desc.__set_types(node_type);
     return type_desc;
@@ -472,18 +476,15 @@ public:
             return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
         }
         default:
-            DCHECK(false);
             return Status::InvalidArgument("Unknown Filter type");
         }
         return Status::OK();
     }
 
-    void insert(void* data) {
+    void insert(const void* data) {
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
-            if (data != nullptr) {
-                _hybrid_set->insert(data);
-            }
+            _hybrid_set->insert(data);
             break;
         }
         case RuntimeFilterType::MINMAX_FILTER: {
@@ -491,7 +492,6 @@ public:
             break;
         }
         case RuntimeFilterType::BLOOM_FILTER: {
-            DCHECK(_bloomfilter_func != nullptr);
             _bloomfilter_func->insert(data);
             break;
         }
@@ -747,12 +747,12 @@ Status IRuntimeFilter::create(RuntimeState* state, MemTracker* tracker, ObjectPo
     return (*res)->init_with_desc(desc, node_id);
 }
 
-void IRuntimeFilter::insert(void* data) {
+void IRuntimeFilter::insert(const void* data) {
     DCHECK(is_producer());
     _wrapper->insert(data);
 }
 
-Status IRuntimeFilter::publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx) {
+Status IRuntimeFilter::publish() {
     DCHECK(is_producer());
     if (_has_local_target) {
         IRuntimeFilter* consumer_filter = nullptr;
@@ -1054,74 +1054,4 @@ Status IRuntimeFilter::consumer_close() {
 RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default;
 RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default;
 
-Status RuntimeFilterSlots::init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker,
-                                int64_t hash_table_size) {
-    DCHECK(_probe_expr_context.size() == _build_expr_context.size());
-
-    // runtime filter effect stragety
-    // 1. we will ignore IN filter when hash_table_size is too big
-    // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size
-    // is too small and IN filter has effect
-
-    std::map<int, bool> has_in_filter;
-
-    auto ignore_filter = [state](int filter_id) {
-        IRuntimeFilter* consumer_filter = nullptr;
-        state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter);
-        DCHECK(consumer_filter != nullptr);
-        consumer_filter->set_ignored();
-        consumer_filter->signal();
-    };
-
-    for (auto& filter_desc : _runtime_filter_descs) {
-        IRuntimeFilter* runtime_filter = nullptr;
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
-                                                                         &runtime_filter));
-        DCHECK(runtime_filter != nullptr);
-        DCHECK(runtime_filter->expr_order() >= 0);
-        DCHECK(runtime_filter->expr_order() < _probe_expr_context.size());
-
-        if (runtime_filter->type() == RuntimeFilterType::IN_FILTER &&
-            hash_table_size >= state->runtime_filter_max_in_num()) {
-            ignore_filter(filter_desc.filter_id);
-            continue;
-        }
-        if (has_in_filter[runtime_filter->expr_order()] && !runtime_filter->has_remote_target() &&
-            runtime_filter->type() != RuntimeFilterType::IN_FILTER &&
-            hash_table_size < state->runtime_filter_max_in_num()) {
-            ignore_filter(filter_desc.filter_id);
-            continue;
-        }
-        has_in_filter[runtime_filter->expr_order()] =
-                (runtime_filter->type() == RuntimeFilterType::IN_FILTER);
-        _runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter);
-    }
-
-    return Status::OK();
-}
-
-void RuntimeFilterSlots::ready_for_publish() {
-    for (auto& pair : _runtime_filters) {
-        for (auto filter : pair.second) {
-            filter->ready_for_publish();
-        }
-    }
-}
-
-void RuntimeFilterSlots::publish(HashJoinNode* hash_join_node) {
-    for (int i = 0; i < _probe_expr_context.size(); ++i) {
-        auto iter = _runtime_filters.find(i);
-        if (iter != _runtime_filters.end()) {
-            for (auto filter : iter->second) {
-                filter->publish(hash_join_node, _probe_expr_context[i]);
-            }
-        }
-    }
-    for (auto& pair : _runtime_filters) {
-        for (auto filter : pair.second) {
-            filter->publish_finally();
-        }
-    }
-}
-
 } // namespace doris
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index b182c42..809c132 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -120,11 +120,11 @@ public:
 
     // insert data to build filter
     // only used for producer
-    void insert(void* data);
+    void insert(const void* data);
 
     // publish filter
     // push filter to remote node or push down it to scan_node
-    Status publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx);
+    Status publish();
 
     void publish_finally();
 
@@ -209,7 +209,6 @@ protected:
     static Status _create_wrapper(const T* param, MemTracker* tracker, ObjectPool* pool,
                                   std::unique_ptr<RuntimePredicateWrapper>* wrapper);
 
-protected:
     RuntimeState* _state;
     MemTracker* _mem_tracker;
     ObjectPool* _pool;
@@ -280,47 +279,6 @@ private:
     WrapperPtr _wrapper;
 };
 
-/// this class used in a hash join node
-/// Provide a unified interface for other classes
-class RuntimeFilterSlots {
-public:
-    RuntimeFilterSlots(const std::vector<ExprContext*>& prob_expr_ctxs,
-                       const std::vector<ExprContext*>& build_expr_ctxs,
-                       const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
-            : _probe_expr_context(prob_expr_ctxs),
-              _build_expr_context(build_expr_ctxs),
-              _runtime_filter_descs(runtime_filter_descs) {}
-
-    Status init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker,
-                int64_t hash_table_size);
-
-    void insert(TupleRow* row) {
-        for (int i = 0; i < _build_expr_context.size(); ++i) {
-            auto iter = _runtime_filters.find(i);
-            if (iter != _runtime_filters.end()) {
-                void* val = _build_expr_context[i]->get_value(row);
-                if (val != nullptr) {
-                    for (auto filter : iter->second) {
-                        filter->insert(val);
-                    }
-                }
-            }
-        }
-    }
-
-    // should call this method after insert
-    void ready_for_publish();
-    // publish runtime filter
-    void publish(HashJoinNode* hash_join_node);
-
-private:
-    const std::vector<ExprContext*>& _probe_expr_context;
-    const std::vector<ExprContext*>& _build_expr_context;
-    const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
-    // prob_contition index -> [IRuntimeFilter]
-    std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
-};
-
 } // namespace doris
 
 #endif
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
new file mode 100644
index 0000000..7f0957a
--- /dev/null
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exprs/runtime_filter.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_state.h"
+
+namespace doris {
+
+// this class used in a hash join node
+// Provide a unified interface for other classes
+template <typename ExprCtxType>
+class RuntimeFilterSlotsBase {
+public:
+    RuntimeFilterSlotsBase(const std::vector<ExprCtxType*>& prob_expr_ctxs,
+                           const std::vector<ExprCtxType*>& build_expr_ctxs,
+                           const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
+            : _probe_expr_context(prob_expr_ctxs),
+              _build_expr_context(build_expr_ctxs),
+              _runtime_filter_descs(runtime_filter_descs) {}
+
+    Status init(RuntimeState* state, int64_t hash_table_size) {
+        DCHECK(_probe_expr_context.size() == _build_expr_context.size());
+
+        // runtime filter effect stragety
+        // 1. we will ignore IN filter when hash_table_size is too big
+        // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size
+        // is too small and IN filter has effect
+
+        std::map<int, bool> has_in_filter;
+
+        auto ignore_filter = [state](int filter_id) {
+            IRuntimeFilter* consumer_filter = nullptr;
+            state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter);
+            DCHECK(consumer_filter != nullptr);
+            consumer_filter->set_ignored();
+            consumer_filter->signal();
+        };
+
+        for (auto& filter_desc : _runtime_filter_descs) {
+            IRuntimeFilter* runtime_filter = nullptr;
+            RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
+                                                                             &runtime_filter));
+            DCHECK(runtime_filter != nullptr);
+            DCHECK(runtime_filter->expr_order() >= 0);
+            DCHECK(runtime_filter->expr_order() < _probe_expr_context.size());
+
+            // do not create 'in filter' when hash_table size over limit
+            bool over_max_in_num = (hash_table_size >= state->runtime_filter_max_in_num());
+
+            bool is_in_filter = (runtime_filter->type() == RuntimeFilterType::IN_FILTER);
+
+            // do not create 'bloom filter' and 'minmax filter' when 'in filter' has created
+            bool pass_not_in = (has_in_filter[runtime_filter->expr_order()] &&
+                                !runtime_filter->has_remote_target());
+
+            if (over_max_in_num == is_in_filter && (is_in_filter || pass_not_in)) {
+                ignore_filter(filter_desc.filter_id);
+                continue;
+            }
+
+            has_in_filter[runtime_filter->expr_order()] =
+                    (runtime_filter->type() == RuntimeFilterType::IN_FILTER);
+            _runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+
+        return Status::OK();
+    }
+
+    void insert(TupleRow* row) {
+        for (int i = 0; i < _build_expr_context.size(); ++i) {
+            auto iter = _runtime_filters.find(i);
+            if (iter != _runtime_filters.end()) {
+                void* val = _build_expr_context[i]->get_value(row);
+                if (val != nullptr) {
+                    for (auto filter : iter->second) {
+                        filter->insert(val);
+                    }
+                }
+            }
+        }
+    }
+
+    // should call this method after insert
+    void ready_for_publish() {
+        for (auto& pair : _runtime_filters) {
+            for (auto filter : pair.second) {
+                filter->ready_for_publish();
+            }
+        }
+    }
+    // publish runtime filter
+    void publish() {
+        for (int i = 0; i < _probe_expr_context.size(); ++i) {
+            auto iter = _runtime_filters.find(i);
+            if (iter != _runtime_filters.end()) {
+                for (auto filter : iter->second) {
+                    filter->publish();
+                }
+            }
+        }
+        for (auto& pair : _runtime_filters) {
+            for (auto filter : pair.second) {
+                filter->publish_finally();
+            }
+        }
+    }
+
+    bool empty() { return !_runtime_filters.size(); }
+
+private:
+    const std::vector<ExprCtxType*>& _probe_expr_context;
+    const std::vector<ExprCtxType*>& _build_expr_context;
+    const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
+    // prob_contition index -> [IRuntimeFilter]
+    std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
+};
+
+using RuntimeFilterSlots = RuntimeFilterSlotsBase<ExprContext>;
+
+} // namespace doris
diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h
index cdafb2c..36113a5 100644
--- a/be/src/runtime/primitive_type.h
+++ b/be/src/runtime/primitive_type.h
@@ -242,6 +242,10 @@ struct PrimitiveTypeTraits<TYPE_FLOAT> {
     using CppType = float;
 };
 template <>
+struct PrimitiveTypeTraits<TYPE_TIME> {
+    using CppType = double;
+};
+template <>
 struct PrimitiveTypeTraits<TYPE_DOUBLE> {
     using CppType = double;
 };

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org