You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/04 12:59:15 UTC
[incubator-doris] branch master updated: [Feature] Support
RuntimeFilter in Doris (BE Implement) (#6077)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 149def9 [Feature] Support RuntimeFilter in Doris (BE Implement) (#6077)
149def9 is described below
commit 149def9e42e968ccc0f6e88c482d80ebb0b9a3b2
Author: stdpain <34...@users.noreply.github.com>
AuthorDate: Sun Jul 4 20:59:05 2021 +0800
[Feature] Support RuntimeFilter in Doris (BE Implement) (#6077)
1. support in/bloomfilter/minmax
2. support broadcast/shuffle/bucket shuffle/colocate join
3. optimize memory use and reduce CPU cache misses while building runtime filters
4. optimize memory use in left semi join (works well on tpcds-95)
---
.clang-format | 1 +
be/src/common/config.h | 4 +
be/src/exec/hash_join_node.cpp | 144 +--
be/src/exec/hash_join_node.h | 14 +-
be/src/exec/hash_join_node_ir.cpp | 18 +-
be/src/exec/hash_table.cpp | 3 +
be/src/exec/hash_table.h | 30 +-
be/src/exec/hash_table.hpp | 73 ++
be/src/exec/olap_scan_node.cpp | 156 ++-
be/src/exec/olap_scan_node.h | 34 +-
be/src/exec/olap_scanner.cpp | 18 +-
be/src/exec/olap_scanner.h | 19 +-
be/src/exprs/CMakeLists.txt | 3 +
be/src/exprs/bloomfilter_predicate.cpp | 154 +++
be/src/exprs/bloomfilter_predicate.h | 287 +++++
be/src/exprs/expr_context.h | 4 +-
be/src/exprs/in_predicate.cpp | 8 +-
be/src/exprs/in_predicate.h | 3 +-
be/src/exprs/literal.h | 2 +
be/src/exprs/runtime_filter.cpp | 1138 ++++++++++++++++++++
be/src/exprs/runtime_filter.h | 324 ++++++
be/src/exprs/runtime_filter_rpc.cpp | 93 ++
be/src/olap/CMakeLists.txt | 1 +
be/src/olap/bloom_filter_predicate.cpp | 67 ++
be/src/olap/bloom_filter_predicate.h | 59 +
be/src/olap/in_list_predicate.cpp | 139 +--
be/src/olap/in_list_predicate.h | 80 +-
be/src/olap/reader.cpp | 92 +-
be/src/olap/reader.h | 31 +-
be/src/olap/rowset/segment_v2/bloom_filter.cpp | 2 +-
be/src/olap/rowset/segment_v2/bloom_filter.h | 21 +-
be/src/runtime/CMakeLists.txt | 1 +
be/src/runtime/decimal_value.h | 1 +
be/src/runtime/fragment_mgr.cpp | 44 +
be/src/runtime/fragment_mgr.h | 10 +
be/src/runtime/runtime_filter_mgr.cpp | 304 ++++++
be/src/runtime/runtime_filter_mgr.h | 179 +++
be/src/runtime/runtime_state.cpp | 12 +-
be/src/runtime/runtime_state.h | 10 +
be/src/service/internal_service.cpp | 44 +-
be/src/service/internal_service.h | 9 +
be/test/exec/hash_table_test.cpp | 8 +-
be/test/exprs/CMakeLists.txt | 3 +-
be/test/exprs/bloom_filter_predicate_test.cpp | 105 ++
be/test/exprs/runtime_filter_test.cpp | 173 +++
be/test/olap/CMakeLists.txt | 1 +
.../olap/bloom_filter_column_predicate_test.cpp | 190 ++++
be/test/olap/in_list_predicate_test.cpp | 18 +-
be/test/olap/rowset/segment_v2/segment_test.cpp | 6 +-
gensrc/proto/internal_service.proto | 71 ++
gensrc/proto/palo_internal_service.proto | 2 +
gensrc/thrift/Exprs.thrift | 3 +
gensrc/thrift/Opcodes.thrift | 1 +
gensrc/thrift/PaloInternalService.thrift | 47 +
gensrc/thrift/PlanNodes.thrift | 43 +
gensrc/thrift/Types.thrift | 15 +
56 files changed, 4039 insertions(+), 283 deletions(-)
diff --git a/.clang-format b/.clang-format
index 0f9b9ba..3b8b570 100644
--- a/.clang-format
+++ b/.clang-format
@@ -13,3 +13,4 @@ PointerAlignment: Left
ReflowComments: false
SortUsingDeclarations: false
SpacesBeforeTrailingComments: 1
+SpaceBeforeCpp11BracedList: true
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 575659a..bec0b9e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -614,6 +614,10 @@ CONF_Int16(mem_tracker_level, "0");
// This config usually only needs to be modified during testing.
// In most cases, it does not need to be modified.
CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
+
+// If runtime_filter_use_async_rpc is true, runtime filters are published with an async RPC;
+// otherwise the sync method is called.
+CONF_mBool(runtime_filter_use_async_rpc, "true");
} // namespace config
} // namespace doris
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index ce22f3e..fff59ef 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -14,19 +14,22 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
#include "exec/hash_join_node.h"
+#include <memory>
#include <sstream>
#include "exec/hash_table.hpp"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "exprs/in_predicate.h"
+#include "exprs/runtime_filter.h"
#include "exprs/slot_ref.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
+#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
+#include "util/defer_op.h"
#include "util/runtime_profile.h"
namespace doris {
@@ -36,7 +39,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
_join_op(tnode.hash_join_node.join_op),
_probe_counter(0),
_probe_eos(false),
- _process_build_batch_fn(NULL),
_process_probe_batch_fn(NULL),
_anti_join_last_pos(NULL) {
_match_all_probe =
@@ -44,8 +46,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
_match_one_build = (_join_op == TJoinOp::LEFT_SEMI_JOIN);
_match_all_build =
(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
- _is_push_down = tnode.hash_join_node.is_push_down;
_build_unique = _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN;
+
+ _runtime_filter_descs = tnode.runtime_filters;
}
HashJoinNode::~HashJoinNode() {
@@ -81,6 +84,11 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
_build_unique = false;
}
+ for (const auto& filter_desc : _runtime_filter_descs) {
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(RuntimeFilterRole::PRODUCER,
+ filter_desc));
+ }
+
return Status::OK();
}
@@ -97,7 +105,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_probe_rows_counter = ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT);
_hash_tbl_load_factor_counter =
ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
-
+ _hash_table_list_min_size = ADD_COUNTER(runtime_profile(), "HashTableMinList", TUnit::UNIT);
+ _hash_table_list_max_size = ADD_COUNTER(runtime_profile(), "HashTableMaxList", TUnit::UNIT);
// build and probe exprs are evaluated in the context of the rows produced by our
// right and left children, respectively
RETURN_IF_ERROR(
@@ -186,21 +195,22 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker().get());
RETURN_IF_ERROR(child(1)->open(state));
+ SCOPED_TIMER(_build_timer);
+ Defer defer {[&] {
+ COUNTER_SET(_build_rows_counter, _hash_tbl->size());
+ COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets());
+ COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor());
+ auto node = _hash_tbl->minmax_node();
+ COUNTER_SET(_hash_table_list_min_size, node.first);
+ COUNTER_SET(_hash_table_list_max_size, node.second);
+ }};
while (true) {
RETURN_IF_CANCELLED(state);
bool eos = true;
RETURN_IF_ERROR(child(1)->get_next(state, &build_batch, &eos));
- SCOPED_TIMER(_build_timer);
- // take ownership of tuple data of build_batch
- _build_pool->acquire_data(build_batch.tuple_data_pool(), false);
- RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
- process_build_batch(&build_batch);
-
+ RETURN_IF_ERROR(process_build_batch(state, &build_batch));
VLOG_ROW << _hash_tbl->debug_string(true, &child(1)->row_desc());
- COUNTER_SET(_build_rows_counter, _hash_tbl->size());
- COUNTER_SET(_build_buckets_counter, _hash_tbl->num_buckets());
- COUNTER_SET(_hash_tbl_load_factor_counter, _hash_tbl->load_factor());
build_batch.reset();
if (eos) {
@@ -236,91 +246,23 @@ Status HashJoinNode::open(RuntimeState* state) {
thread_status.set_value(construct_hash_table(state));
}
- if (_children[0]->type() == TPlanNodeType::EXCHANGE_NODE &&
- _children[1]->type() == TPlanNodeType::EXCHANGE_NODE) {
- _is_push_down = false;
- }
-
- // The predicate could not be pushed down when there is Null-safe equal operator.
- // The in predicate will filter the null value in child[0] while it is needed in the Null-safe equal join.
- // For example: select * from a join b where a.id<=>b.id
- // the null value in table a should be return by scan node instead of filtering it by In-predicate.
- if (std::find(_is_null_safe_eq_join.begin(), _is_null_safe_eq_join.end(), true) !=
- _is_null_safe_eq_join.end()) {
- _is_push_down = false;
- }
+ if (!_runtime_filter_descs.empty()) {
+ RuntimeFilterSlots runtime_filter_slots(_probe_expr_ctxs, _build_expr_ctxs,
+ _runtime_filter_descs);
- if (_is_push_down) {
- // Blocks until ConstructHashTable has returned, after which
- // the hash table is fully constructed and we can start the probe
- // phase.
RETURN_IF_ERROR(thread_status.get_future().get());
-
- if (_hash_tbl->size() == 0 && _join_op == TJoinOp::INNER_JOIN) {
- // Hash table size is zero
- LOG(INFO) << "No element need to push down, no need to read probe table";
- RETURN_IF_ERROR(child(0)->open(state));
- _probe_batch_pos = 0;
- _hash_tbl_iterator = _hash_tbl->begin();
- _eos = true;
- return Status::OK();
- }
-
- if (_hash_tbl->size() > 1024) {
- _is_push_down = false;
+ RETURN_IF_ERROR(runtime_filter_slots.init(state, _pool, expr_mem_tracker().get(),
+ _hash_tbl->size()));
+ {
+ SCOPED_TIMER(_push_compute_timer);
+ auto func = [&](TupleRow* row) { runtime_filter_slots.insert(row); };
+ _hash_tbl->for_each_row(func);
}
-
- // TODO: this is used for Code Check, Remove this later
- if (_is_push_down || 0 != child(1)->conjunct_ctxs().size()) {
- for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
- TExprNode node;
- node.__set_node_type(TExprNodeType::IN_PRED);
- TScalarType tscalar_type;
- tscalar_type.__set_type(TPrimitiveType::BOOLEAN);
- TTypeNode ttype_node;
- ttype_node.__set_type(TTypeNodeType::SCALAR);
- ttype_node.__set_scalar_type(tscalar_type);
- TTypeDesc t_type_desc;
- t_type_desc.types.push_back(ttype_node);
- node.__set_type(t_type_desc);
- node.in_predicate.__set_is_not_in(false);
- node.__set_opcode(TExprOpcode::FILTER_IN);
- node.__isset.vector_opcode = true;
- node.__set_vector_opcode(to_in_opcode(_probe_expr_ctxs[i]->root()->type().type));
- // NOTE(zc): in predicate only used here, no need prepare.
- InPredicate* in_pred = _pool->add(new InPredicate(node));
- RETURN_IF_ERROR(in_pred->prepare(state, _probe_expr_ctxs[i]->root()->type()));
- in_pred->add_child(Expr::copy(_pool, _probe_expr_ctxs[i]->root()));
- ExprContext* ctx = _pool->add(new ExprContext(in_pred));
- _push_down_expr_ctxs.push_back(ctx);
- }
-
- {
- SCOPED_TIMER(_push_compute_timer);
- HashTable::Iterator iter = _hash_tbl->begin();
-
- while (iter.has_next()) {
- TupleRow* row = iter.get_row();
- std::list<ExprContext*>::iterator ctx_iter = _push_down_expr_ctxs.begin();
-
- for (int i = 0; i < _build_expr_ctxs.size(); ++i, ++ctx_iter) {
- void* val = _build_expr_ctxs[i]->get_value(row);
- InPredicate* in_pre = (InPredicate*)((*ctx_iter)->root());
- in_pre->insert(val);
- }
-
- SCOPED_TIMER(_build_timer);
- iter.next<false>();
- }
- }
-
+ COUNTER_UPDATE(_build_timer, _push_compute_timer->value());
+ {
SCOPED_TIMER(_push_down_timer);
- push_down_predicate(state, &_push_down_expr_ctxs);
+ runtime_filter_slots.publish(this);
}
-
- // Open the probe-side child so that it may perform any initialisation in parallel.
- // Don't exit even if we see an error, we still need to wait for the build thread
- // to finish.
Status open_status = child(0)->open(state);
RETURN_IF_ERROR(open_status);
} else {
@@ -370,8 +312,8 @@ Status HashJoinNode::open(RuntimeState* state) {
Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
- // In most cases, no additional memory overhead will be applied for at this stage,
- // but if the expression calculation in this node needs to apply for additional memory,
+ // In most cases, no additional memory overhead will be applied for at this stage,
+ // but if the expression calculation in this node needs to apply for additional memory,
// it may cause the memory to exceed the limit.
RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
@@ -618,6 +560,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state, RowBatch* out_batch
*eos = _eos;
ScopedTimer<MonotonicStopWatch> probe_timer(_probe_timer);
+ Defer defer {[&] { COUNTER_SET(_rows_returned_counter, _num_rows_returned); }};
while (!_eos) {
// Compute max rows that should be added to out_batch
@@ -628,16 +571,7 @@ Status HashJoinNode::left_join_get_next(RuntimeState* state, RowBatch* out_batch
}
// Continue processing this row batch
- if (_process_probe_batch_fn == NULL) {
- _num_rows_returned +=
- process_probe_batch(out_batch, _probe_batch.get(), max_added_rows);
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- } else {
- // Use codegen'd function
- _num_rows_returned +=
- _process_probe_batch_fn(this, out_batch, _probe_batch.get(), max_added_rows);
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- }
+ _num_rows_returned += process_probe_batch(out_batch, _probe_batch.get(), max_added_rows);
if (reached_limit() || out_batch->is_full()) {
*eos = reached_limit();
diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h
index bfe2b36..4e1ea92 100644
--- a/be/src/exec/hash_join_node.h
+++ b/be/src/exec/hash_join_node.h
@@ -62,9 +62,10 @@ protected:
void debug_string(int indentation_level, std::stringstream* out) const;
private:
+ friend class IRuntimeFilter;
+
boost::scoped_ptr<HashTable> _hash_tbl;
HashTable::Iterator _hash_tbl_iterator;
- bool _is_push_down;
// for right outer joins, keep track of what's been joined
typedef std::unordered_set<TupleRow*> BuildTupleRowSet;
@@ -117,11 +118,6 @@ private:
// This should be the same size as the probe tuple row.
int _result_tuple_row_size;
- // Function declaration for codegen'd function. Signature must match
- // HashJoinNode::ProcessBuildBatch
- typedef void (*ProcessBuildBatchFn)(HashJoinNode*, RowBatch*);
- ProcessBuildBatchFn _process_build_batch_fn;
-
// HashJoinNode::process_probe_batch() exactly
typedef int (*ProcessProbeBatchFn)(HashJoinNode*, RowBatch*, RowBatch*, int);
// Jitted ProcessProbeBatch function pointer. Null if codegen is disabled.
@@ -138,6 +134,8 @@ private:
RuntimeProfile::Counter* _probe_rows_counter; // num probe rows
RuntimeProfile::Counter* _build_buckets_counter; // num buckets in hash table
RuntimeProfile::Counter* _hash_tbl_load_factor_counter;
+ RuntimeProfile::Counter* _hash_table_list_min_size;
+ RuntimeProfile::Counter* _hash_table_list_max_size;
// Supervises ConstructHashTable in a separate thread, and
// returns its status in the promise parameter.
@@ -162,7 +160,7 @@ private:
int process_probe_batch(RowBatch* out_batch, RowBatch* probe_batch, int max_added_rows);
// Construct the build hash table, adding all the rows in 'build_batch'
- void process_build_batch(RowBatch* build_batch);
+ Status process_build_batch(RuntimeState* state, RowBatch* build_batch);
// Write combined row, consisting of probe_row and build_row, to out_row.
// This is replaced by codegen.
@@ -176,6 +174,8 @@ private:
// doing the join.
std::string get_probe_row_output_string(TupleRow* probe_row);
+ std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+
// RELEASE_CONTEXT_COUNTER should be power of 2
// GCC will optimize the modulo operation to &(release_context_counter - 1)
// build_expr_context and probe_expr_context will free local alloc after this probe calculations
diff --git a/be/src/exec/hash_join_node_ir.cpp b/be/src/exec/hash_join_node_ir.cpp
index 3bd7f42..7f885e7 100644
--- a/be/src/exec/hash_join_node_ir.cpp
+++ b/be/src/exec/hash_join_node_ir.cpp
@@ -19,6 +19,8 @@
#include "exec/hash_table.hpp"
#include "exprs/expr_context.h"
#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/tuple_row.h"
namespace doris {
@@ -141,17 +143,29 @@ end:
// when build table has too many duplicated rows, the collisions will be very serious,
// so in some case will don't need to store duplicated value in hash table, we can build an unique one
-void HashJoinNode::process_build_batch(RowBatch* build_batch) {
+Status HashJoinNode::process_build_batch(RuntimeState* state, RowBatch* build_batch) {
// insert build row into our hash table
if (_build_unique) {
for (int i = 0; i < build_batch->num_rows(); ++i) {
- _hash_tbl->insert_unique(build_batch->get_row(i));
+ // _hash_tbl->insert_unique(build_batch->get_row(i));
+ TupleRow* tuple_row = nullptr;
+ if (_hash_tbl->emplace_key(build_batch->get_row(i), &tuple_row)) {
+ build_batch->get_row(i)->deep_copy(tuple_row,
+ child(1)->row_desc().tuple_descriptors(),
+ _build_pool.get(), false);
+ }
}
+ RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
} else {
+ // take ownership of tuple data of build_batch
+ _build_pool->acquire_data(build_batch->tuple_data_pool(), false);
+ RETURN_IF_LIMIT_EXCEEDED(state, "Hash join, while constructing the hash table.");
+
for (int i = 0; i < build_batch->num_rows(); ++i) {
_hash_tbl->insert(build_batch->get_row(i));
}
}
+ return Status::OK();
}
} // namespace doris
diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp
index 5a17935..8b2b1df 100644
--- a/be/src/exec/hash_table.cpp
+++ b/be/src/exec/hash_table.cpp
@@ -66,10 +66,12 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
_expr_value_null_bits = new uint8_t[_build_expr_ctxs.size()];
_alloc_list.reserve(10);
+ _end_list.reserve(10);
_current_nodes = reinterpret_cast<uint8_t*>(malloc(_current_capacity * _node_byte_size));
// TODO: remove memset later
memset(_current_nodes, 0, _current_capacity * _node_byte_size);
_alloc_list.push_back(_current_nodes);
+ _end_list.push_back(_current_nodes + _current_capacity * _node_byte_size);
_mem_tracker->Consume(_current_capacity * _node_byte_size);
if (_mem_tracker->limit_exceeded()) {
@@ -249,6 +251,7 @@ void HashTable::grow_node_array() {
memset(_current_nodes, 0, alloc_size);
// add _current_nodes to alloc pool
_alloc_list.push_back(_current_nodes);
+ _end_list.push_back(_current_nodes + alloc_size);
_mem_tracker->Consume(alloc_size);
if (_mem_tracker->limit_exceeded()) {
diff --git a/be/src/exec/hash_table.h b/be/src/exec/hash_table.h
index caaf01b..0560fc8 100644
--- a/be/src/exec/hash_table.h
+++ b/be/src/exec/hash_table.h
@@ -118,6 +118,8 @@ public:
}
}
+ bool IR_ALWAYS_INLINE emplace_key(TupleRow* row, TupleRow** key_addr);
+
// Returns the start iterator for all rows that match 'probe_row'. 'probe_row' is
// evaluated with _probe_expr_ctxs. The iterator can be iterated until HashTable::end()
// to find all the matching rows.
@@ -170,6 +172,8 @@ public:
// just the build row addresses.
std::string debug_string(bool skip_empty, const RowDescriptor* build_desc);
+ inline std::pair<int64_t, int64_t> minmax_node();
+
// stl-like iterator interface.
class Iterator {
public:
@@ -233,6 +237,27 @@ public:
uint32_t _scan_hash;
};
+ template <class Func>
+ void for_each_row(Func&& func) {
+ size_t sz = _alloc_list.size();
+ DCHECK_GT(sz, 0);
+ for (size_t i = 0; i < sz - 1; ++i) {
+ uint8_t* start = _alloc_list[i];
+ uint8_t* end = _end_list[i];
+ while (start < end) {
+ auto node = reinterpret_cast<Node*>(start);
+ func(node->data());
+ start += _node_byte_size;
+ }
+ }
+ uint8_t* last_st = _alloc_list[sz - 1];
+ for (size_t i = 0; i < _current_used; ++i) {
+ auto node = reinterpret_cast<Node*>(last_st);
+ func(node->data());
+ last_st += _node_byte_size;
+ }
+ }
+
private:
friend class Iterator;
friend class HashTableTest;
@@ -254,8 +279,9 @@ private:
};
struct Bucket {
- Bucket() { _node = nullptr; }
+ Bucket() : _node(nullptr), _size(0) {}
Node* _node;
+ uint64_t _size;
};
// Returns the next non-empty bucket and updates idx to be the index of that bucket.
@@ -389,6 +415,8 @@ private:
uint8_t* _expr_value_null_bits;
// node buffer list
std::vector<uint8_t*> _alloc_list;
+ // node buffer end pointer
+ std::vector<uint8_t*> _end_list;
};
} // namespace doris
diff --git a/be/src/exec/hash_table.hpp b/be/src/exec/hash_table.hpp
index ae2710d..2e655ff 100644
--- a/be/src/exec/hash_table.hpp
+++ b/be/src/exec/hash_table.hpp
@@ -22,6 +22,59 @@
namespace doris {
+inline bool HashTable::emplace_key(TupleRow* row, TupleRow** dest_addr) {
+ bool has_nulls = eval_build_row(row);
+
+ if (!_stores_nulls && has_nulls) {
+ return false;
+ }
+
+ uint32_t hash = hash_current_row();
+ int64_t bucket_idx = hash & (_num_buckets - 1);
+
+ Bucket* bucket = &_buckets[bucket_idx];
+ Node* node = bucket->_node;
+
+ bool will_insert = true;
+
+ if (node == nullptr) {
+ will_insert = true;
+ } else {
+ Node* last_node = node;
+ while (node != nullptr) {
+ if (node->_hash == hash && equals(node->data())) {
+ will_insert = false;
+ break;
+ }
+ last_node = node;
+ node = node->_next;
+ }
+ node = last_node;
+ }
+ if (will_insert) {
+ if (_num_filled_buckets > _num_buckets_till_resize) {
+ resize_buckets(_num_buckets * 2);
+ // the bucket index changes after resize_buckets(), so recompute it
+ bucket_idx = hash & (_num_buckets - 1);
+ }
+ if (_current_used == _current_capacity) {
+ grow_node_array();
+ }
+ Node* alloc_node =
+ reinterpret_cast<Node*>(_current_nodes + _node_byte_size * _current_used++);
+ ++_num_nodes;
+ TupleRow* data = alloc_node->data();
+ *dest_addr = data;
+ alloc_node->_hash = hash;
+ if (node == nullptr) {
+ add_to_bucket(&_buckets[bucket_idx], alloc_node);
+ } else {
+ node->_next = alloc_node;
+ }
+ }
+ return will_insert;
+}
+
inline HashTable::Iterator HashTable::find(TupleRow* probe_row, bool probe) {
bool has_nulls = probe ? eval_probe_row(probe_row) : eval_build_row(probe_row);
@@ -100,11 +153,13 @@ inline void HashTable::add_to_bucket(Bucket* bucket, Node* node) {
node->_next = bucket->_node;
bucket->_node = node;
+ bucket->_size++;
}
inline void HashTable::move_node(Bucket* from_bucket, Bucket* to_bucket, Node* node,
Node* previous_node) {
Node* next_node = node->_next;
+ from_bucket->_size--;
if (previous_node != NULL) {
previous_node->_next = next_node;
@@ -120,6 +175,24 @@ inline void HashTable::move_node(Bucket* from_bucket, Bucket* to_bucket, Node* n
add_to_bucket(to_bucket, node);
}
+inline std::pair<int64_t, int64_t> HashTable::minmax_node() {
+ bool has_value = false;
+ int64_t min_size = std::numeric_limits<int64_t>::max();
+ int64_t max_size = std::numeric_limits<int64_t>::min();
+ for (const auto bucket : _buckets) {
+ int64_t counter = bucket._size;
+ if (counter > 0) {
+ has_value = true;
+ min_size = std::min(counter, min_size);
+ max_size = std::max(counter, max_size);
+ }
+ }
+ if (!has_value) {
+ return std::make_pair(0, 0);
+ }
+ return std::make_pair(min_size, max_size);
+}
+
template <bool check_match>
inline void HashTable::Iterator::next() {
if (_bucket_idx == -1) {
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index f9ecb2a..409e27f 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -30,9 +30,11 @@
#include "exprs/binary_predicate.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
+#include "exprs/runtime_filter.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/exec_env.h"
#include "runtime/row_batch.h"
+#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
@@ -59,7 +61,8 @@ OlapScanNode::OlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
_status(Status::OK()),
_resource_info(nullptr),
_buffered_bytes(0),
- _eval_conjuncts_fn(nullptr) {}
+ _eval_conjuncts_fn(nullptr),
+ _runtime_filter_descs(tnode.runtime_filters) {}
OlapScanNode::~OlapScanNode() {}
@@ -80,6 +83,20 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
}
+ /// TODO: could one filter be used by different scan nodes?
+ int filter_size = _runtime_filter_descs.size();
+ _runtime_filter_ctxs.resize(filter_size);
+ for (int i = 0; i < filter_size; ++i) {
+ IRuntimeFilter* runtime_filter = nullptr;
+ const auto& filter_desc = _runtime_filter_descs[i];
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(RuntimeFilterRole::CONSUMER,
+ filter_desc, id()));
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
+ &runtime_filter));
+
+ _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+ }
+
return Status::OK();
}
@@ -173,6 +190,13 @@ Status OlapScanNode::prepare(RuntimeState* state) {
}
_runtime_state = state;
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ IRuntimeFilter* runtime_filter = nullptr;
+ state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
+ &runtime_filter);
+ DCHECK(runtime_filter != nullptr);
+ runtime_filter->init_profile(_runtime_profile.get());
+ }
return Status::OK();
}
@@ -184,6 +208,38 @@ Status OlapScanNode::open(RuntimeState* state) {
_resource_info = ResourceTls::get_resource_tls();
+ // acquire runtime filter
+ _runtime_filter_ctxs.resize(_runtime_filter_descs.size());
+
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ auto& filter_desc = _runtime_filter_descs[i];
+ IRuntimeFilter* runtime_filter = nullptr;
+ state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
+ DCHECK(runtime_filter != nullptr);
+ if (runtime_filter == nullptr) {
+ continue;
+ }
+ bool ready = runtime_filter->is_ready();
+ if (!ready) {
+ ready = runtime_filter->await();
+ }
+ if (ready) {
+ std::list<ExprContext*> expr_context;
+ RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
+ _runtime_filter_ctxs[i].apply_mark = true;
+ _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+
+ for (auto ctx : expr_context) {
+ ctx->prepare(state, row_desc(), _expr_mem_tracker);
+ ctx->open(state);
+ int index = _conjunct_ctxs.size();
+ _conjunct_ctxs.push_back(ctx);
+ // it's safe to store address from a fix-resized vector
+ _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i];
+ }
+ }
+ }
+
return Status::OK();
}
@@ -336,6 +392,13 @@ Status OlapScanNode::close(RuntimeState* state) {
scanner->close(state);
}
+ for (auto& filter_desc : _runtime_filter_descs) {
+ IRuntimeFilter* runtime_filter = nullptr;
+ state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
+ DCHECK(runtime_filter != nullptr);
+ runtime_filter->consumer_close();
+ }
+
VLOG_CRITICAL << "OlapScanNode::close()";
return ScanNode::close(state);
}
@@ -372,7 +435,7 @@ Status OlapScanNode::start_scan(RuntimeState* state) {
eval_const_conjuncts();
VLOG_CRITICAL << "NormalizeConjuncts";
- // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts will
+ // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts may
// set eos = true
RETURN_IF_ERROR(normalize_conjuncts());
@@ -429,7 +492,7 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
}
auto new_direct_conjunct_size = new_conjunct_ctxs.size();
- // dispose hash push down conjunct second
+ // dispose hash join push down conjunct second
for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) {
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) ==
_pushed_conjuncts_index.cend()) {
@@ -441,6 +504,13 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
_conjunct_ctxs = std::move(new_conjunct_ctxs);
_direct_conjunct_size = new_direct_conjunct_size;
+
+ for (auto push_down_ctx : _pushed_conjuncts_index) {
+ auto iter = _conjunctid_to_runtime_filter_ctxs.find(push_down_ctx);
+ if (iter != _conjunctid_to_runtime_filter_ctxs.end()) {
+ iter->second->runtimefilter->set_push_down_profile();
+ }
+ }
}
void OlapScanNode::eval_const_conjuncts() {
@@ -716,7 +786,8 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare failed.
_scanner_pool->add(scanner);
- RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter));
+ RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
+ _bloom_filters_push_down));
_olap_scanners.push_back(scanner);
disk_set.insert(scanner->scan_disk());
@@ -725,6 +796,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
COUNTER_SET(_num_scanners, static_cast<int64_t>(_olap_scanners.size()));
+ // PAIN_LOG(_olap_scanners.size());
// init progress
std::stringstream ss;
ss << "ScanThread complete (node=" << id() << "):";
@@ -746,6 +818,9 @@ Status OlapScanNode::normalize_predicate(ColumnValueRange<T>& range, SlotDescrip
// 3. Normalize BinaryPredicate , add to ColumnValueRange
RETURN_IF_ERROR(normalize_noneq_binary_predicate(slot, &range));
+ // 3. Normalize BloomFilterPredicate, pushed down by the hash join node
+ RETURN_IF_ERROR(normalize_bloom_filter_predicate(slot));
+
// 4. Check whether range is empty, set _eos
if (range.is_empty_value_range()) _eos = true;
@@ -767,6 +842,11 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* expr) {
bool OlapScanNode::should_push_down_in_predicate(doris::SlotDescriptor* slot,
doris::InPredicate* pred) {
+ if (pred->is_not_in()) {
+ // can not push down NOT IN predicate to storage engine
+ return false;
+ }
+
if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
@@ -1145,7 +1225,6 @@ Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot,
}
switch (slot->type().type) {
-
case TYPE_DATE: {
DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
// NOTE: Datetime may be truncated to a date column, so we call ++operator for date_value
@@ -1201,6 +1280,47 @@ Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot,
return Status::OK();
}
+Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
+ std::vector<uint32_t> filter_conjuncts_index;
+
+ for (int conj_idx = _direct_conjunct_size; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
+ Expr* root_expr = _conjunct_ctxs[conj_idx]->root();
+ if (TExprNodeType::BLOOM_PRED != root_expr->node_type()) continue;
+
+ Expr* pred = _conjunct_ctxs[conj_idx]->root();
+ DCHECK(pred->get_num_children() == 1);
+
+ if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
+ continue;
+ }
+ if (pred->get_child(0)->type().type != slot->type().type) {
+ if (!ignore_cast(slot, pred->get_child(0))) {
+ continue;
+ }
+ }
+
+ std::vector<SlotId> slot_ids;
+
+ if (1 == pred->get_child(0)->get_slot_ids(&slot_ids)) {
+ if (slot_ids[0] != slot->id()) {
+ continue;
+ }
+ // a bloom filter is pushed down to the storage engine only for key columns
+ if (is_key_column(slot->col_name())) {
+ filter_conjuncts_index.emplace_back(conj_idx);
+ _bloom_filters_push_down.emplace_back(
+ slot->col_name(),
+ (reinterpret_cast<BloomFilterPredicate*>(pred))->get_bloom_filter_func());
+ }
+ }
+ }
+
+ std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(),
+ std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin()));
+
+ return Status::OK();
+}
+
void OlapScanNode::transfer_thread(RuntimeState* state) {
// scanner open pushdown to scanThread
state->resource_pool()->acquire_thread_token();
@@ -1386,6 +1506,32 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
scanner->set_opened();
}
+ std::vector<ExprContext*> contexts;
+ auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
+ DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
+ for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
+ if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
+ IRuntimeFilter* runtime_filter = nullptr;
+ state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
+ &runtime_filter);
+ DCHECK(runtime_filter != nullptr);
+ bool ready = runtime_filter->is_ready();
+ if (ready) {
+ runtime_filter->get_prepared_context(&contexts, row_desc(), _expr_mem_tracker);
+ _runtime_filter_ctxs[i].apply_mark = true;
+ }
+ }
+ }
+
+ if (!contexts.empty()) {
+ std::vector<ExprContext*> new_contexts;
+ auto& scanner_conjunct_ctxs = *scanner->conjunct_ctxs();
+ Expr::clone_if_not_exists(contexts, state, &new_contexts);
+ scanner_conjunct_ctxs.insert(scanner_conjunct_ctxs.end(), new_contexts.begin(),
+ new_contexts.end());
+ scanner->set_use_pushdown_conjuncts(true);
+ }
+
// apply to cgroup
if (_resource_info != nullptr) {
CgroupsMgr::apply_cgroup(_resource_info->user, _resource_info->group);
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index f0c6695..2acb84f 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -27,6 +27,7 @@
#include "exec/olap_common.h"
#include "exec/olap_scanner.h"
#include "exec/scan_node.h"
+#include "exprs/bloomfilter_predicate.h"
#include "exprs/in_predicate.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch_interface.hpp"
@@ -35,6 +36,7 @@
#include "util/spinlock.h"
namespace doris {
+class IRuntimeFilter;
enum TransferStatus {
READ_ROWBATCH = 1,
@@ -138,7 +140,7 @@ protected:
// In order to ensure the accuracy of the query result
// only key column conjuncts will be remove as idle conjunct
bool is_key_column(const std::string& key_name);
- void remove_pushed_conjuncts(RuntimeState *state);
+ void remove_pushed_conjuncts(RuntimeState* state);
Status start_scan(RuntimeState* state);
@@ -160,9 +162,12 @@ protected:
template <class T>
Status normalize_noneq_binary_predicate(SlotDescriptor* slot, ColumnValueRange<T>* range);
+ Status normalize_bloom_filter_predicate(SlotDescriptor* slot);
+
template <typename T>
static bool normalize_is_null_predicate(Expr* expr, SlotDescriptor* slot,
- const std::string& is_null_str, ColumnValueRange<T>* range);
+ const std::string& is_null_str,
+ ColumnValueRange<T>* range);
void transfer_thread(RuntimeState* state);
void scanner_thread(OlapScanner* scanner);
@@ -172,6 +177,8 @@ protected:
// Write debug string of this into out.
virtual void debug_string(int indentation_level, std::stringstream* out) const;
+ const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return _runtime_filter_descs; }
+
private:
void _init_counter(RuntimeState* state);
// OLAP_SCAN_NODE profile layering: OLAP_SCAN_NODE, OlapScanner, and SegmentIterator
@@ -180,11 +187,12 @@ private:
bool should_push_down_in_predicate(SlotDescriptor* slot, InPredicate* in_pred);
- std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot, Expr* pred, int conj_idx, int child_idx);
-
template <typename T, typename ChangeFixedValueRangeFunc>
- static Status change_fixed_value_range(ColumnValueRange <T> &range, PrimitiveType type, void *value,
- const ChangeFixedValueRangeFunc& func);
+ static Status change_fixed_value_range(ColumnValueRange<T>& range, PrimitiveType type,
+ void* value, const ChangeFixedValueRangeFunc& func);
+
+ std::pair<bool, void*> should_push_down_eq_predicate(SlotDescriptor* slot, Expr* pred,
+ int conj_idx, int child_idx);
friend class OlapScanner;
@@ -212,6 +220,11 @@ private:
std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
std::vector<TCondition> _olap_filter;
+ // push down bloom filters to storage engine.
+ // 1. std::pair.first :: column name
+ // 2. std::pair.second :: shared_ptr of BloomFilterFuncBase
+ std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>
+ _bloom_filters_push_down;
// Pool for storing allocated scanner objects. We don't want to use the
// runtime pool to ensure that the scanner objects are deleted before this
@@ -283,6 +296,15 @@ private:
// or be overwritten by value in TQueryOptions
int32_t _max_pushdown_conditions_per_column = 1024;
+ struct RuntimeFilterContext {
+ RuntimeFilterContext() : apply_mark(false), runtimefilter(nullptr) {}
+ bool apply_mark;
+ IRuntimeFilter* runtimefilter;
+ };
+ std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+ std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+ std::map<int, RuntimeFilterContext*> _conjunctid_to_runtime_filter_ctxs;
+
std::unique_ptr<RuntimeProfile> _scanner_profile;
std::unique_ptr<RuntimeProfile> _segment_profile;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index e97d3ce..9b2b9fb 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -60,9 +60,10 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
OlapScanner::~OlapScanner() {}
-Status OlapScanner::prepare(const TPaloScanRange& scan_range,
- const std::vector<OlapScanRange*>& key_ranges,
- const std::vector<TCondition>& filters) {
+Status OlapScanner::prepare(
+ const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
+ const std::vector<TCondition>& filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters) {
// Get olap table
TTabletId tablet_id = scan_range.tablet_id;
SchemaHash schema_hash = strtoul(scan_range.schema_hash.c_str(), nullptr, 10);
@@ -107,7 +108,7 @@ Status OlapScanner::prepare(const TPaloScanRange& scan_range,
{
// Initialize _params
- RETURN_IF_ERROR(_init_params(key_ranges, filters));
+ RETURN_IF_ERROR(_init_params(key_ranges, filters, bloom_filters));
}
return Status::OK();
@@ -120,6 +121,8 @@ Status OlapScanner::open() {
_use_pushdown_conjuncts = true;
}
+ _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);
+
auto res = _reader->init(_params);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to init reader.[res=%d]", res);
@@ -132,8 +135,9 @@ Status OlapScanner::open() {
}
// it will be called under tablet read lock because capture rs readers need
-Status OlapScanner::_init_params(const std::vector<OlapScanRange*>& key_ranges,
- const std::vector<TCondition>& filters) {
+Status OlapScanner::_init_params(
+ const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters) {
RETURN_IF_ERROR(_init_return_columns());
_params.tablet = _tablet;
@@ -145,6 +149,8 @@ Status OlapScanner::_init_params(const std::vector<OlapScanRange*>& key_ranges,
for (auto& filter : filters) {
_params.conditions.push_back(filter);
}
+ std::copy(bloom_filters.cbegin(), bloom_filters.cend(),
+ std::inserter(_params.bloom_filters, _params.bloom_filters.begin()));
// Range
for (auto key_range : key_ranges) {
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 15cf13b..2c09578 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -27,6 +27,7 @@
#include "common/status.h"
#include "exec/exec_node.h"
#include "exec/olap_utils.h"
+#include "exprs/bloomfilter_predicate.h"
#include "exprs/expr.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/PlanNodes_types.h"
@@ -55,7 +56,9 @@ public:
~OlapScanner();
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
- const std::vector<TCondition>& filters);
+ const std::vector<TCondition>& filters,
+ const std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>&
+ bloom_filters);
Status open();
@@ -83,14 +86,19 @@ public:
_watcher.start();
}
- int64_t update_wait_worker_timer() {
- return _watcher.elapsed_time();
+ int64_t update_wait_worker_timer() { return _watcher.elapsed_time(); }
+
+ void set_use_pushdown_conjuncts(bool has_pushdown_conjuncts) {
+ _use_pushdown_conjuncts = has_pushdown_conjuncts;
}
+ std::vector<bool>* mutable_runtime_filter_marks() { return &_runtime_filter_marks; }
private:
Status _init_params(const std::vector<OlapScanRange*>& key_ranges,
- const std::vector<TCondition>& filters);
+ const std::vector<TCondition>& filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>&
+ bloom_filters);
Status _init_return_columns();
void _convert_row_to_tuple(Tuple* tuple);
@@ -98,7 +106,6 @@ private:
void _update_realtime_counter();
private:
-
RuntimeState* _runtime_state;
OlapScanNode* _parent;
const TupleDescriptor* _tuple_desc; /**< tuple descriptor */
@@ -106,6 +113,8 @@ private:
const std::vector<SlotDescriptor*>& _string_slots;
std::vector<ExprContext*> _conjunct_ctxs;
+ // to record which runtime filters have been used
+ std::vector<bool> _runtime_filter_marks;
int _id;
bool _is_open;
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index bc179e5..f64b5dd 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -45,6 +45,9 @@ add_library(Exprs
expr_context.cpp
in_predicate.cpp
new_in_predicate.cpp
+ bloomfilter_predicate.cpp
+ runtime_filter.cpp
+ runtime_filter_rpc.cpp
is_null_predicate.cpp
like_predicate.cpp
math_functions.cpp
diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp
new file mode 100644
index 0000000..34c5127
--- /dev/null
+++ b/be/src/exprs/bloomfilter_predicate.cpp
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/bloomfilter_predicate.h"
+
+#include <sstream>
+
+#include "exprs/anyval_util.h"
+#include "runtime/raw_value.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.hpp"
+
+namespace doris {
+
+// Factory mapping a primitive column type to the matching bloom filter function.
+// Returns nullptr when the type has no implementation here, or when the
+// non-throwing allocation fails.
+BloomFilterFuncBase* BloomFilterFuncBase::create_bloom_filter(MemTracker* tracker,
+                                                              PrimitiveType type) {
+    switch (type) {
+    // fixed-width numeric types hash their raw bytes
+    case TYPE_BOOLEAN:
+        return new (std::nothrow) BloomFilterFunc<bool>(tracker);
+    case TYPE_TINYINT:
+        return new (std::nothrow) BloomFilterFunc<int8_t>(tracker);
+    case TYPE_SMALLINT:
+        return new (std::nothrow) BloomFilterFunc<int16_t>(tracker);
+    case TYPE_INT:
+        return new (std::nothrow) BloomFilterFunc<int32_t>(tracker);
+    case TYPE_BIGINT:
+        return new (std::nothrow) BloomFilterFunc<int64_t>(tracker);
+    case TYPE_LARGEINT:
+        return new (std::nothrow) BloomFilterFunc<__int128>(tracker);
+    case TYPE_FLOAT:
+        return new (std::nothrow) BloomFilterFunc<float>(tracker);
+    case TYPE_DOUBLE:
+        return new (std::nothrow) BloomFilterFunc<double>(tracker);
+
+    // types with a dedicated subclass
+    case TYPE_DATE:
+        return new (std::nothrow) DateBloomFilterFunc(tracker);
+    case TYPE_DATETIME:
+        return new (std::nothrow) DateTimeBloomFilterFunc(tracker);
+    case TYPE_DECIMAL:
+        return new (std::nothrow) DecimalFilterFunc(tracker);
+    case TYPE_DECIMALV2:
+        return new (std::nothrow) DecimalV2FilterFunc(tracker);
+
+    // string types
+    case TYPE_CHAR:
+        return new (std::nothrow) FixedCharBloomFilterFunc(tracker);
+    case TYPE_VARCHAR:
+        return new (std::nothrow) BloomFilterFunc<StringValue>(tracker);
+
+    default:
+        return nullptr;
+    }
+}
+
+// Exposes the underlying bloom filter buffer.
+// @param data  out: receives the pointer returned by the filter's data()
+// @param len   out: receives the value returned by the filter's size()
+// @return InvalidArgument if an out-pointer is null, InternalError if no bloom
+//         filter has been created yet, OK otherwise.
+Status BloomFilterFuncBase::get_data(char** data, int* len) {
+    if (data == nullptr || len == nullptr) {
+        return Status::InvalidArgument("bloom filter output argument is null");
+    }
+    // Guard against use before init()/assign(); the original dereferenced a null pointer.
+    if (_bloom_filter == nullptr) {
+        return Status::InternalError("bloom filter is not initialized");
+    }
+    *data = _bloom_filter->data();
+    *len = _bloom_filter->size();
+    return Status::OK();
+}
+
+// Builds an unprepared predicate; the filter is attached later through prepare().
+BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node)
+        : Predicate(node),
+          _is_prepare(false),
+          _always_true(false),
+          _filtered_rows(0),
+          _scan_rows(0) {}
+
+// Logs the filter statistics gathered by get_boolean_val().
+BloomFilterPredicate::~BloomFilterPredicate() {
+    const int64_t filtered = _filtered_rows;
+    const int64_t scanned = _scan_rows;
+    // A predicate that never evaluated a row would otherwise log 0/0 (NaN).
+    const double rate = scanned > 0 ? (double)filtered / scanned : 0.0;
+    LOG(INFO) << "bloom filter rows:" << filtered << ",scan_rows:" << scanned
+              << ",rate:" << rate;
+}
+
+// Copy used by clone(): the copy starts with fresh counters but shares the
+// source's bloom filter. _is_prepare is copied, so prepare() would be a no-op on
+// the copy; without sharing _filter a "prepared" clone would hold a null filter.
+BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other)
+        : Predicate(other),
+          _is_prepare(other._is_prepare),
+          _always_true(other._always_true),
+          _filtered_rows(),
+          _scan_rows(),
+          _filter(other._filter) {}
+
+// Attaches `filter` to this predicate the first time it is called; _filter takes
+// ownership of the raw pointer. Later calls are no-ops that return OK.
+// Returns InternalError when `filter` is null.
+Status BloomFilterPredicate::prepare(RuntimeState* state, BloomFilterFuncBase* filter) {
+    if (!_is_prepare) {
+        _filter.reset(filter);
+        if (!_filter) {
+            return Status::InternalError("Unknown column type.");
+        }
+        _is_prepare = true;
+    }
+    return Status::OK();
+}
+
+// Fixed textual tag for this expression node.
+std::string BloomFilterPredicate::debug_string() const {
+    return std::string("BloomFilterPredicate()");
+}
+
+// Evaluates the bloom filter against the value of the single child expression.
+// Returns null for a null input, true when the value may be in the filter (or
+// when the filter has been disabled), false when it is definitely absent.
+//
+// Once, on the first multiple of _loop_size scanned rows, the observed filter
+// rate is compared with _expect_filter_rate; a filter that removes too few rows
+// is switched to always-true. The original only ran this check on rows the
+// filter rejected, so a filter that rejects almost nothing was almost never
+// disabled.
+BooleanVal BloomFilterPredicate::get_boolean_val(ExprContext* ctx, TupleRow* row) {
+    if (_always_true) {
+        return BooleanVal(true);
+    }
+    const void* lhs_slot = ctx->get_value(_children[0], row);
+    if (lhs_slot == nullptr) {
+        return BooleanVal::null();
+    }
+    DCHECK(_filter != nullptr);
+
+    // pre-increment on std::atomic yields the updated count
+    const int64_t scanned = ++_scan_rows;
+    const bool may_contain = _filter->find(lhs_slot);
+    if (!may_contain) {
+        _filtered_rows++;
+    }
+
+    if (!_has_calculate_filter && scanned % _loop_size == 0) {
+        const double rate = (double)_filtered_rows / scanned;
+        if (rate < _expect_filter_rate) {
+            // takes effect from the next row; this row keeps its real result
+            _always_true = true;
+        }
+        _has_calculate_filter = true;
+    }
+    return BooleanVal(may_contain);
+}
+
+// Opens the expression by delegating to Expr::open and propagating its status.
+// The original discarded that status and always reported OK.
+Status BloomFilterPredicate::open(RuntimeState* state, ExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    return Expr::open(state, context, scope);
+}
+
+} // namespace doris
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
new file mode 100644
index 0000000..c727055
--- /dev/null
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_QUERY_EXPRS_BLOOM_PREDICATE_H
+#define DORIS_BE_SRC_QUERY_EXPRS_BLOOM_PREDICATE_H
+#include <algorithm>
+#include <memory>
+#include <string>
+
+#include "common/object_pool.h"
+#include "exprs/expr_context.h"
+#include "exprs/predicate.h"
+#include "olap/rowset/segment_v2/bloom_filter.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/raw_value.h"
+
+namespace doris {
+/// only used in Runtime Filter
+///
+/// Type-erased wrapper around a segment_v2 block bloom filter. Subclasses supply
+/// insert()/find() for a concrete value type. The bytes of the filter are
+/// reported to the MemTracker (when one is attached) and released on destruction.
+class BloomFilterFuncBase {
+public:
+    // _bloom_filter_alloced starts at 0 so the destructor never releases an
+    // indeterminate amount when init()/assign() was never called.
+    BloomFilterFuncBase(MemTracker* tracker)
+            : _tracker(tracker), _bloom_filter_alloced(0), _inited(false) {}
+
+    virtual ~BloomFilterFuncBase() {
+        if (_tracker != nullptr) {
+            _tracker->Release(_bloom_filter_alloced);
+        }
+    }
+
+    // init a bloom filter with expect element num
+    // @param expect_num  expected number of inserted elements
+    // @param fpp         target false positive probability
+    // @return the first error from creating or initialising the filter, else OK
+    virtual Status init(int64_t expect_num = 4096, double fpp = 0.05) {
+        DCHECK(!_inited);
+        DCHECK(expect_num >= 0); // we need alloc 'optimal_bit_num(expect_num,fpp) / 8' bytes
+        return _create_filter(
+                doris::segment_v2::BloomFilter::optimal_bit_num(expect_num, fpp) / 8);
+    }
+
+    // init a bloom filter whose size in bytes is given explicitly
+    virtual Status init_with_fixed_length(int64_t bloom_filter_length) {
+        DCHECK(!_inited);
+        DCHECK(bloom_filter_length >= 0);
+        // the size is kept in an int32_t member, as before
+        return _create_filter(static_cast<int32_t>(bloom_filter_length));
+    }
+
+    virtual void insert(const void* data) = 0;
+
+    virtual bool find(const void* data) = 0;
+
+    // Because the data structures of the execution layer and the storage layer are inconsistent,
+    // we need to provide additional interfaces for the storage layer to call
+    virtual bool find_olap_engine(const void* data) { return this->find(data); }
+
+    // Merges the filter of `bloomfilter_func` into this one.
+    // @return InvalidArgument when the argument is null, holds no filter, or has
+    //         a different size; otherwise the status of the underlying merge.
+    Status merge(BloomFilterFuncBase* bloomfilter_func) {
+        DCHECK(_inited);
+        if (bloomfilter_func == nullptr || bloomfilter_func->_bloom_filter == nullptr) {
+            return Status::InvalidArgument("bloom filter to merge is not initialized");
+        }
+        if (_bloom_filter == nullptr) {
+            std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
+            RETURN_IF_ERROR(doris::segment_v2::BloomFilter::create(
+                    doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter));
+            _bloom_filter.reset(bloom_filter.release());
+        }
+        if (_bloom_filter_alloced != bloomfilter_func->_bloom_filter_alloced) {
+            LOG(WARNING) << "bloom filter size not the same";
+            return Status::InvalidArgument("bloom filter size invalid");
+        }
+        return _bloom_filter->merge(bloomfilter_func->_bloom_filter.get());
+    }
+
+    // Loads the filter content from a serialized buffer of `len` bytes.
+    // The tracked size is len - 1; presumably the buffer carries one trailing
+    // non-filter byte -- TODO confirm against segment_v2::BloomFilter::init.
+    Status assign(const char* data, int len) {
+        if (_bloom_filter == nullptr) {
+            std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
+            RETURN_IF_ERROR(doris::segment_v2::BloomFilter::create(
+                    doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter));
+            _bloom_filter.reset(bloom_filter.release());
+        }
+        RETURN_IF_ERROR(_bloom_filter->init(
+                data, len, doris::segment_v2::HashStrategyPB::HASH_MURMUR3_X64_64));
+        // account only after success, and replace (not add to) any earlier amount
+        _set_tracked_bytes(len - 1);
+        return Status::OK();
+    }
+    /// create a bloom filter function
+    /// tracker shouldn't be nullptr
+    static BloomFilterFuncBase* create_bloom_filter(MemTracker* tracker, PrimitiveType type);
+
+    Status get_data(char** data, int* len);
+
+    MemTracker* tracker() { return _tracker; }
+
+    // Shares the filter owned by `other` without tracking its memory here.
+    // Bytes this object had already reported are released first; previously they
+    // stayed charged to the tracker forever once _tracker was cleared.
+    void light_copy(BloomFilterFuncBase* other) {
+        if (_tracker != nullptr) {
+            _tracker->Release(_bloom_filter_alloced);
+        }
+        _tracker = nullptr;
+        _bloom_filter_alloced = other->_bloom_filter_alloced;
+        _bloom_filter = other->_bloom_filter;
+        _inited = other->_inited;
+    }
+
+protected:
+    MemTracker* _tracker;
+    // bloom filter size
+    int32_t _bloom_filter_alloced;
+    std::shared_ptr<doris::segment_v2::BloomFilter> _bloom_filter;
+    bool _inited;
+
+private:
+    // Creates and initialises a block bloom filter of `num_bytes` bytes. State
+    // (_bloom_filter, tracked bytes, _inited) changes only when both steps succeed.
+    Status _create_filter(int32_t num_bytes) {
+        std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
+        RETURN_IF_ERROR(doris::segment_v2::BloomFilter::create(
+                doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter));
+        RETURN_IF_ERROR(bloom_filter->init(
+                num_bytes, doris::segment_v2::HashStrategyPB::HASH_MURMUR3_X64_64));
+        _bloom_filter.reset(bloom_filter.release());
+        _set_tracked_bytes(num_bytes);
+        _inited = true;
+        return Status::OK();
+    }
+
+    // Replaces the amount reported to the tracker (if any) with `new_bytes`.
+    void _set_tracked_bytes(int32_t new_bytes) {
+        if (_tracker != nullptr) {
+            if (_bloom_filter_alloced > 0) {
+                _tracker->Release(_bloom_filter_alloced);
+            }
+            _tracker->Consume(new_bytes);
+        }
+        _bloom_filter_alloced = new_bytes;
+    }
+};
+
+// Bloom filter function for fixed-width values: hashes the sizeof(T) raw bytes
+// that `data` points to.
+template <class T>
+class BloomFilterFunc : public BloomFilterFuncBase {
+public:
+    BloomFilterFunc(MemTracker* tracker) : BloomFilterFuncBase(tracker) {}
+
+    ~BloomFilterFunc() = default;
+
+    void insert(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        const char* bytes = static_cast<const char*>(data);
+        _bloom_filter->add_bytes(bytes, sizeof(T));
+    }
+
+    bool find(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        const char* bytes = static_cast<const char*>(data);
+        return _bloom_filter->test_bytes(bytes, sizeof(T));
+    }
+};
+
+// String specialisation: `data` points to a StringValue, and the bytes hashed
+// are the ptr/len range it describes rather than the struct itself.
+template <>
+class BloomFilterFunc<StringValue> : public BloomFilterFuncBase {
+public:
+    BloomFilterFunc(MemTracker* tracker) : BloomFilterFuncBase(tracker) {}
+
+    ~BloomFilterFunc() = default;
+
+    void insert(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        const StringValue& str = *reinterpret_cast<const StringValue*>(data);
+        _bloom_filter->add_bytes(str.ptr, str.len);
+    }
+
+    bool find(const void* data) override {
+        DCHECK(_bloom_filter != nullptr);
+        const StringValue& str = *reinterpret_cast<const StringValue*>(data);
+        return _bloom_filter->test_bytes(str.ptr, str.len);
+    }
+};
+
+// CHAR variant: lookups ignore trailing '\0' bytes in the probed value.
+class FixedCharBloomFilterFunc : public BloomFilterFunc<StringValue> {
+public:
+    FixedCharBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<StringValue>(tracker) {}
+
+    ~FixedCharBloomFilterFunc() = default;
+
+    // Strips every trailing '\0' before probing. A value made only of '\0'
+    // bytes is probed as the empty string (length 0); the original stopped at
+    // length 1, which cannot match an inserted empty string, and for len == 0
+    // it computed a pointer before the start of the buffer.
+    virtual bool find(const void* data) {
+        DCHECK(_bloom_filter != nullptr);
+        const auto* value = reinterpret_cast<const StringValue*>(data);
+        auto trimmed_len = value->len;
+        while (trimmed_len > 0 && value->ptr[trimmed_len - 1] == '\0') {
+            --trimmed_len;
+        }
+        return _bloom_filter->test_bytes(value->ptr, trimmed_len);
+    }
+};
+
+// DATETIME variant: the storage-engine lookup converts the uint64_t read from
+// `data` into a DateTimeValue and probes with that object's bytes.
+class DateTimeBloomFilterFunc : public BloomFilterFunc<DateTimeValue> {
+public:
+    DateTimeBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<DateTimeValue>(tracker) {}
+
+    bool find_olap_engine(const void* data) override {
+        const uint64_t olap_datetime = *reinterpret_cast<const uint64_t*>(data);
+        DateTimeValue converted;
+        converted.from_olap_datetime(olap_datetime);
+        return _bloom_filter->test_bytes(reinterpret_cast<const char*>(&converted),
+                                         sizeof(DateTimeValue));
+    }
+};
+
+// DATE variant: the storage-engine lookup assembles an integer from the first
+// three bytes at `data` (byte 0 least significant), converts it to a
+// DateTimeValue and probes with that object's bytes.
+class DateBloomFilterFunc : public BloomFilterFunc<DateTimeValue> {
+public:
+    DateBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<DateTimeValue>(tracker) {}
+
+    bool find_olap_engine(const void* data) override {
+        const auto* raw = reinterpret_cast<const unsigned char*>(data);
+        const uint64_t olap_date = (static_cast<uint64_t>(raw[2]) << 16) |
+                                   (static_cast<uint64_t>(raw[1]) << 8) |
+                                   static_cast<uint64_t>(raw[0]);
+        DateTimeValue converted;
+        converted.from_olap_date(olap_date);
+        converted.to_datetime();
+        return _bloom_filter->test_bytes(reinterpret_cast<const char*>(&converted),
+                                         sizeof(DateTimeValue));
+    }
+};
+
+// DECIMAL variant: the storage-engine lookup reads an int64_t followed by an
+// int32_t from `data`, builds a DecimalValue from them and probes with that
+// object's bytes.
+class DecimalFilterFunc : public BloomFilterFunc<DecimalValue> {
+public:
+    DecimalFilterFunc(MemTracker* tracker) : BloomFilterFunc<DecimalValue>(tracker) {}
+
+    bool find_olap_engine(const void* data) override {
+        const char* raw = static_cast<const char*>(data);
+        const int64_t int_part = *reinterpret_cast<const int64_t*>(raw);
+        const int32_t frac_part = *reinterpret_cast<const int32_t*>(raw + sizeof(int64_t));
+        DecimalValue converted(int_part, frac_part);
+        return _bloom_filter->test_bytes(reinterpret_cast<const char*>(&converted),
+                                         sizeof(DecimalValue));
+    }
+};
+
+// DECIMALV2 variant: the storage-engine lookup reads an int64_t followed by an
+// int32_t from `data`, converts them with from_olap_decimal and probes with the
+// resulting object's bytes.
+class DecimalV2FilterFunc : public BloomFilterFunc<DecimalV2Value> {
+public:
+    DecimalV2FilterFunc(MemTracker* tracker) : BloomFilterFunc<DecimalV2Value>(tracker) {}
+
+    bool find_olap_engine(const void* data) override {
+        const char* raw = static_cast<const char*>(data);
+        const int64_t int_part = *reinterpret_cast<const int64_t*>(raw);
+        const int32_t frac_part = *reinterpret_cast<const int32_t*>(raw + sizeof(int64_t));
+        DecimalV2Value converted;
+        converted.from_olap_decimal(int_part, frac_part);
+        return _bloom_filter->test_bytes(reinterpret_cast<const char*>(&converted),
+                                         sizeof(DecimalV2Value));
+    }
+};
+
+// BloomFilterPredicate only used in runtime filter
+//
+// Predicate node that tests the value of its child expression against a bloom
+// filter attached through prepare().
+class BloomFilterPredicate : public Predicate {
+public:
+    BloomFilterPredicate(const TExprNode& node);
+    BloomFilterPredicate(const BloomFilterPredicate& other);
+    virtual ~BloomFilterPredicate();
+
+    // Returns a copy of this node that is owned by `pool`.
+    Expr* clone(ObjectPool* pool) const override {
+        return pool->add(new BloomFilterPredicate(*this));
+    }
+
+    // Attaches the filter; see the definition for ownership and error rules.
+    Status prepare(RuntimeState* state, BloomFilterFuncBase* bloomfilterfunc);
+
+    Status open(RuntimeState* state, ExprContext* context,
+                FunctionContext::FunctionStateScope scope) override;
+
+    BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override;
+
+    // Shared handle to the attached filter (empty until prepare() succeeds).
+    std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() { return _filter; }
+
+protected:
+    friend class Expr;
+    std::string debug_string() const override;
+
+private:
+    // set once prepare() has attached a filter
+    bool _is_prepare;
+    // when true the bloom filter is skipped and every row passes
+    bool _always_true;
+    /// TODO: statistic filter rate in the profile
+    std::atomic<int64_t> _filtered_rows;
+    std::atomic<int64_t> _scan_rows;
+
+    std::shared_ptr<BloomFilterFuncBase> _filter;
+    // set after the filter rate has been evaluated once
+    bool _has_calculate_filter = false;
+    // number of scanned rows between filter-rate checks; must be a power of 2
+    constexpr static int64_t _loop_size = 8192;
+    // a filter whose rate is below this value is switched to always true
+    constexpr static double _expect_filter_rate = 0.2;
+};
+} // namespace doris
+#endif
diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index 5f01195..280a3c6 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -21,8 +21,8 @@
#include <memory>
#include "common/status.h"
-#include "exprs/expr_value.h"
#include "exprs/expr.h"
+#include "exprs/expr_value.h"
#include "exprs/slot_ref.h"
#include "udf/udf.h"
#include "udf/udf_internal.h" // for ArrayVal
@@ -172,6 +172,8 @@ private:
friend class Expr;
friend class ScalarFnCall;
friend class InPredicate;
+ friend class RuntimePredicateWrapper;
+ friend class BloomFilterPredicate;
friend class OlapScanNode;
friend class EsScanNode;
friend class EsPredicate;
diff --git a/be/src/exprs/in_predicate.cpp b/be/src/exprs/in_predicate.cpp
index 1edadcc..9db94ce 100644
--- a/be/src/exprs/in_predicate.cpp
+++ b/be/src/exprs/in_predicate.cpp
@@ -19,8 +19,8 @@
#include <sstream>
-#include "exprs/expr_context.h"
#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.hpp"
@@ -36,12 +36,12 @@ InPredicate::InPredicate(const TExprNode& node)
InPredicate::~InPredicate() {}
-Status InPredicate::prepare(RuntimeState* state, const TypeDescriptor& type) {
+Status InPredicate::prepare(RuntimeState* state, HybridSetBase* hset) {
if (_is_prepare) {
return Status::OK();
}
- _hybrid_set.reset(HybridSetBase::create_set(type.type));
- if (NULL == _hybrid_set.get()) {
+ _hybrid_set.reset(hset);
+ if (NULL == _hybrid_set) {
return Status::InternalError("Unknown column type.");
}
_is_prepare = true;
diff --git a/be/src/exprs/in_predicate.h b/be/src/exprs/in_predicate.h
index d544911..b90c1b9 100644
--- a/be/src/exprs/in_predicate.h
+++ b/be/src/exprs/in_predicate.h
@@ -38,7 +38,7 @@ public:
return pool->add(new InPredicate(*this));
}
- Status prepare(RuntimeState* state, const TypeDescriptor&);
+ Status prepare(RuntimeState* state, HybridSetBase* hset);
Status open(RuntimeState* state, ExprContext* context,
FunctionContext::FunctionStateScope scope);
virtual Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
@@ -57,6 +57,7 @@ public:
protected:
friend class Expr;
friend class HashJoinNode;
+ friend class RuntimePredicateWrapper;
InPredicate(const TExprNode& node);
diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h
index e0e69f5..1b35336 100644
--- a/be/src/exprs/literal.h
+++ b/be/src/exprs/literal.h
@@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H
#define DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H
+#include "binary_predicate.h"
#include "common/object_pool.h"
#include "exprs/expr.h"
@@ -46,6 +47,7 @@ public:
protected:
friend class Expr;
+ friend Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data);
Literal(const TExprNode& node);
private:
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
new file mode 100644
index 0000000..b83f8fc
--- /dev/null
+++ b/be/src/exprs/runtime_filter.cpp
@@ -0,0 +1,1138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter.h"
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "common/status.h"
+#include "exec/hash_join_node.h"
+#include "exprs/binary_predicate.h"
+#include "exprs/bloomfilter_predicate.h"
+#include "exprs/expr.h"
+#include "exprs/expr_context.h"
+#include "exprs/hybrid_set.h"
+#include "exprs/in_predicate.h"
+#include "exprs/literal.h"
+#include "exprs/predicate.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gen_cpp/types.pb.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_state.h"
+#include "runtime/type_limit.h"
+#include "util/defer_op.h"
+#include "util/runtime_profile.h"
+#include "util/string_parser.hpp"
+
+namespace doris {
+// Type-erased interface of a min/max accumulator; only used in Runtime Filter.
+// Values are passed as untyped pointers; the concrete subclass decides how to
+// interpret them.
+class MinMaxFuncBase {
+public:
+    // Instances are owned and destroyed through base-class pointers
+    // (std::unique_ptr<MinMaxFuncBase>), so the destructor has to be virtual.
+    virtual ~MinMaxFuncBase() = default;
+
+    // add the value pointed to by data to the tracked range
+    virtual void insert(void* data) = 0;
+    // test whether the value pointed to by data lies inside the tracked range
+    virtual bool find(void* data) = 0;
+    virtual bool is_empty() = 0;
+    // pointers to the stored bounds
+    virtual void* get_max() = 0;
+    virtual void* get_min() = 0;
+    // assign minmax data
+    virtual Status assign(void* min_data, void* max_data) = 0;
+    // merge from other minmax_func
+    virtual Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) = 0;
+    // create min-max filter function
+    static MinMaxFuncBase* create_minmax_filter(PrimitiveType type);
+};
+
+// Min/max accumulator for one concrete value type T.
+// All void* arguments are reinterpreted as T*; callers must pass pointers to T.
+template <class T>
+class MinMaxNumFunc : public MinMaxFuncBase {
+public:
+    MinMaxNumFunc() = default;
+    ~MinMaxNumFunc() = default;
+
+    // Widens [_min, _max] so that it contains *data. A null pointer is ignored.
+    void insert(void* data) override {
+        if (data == nullptr) return;
+        T val_data = *reinterpret_cast<T*>(data);
+        if (_empty) {
+            // first value: it is both the minimum and the maximum
+            _min = val_data;
+            _max = val_data;
+            _empty = false;
+            return;
+        }
+        if (val_data < _min) {
+            _min = val_data;
+        } else if (val_data > _max) {
+            _max = val_data;
+        }
+    }
+
+    // Returns true when *data lies in the closed interval [_min, _max];
+    // a null pointer never matches.
+    bool find(void* data) override {
+        if (data == nullptr) {
+            return false;
+        }
+        T val_data = *reinterpret_cast<T*>(data);
+        return val_data >= _min && val_data <= _max;
+    }
+
+    // Widens this range with the range of minmax_func, which must be a
+    // MinMaxNumFunc<T> of the same T. For StringValue the adopted bounds are
+    // copied into strings registered with pool, so they do not alias the
+    // other object's buffers.
+    Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
+        MinMaxNumFunc<T>* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func);
+
+        if constexpr (std::is_same_v<T, StringValue>) {
+            if (other_minmax->_min < _min) {
+                auto& other_min = other_minmax->_min;
+                auto str = pool->add(new std::string(other_min.ptr, other_min.len));
+                _min.ptr = str->data();
+                _min.len = str->length();
+            }
+            if (other_minmax->_max > _max) {
+                auto& other_max = other_minmax->_max;
+                auto str = pool->add(new std::string(other_max.ptr, other_max.len));
+                _max.ptr = str->data();
+                _max.len = str->length();
+            }
+        } else {
+            if (other_minmax->_min < _min) {
+                _min = other_minmax->_min;
+            }
+            if (other_minmax->_max > _max) {
+                _max = other_minmax->_max;
+            }
+        }
+
+        // Once a non-empty range has been absorbed this object holds data too;
+        // otherwise the next insert() would take the "first value" path and
+        // throw the merged bounds away.
+        if (!other_minmax->_empty) {
+            _empty = false;
+        }
+
+        return Status::OK();
+    }
+
+    bool is_empty() override { return _empty; }
+
+    void* get_max() override { return &_max; }
+
+    void* get_min() override { return &_min; }
+
+    // Overwrites both bounds with *min_data / *max_data (interpreted as T).
+    Status assign(void* min_data, void* max_data) override {
+        _min = *(T*)min_data;
+        _max = *(T*)max_data;
+        // explicit bounds were loaded, so the range is no longer empty
+        _empty = false;
+        return Status::OK();
+    }
+
+private:
+    // Start with an inverted range so that the comparisons in merge() adopt
+    // the other side's bounds when this object has seen no value yet.
+    T _max = type_limit<T>::min();
+    T _min = type_limit<T>::max();
+    // we use _empty to avoid compare twice
+    bool _empty = true;
+};
+
+// Factory: allocates the MinMaxNumFunc instantiation matching the primitive type.
+// Returns a null pointer when the type has no min/max implementation (a DCHECK
+// fires first in debug builds) or when the non-throwing allocation fails.
+MinMaxFuncBase* MinMaxFuncBase::create_minmax_filter(PrimitiveType type) {
+    MinMaxFuncBase* created = nullptr;
+    switch (type) {
+    case TYPE_BOOLEAN:
+        created = new (std::nothrow) MinMaxNumFunc<bool>();
+        break;
+    case TYPE_TINYINT:
+        created = new (std::nothrow) MinMaxNumFunc<int8_t>();
+        break;
+    case TYPE_SMALLINT:
+        created = new (std::nothrow) MinMaxNumFunc<int16_t>();
+        break;
+    case TYPE_INT:
+        created = new (std::nothrow) MinMaxNumFunc<int32_t>();
+        break;
+    case TYPE_BIGINT:
+        created = new (std::nothrow) MinMaxNumFunc<int64_t>();
+        break;
+    case TYPE_FLOAT:
+        created = new (std::nothrow) MinMaxNumFunc<float>();
+        break;
+    case TYPE_DOUBLE:
+        created = new (std::nothrow) MinMaxNumFunc<double>();
+        break;
+    // both date flavours share the DateTimeValue representation
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+        created = new (std::nothrow) MinMaxNumFunc<DateTimeValue>();
+        break;
+    case TYPE_DECIMAL:
+        created = new (std::nothrow) MinMaxNumFunc<DecimalValue>();
+        break;
+    case TYPE_DECIMALV2:
+        created = new (std::nothrow) MinMaxNumFunc<DecimalV2Value>();
+        break;
+    case TYPE_LARGEINT:
+        created = new (std::nothrow) MinMaxNumFunc<__int128>();
+        break;
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+        created = new (std::nothrow) MinMaxNumFunc<StringValue>();
+        break;
+    default:
+        DCHECK(false) << "Invalid type.";
+        break;
+    }
+    return created;
+}
+
+// PrimitiveType->TExprNodeType
+// Maps a primitive type to the literal node type used to represent its values.
+// Unknown types trip a DCHECK and fall back to NULL_LITERAL.
+// TODO: make this constexpr
+TExprNodeType::type get_expr_node_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_BOOLEAN:
+        return TExprNodeType::BOOL_LITERAL;
+
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+        return TExprNodeType::INT_LITERAL;
+
+    case TYPE_LARGEINT:
+        return TExprNodeType::LARGE_INT_LITERAL;
+
+    case TYPE_NULL:
+        return TExprNodeType::NULL_LITERAL;
+
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+    case TYPE_TIME:
+        return TExprNodeType::FLOAT_LITERAL;
+
+    case TYPE_DECIMAL:
+    case TYPE_DECIMALV2:
+        return TExprNodeType::DECIMAL_LITERAL;
+
+    // create_literal() builds a date literal for TYPE_DATE as well as for
+    // TYPE_DATETIME and then asks this function for the node type, so both
+    // have to map to DATE_LITERAL.
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+        return TExprNodeType::DATE_LITERAL;
+
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_HLL:
+    case TYPE_OBJECT:
+        return TExprNodeType::STRING_LITERAL;
+
+    default:
+        DCHECK(false) << "Invalid type.";
+        return TExprNodeType::NULL_LITERAL;
+    }
+}
+
+// PrimitiveType -> PColumnType
+// Table-driven lookup. A type without an entry trips the DCHECKs and, when those
+// are compiled out, is reported as COLUMN_TYPE_INT.
+PColumnType to_proto(PrimitiveType type) {
+    struct Entry {
+        PrimitiveType primitive;
+        PColumnType column;
+    };
+    static const Entry kEntries[] = {
+            {TYPE_BOOLEAN, PColumnType::COLUMN_TYPE_BOOL},
+            {TYPE_TINYINT, PColumnType::COLUMN_TYPE_TINY_INT},
+            {TYPE_SMALLINT, PColumnType::COLUMN_TYPE_SMALL_INT},
+            {TYPE_INT, PColumnType::COLUMN_TYPE_INT},
+            {TYPE_BIGINT, PColumnType::COLUMN_TYPE_BIGINT},
+            {TYPE_LARGEINT, PColumnType::COLUMN_TYPE_LARGEINT},
+            {TYPE_FLOAT, PColumnType::COLUMN_TYPE_FLOAT},
+            {TYPE_DOUBLE, PColumnType::COLUMN_TYPE_DOUBLE},
+            {TYPE_DATE, PColumnType::COLUMN_TYPE_DATE},
+            {TYPE_DATETIME, PColumnType::COLUMN_TYPE_DATETIME},
+            {TYPE_DECIMAL, PColumnType::COLUMN_TYPE_DECIMAL},
+            {TYPE_DECIMALV2, PColumnType::COLUMN_TYPE_DECIMALV2},
+            {TYPE_CHAR, PColumnType::COLUMN_TYPE_CHAR},
+            {TYPE_VARCHAR, PColumnType::COLUMN_TYPE_VARCHAR},
+    };
+    for (const Entry& entry : kEntries) {
+        if (entry.primitive == type) {
+            return entry.column;
+        }
+    }
+    DCHECK(false) << "Invalid type.";
+    DCHECK(false);
+    return PColumnType::COLUMN_TYPE_INT;
+}
+
+// PColumnType -> PrimitiveType
+// Table-driven lookup. A column type without an entry trips a DCHECK and, when
+// that is compiled out, is reported as TYPE_INT.
+PrimitiveType to_primitive_type(PColumnType type) {
+    struct Entry {
+        PColumnType column;
+        PrimitiveType primitive;
+    };
+    static const Entry kEntries[] = {
+            {PColumnType::COLUMN_TYPE_BOOL, TYPE_BOOLEAN},
+            {PColumnType::COLUMN_TYPE_TINY_INT, TYPE_TINYINT},
+            {PColumnType::COLUMN_TYPE_SMALL_INT, TYPE_SMALLINT},
+            {PColumnType::COLUMN_TYPE_INT, TYPE_INT},
+            {PColumnType::COLUMN_TYPE_BIGINT, TYPE_BIGINT},
+            {PColumnType::COLUMN_TYPE_LARGEINT, TYPE_LARGEINT},
+            {PColumnType::COLUMN_TYPE_FLOAT, TYPE_FLOAT},
+            {PColumnType::COLUMN_TYPE_DOUBLE, TYPE_DOUBLE},
+            {PColumnType::COLUMN_TYPE_DATE, TYPE_DATE},
+            {PColumnType::COLUMN_TYPE_DATETIME, TYPE_DATETIME},
+            {PColumnType::COLUMN_TYPE_DECIMAL, TYPE_DECIMAL},
+            {PColumnType::COLUMN_TYPE_DECIMALV2, TYPE_DECIMALV2},
+            {PColumnType::COLUMN_TYPE_VARCHAR, TYPE_VARCHAR},
+            {PColumnType::COLUMN_TYPE_CHAR, TYPE_CHAR},
+    };
+    for (const Entry& entry : kEntries) {
+        if (entry.column == type) {
+            return entry.primitive;
+        }
+    }
+    DCHECK(false);
+    return TYPE_INT;
+}
+
+// PFilterType -> RuntimeFilterType
+// Takes the protobuf filter type as a plain int; anything other than bloom or
+// min/max is reported as UNKNOWN_FILTER.
+RuntimeFilterType get_type(int filter_type) {
+    if (filter_type == PFilterType::BLOOM_FILTER) {
+        return RuntimeFilterType::BLOOM_FILTER;
+    }
+    if (filter_type == PFilterType::MINMAX_FILTER) {
+        return RuntimeFilterType::MINMAX_FILTER;
+    }
+    return RuntimeFilterType::UNKNOWN_FILTER;
+}
+
+// RuntimeFilterType -> PFilterType
+// Only bloom and min/max filters have a protobuf counterpart; every other value
+// is reported as UNKNOW_FILTER.
+PFilterType get_type(RuntimeFilterType type) {
+    if (type == RuntimeFilterType::BLOOM_FILTER) {
+        return PFilterType::BLOOM_FILTER;
+    }
+    if (type == RuntimeFilterType::MINMAX_FILTER) {
+        return PFilterType::MINMAX_FILTER;
+    }
+    return PFilterType::UNKNOW_FILTER;
+}
+
+// Builds a thrift type descriptor consisting of a single type node whose scalar
+// type is the thrift form of `type`, with the length set to -1.
+TTypeDesc create_type_desc(PrimitiveType type) {
+    TScalarType scalar;
+    scalar.__set_type(to_thrift(type));
+    scalar.__set_len(-1);
+
+    TTypeNode only_node;
+    only_node.__set_scalar_type(scalar);
+
+    std::vector<TTypeNode> nodes;
+    nodes.push_back(only_node);
+
+    TTypeDesc desc;
+    desc.__set_types(nodes);
+    return desc;
+}
+
+// only used to push down to olap engine
+// Builds a Literal expression of primitive type `type` from the raw value that
+// `data` points to and registers it with `pool`. `data` must point to the
+// in-memory representation matching `type` (e.g. int32_t for TYPE_INT,
+// StringValue for TYPE_CHAR/TYPE_VARCHAR). Types without a case below trip a
+// DCHECK and yield a null pointer.
+Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data) {
+    TExprNode literal_node;
+
+    // the integer and floating point cases only differ in the width they read
+    auto set_int = [&literal_node](int64_t value) {
+        TIntLiteral int_lit;
+        int_lit.__set_value(value);
+        literal_node.__set_int_literal(int_lit);
+    };
+    auto set_float = [&literal_node](double value) {
+        TFloatLiteral float_lit;
+        float_lit.__set_value(value);
+        literal_node.__set_float_literal(float_lit);
+    };
+
+    switch (type) {
+    case TYPE_BOOLEAN: {
+        TBoolLiteral bool_lit;
+        bool_lit.__set_value(*static_cast<const bool*>(data));
+        literal_node.__set_bool_literal(bool_lit);
+        break;
+    }
+    case TYPE_TINYINT:
+        set_int(*static_cast<const int8_t*>(data));
+        break;
+    case TYPE_SMALLINT:
+        set_int(*static_cast<const int16_t*>(data));
+        break;
+    case TYPE_INT:
+        set_int(*static_cast<const int32_t*>(data));
+        break;
+    case TYPE_BIGINT:
+        set_int(*static_cast<const int64_t*>(data));
+        break;
+    case TYPE_LARGEINT: {
+        // 128-bit integers travel as their decimal string form
+        TLargeIntLiteral large_lit;
+        large_lit.__set_value(LargeIntValue::to_string(*static_cast<const int128_t*>(data)));
+        literal_node.__set_large_int_literal(large_lit);
+        break;
+    }
+    case TYPE_FLOAT:
+        set_float(*static_cast<const float*>(data));
+        break;
+    case TYPE_DOUBLE:
+        set_float(*static_cast<const double*>(data));
+        break;
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        char text[30];
+        static_cast<const DateTimeValue*>(data)->to_string(text);
+        TDateLiteral date_lit;
+        date_lit.__set_value(text);
+        literal_node.__set_date_literal(date_lit);
+        break;
+    }
+    case TYPE_DECIMAL: {
+        TDecimalLiteral decimal_lit;
+        decimal_lit.__set_value(static_cast<const DecimalValue*>(data)->to_string());
+        literal_node.__set_decimal_literal(decimal_lit);
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        TDecimalLiteral decimal_lit;
+        decimal_lit.__set_value(static_cast<const DecimalV2Value*>(data)->to_string());
+        literal_node.__set_decimal_literal(decimal_lit);
+        break;
+    }
+    case TYPE_CHAR:
+    case TYPE_VARCHAR: {
+        // StringValue is pointer + length, so copy exactly `len` bytes
+        const StringValue* str = static_cast<const StringValue*>(data);
+        TStringLiteral string_lit;
+        string_lit.__set_value(std::string(str->ptr, str->len));
+        literal_node.__set_string_literal(string_lit);
+        break;
+    }
+    default:
+        DCHECK(false);
+        return nullptr;
+    }
+
+    literal_node.__set_node_type(get_expr_node_type(type));
+    literal_node.__set_type(create_type_desc(type));
+    return pool->add(new Literal(literal_node));
+}
+
+// Builds a two-child BinaryPredicate node (children are added by the caller)
+// with a BOOLEAN result type, the given opcode and `prim_type` as child type,
+// and registers it with `pool`.
+BinaryPredicate* create_bin_predicate(ObjectPool* pool, PrimitiveType prim_type,
+                                      TExprOpcode::type opcode) {
+    // result type descriptor: a single scalar BOOLEAN node
+    TScalarType bool_scalar;
+    bool_scalar.__set_type(TPrimitiveType::BOOLEAN);
+    TTypeNode result_node;
+    result_node.__set_type(TTypeNodeType::SCALAR);
+    result_node.__set_scalar_type(bool_scalar);
+    TTypeDesc result_desc;
+    result_desc.types.push_back(result_node);
+
+    TExprNode pred_node;
+    pred_node.__set_type(result_desc);
+    pred_node.__set_opcode(opcode);
+    pred_node.__set_child_type(to_thrift(prim_type));
+    pred_node.__set_num_children(2);
+    pred_node.__set_output_scale(-1);
+    pred_node.__set_node_type(TExprNodeType::BINARY_PRED);
+
+    auto* created = pool->add(BinaryPredicate::from_thrift(pred_node));
+    return static_cast<BinaryPredicate*>(created);
+}
+// This class is a wrapper of runtime predicate function.
+// Depending on its filter type it works on exactly one of three backing
+// objects: a hybrid set (IN filter), a min/max accumulator (MINMAX filter) or a
+// bloom filter function (BLOOM filter).
+class RuntimePredicateWrapper {
+public:
+    // Regular wrapper: filter type and column type come from params.
+    // init() must be called before the wrapper is used.
+    RuntimePredicateWrapper(RuntimeState* state, MemTracker* tracker, ObjectPool* pool,
+                            const RuntimeFilterParams* params)
+            : _tracker(tracker),
+              _pool(pool),
+              _column_return_type(params->column_return_type),
+              _filter_type(params->filter_type) {}
+    // for a 'tmp' runtime predicate wrapper
+    // only could called assign method or as a param for merge
+    RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool, RuntimeFilterType type)
+            : _tracker(tracker), _pool(pool), _filter_type(type) {}
+    // init runtimefilter wrapper
+    // alloc memory to init runtime filter function
+    // Fails with InvalidArgument when the filter type is unknown or when no
+    // filter function exists for the column type.
+    Status init(const RuntimeFilterParams* params) {
+        switch (_filter_type) {
+        case RuntimeFilterType::IN_FILTER: {
+            _hybrid_set.reset(HybridSetBase::create_set(_column_return_type));
+            if (_hybrid_set == nullptr) {
+                return Status::InvalidArgument("unsupported column type for in filter");
+            }
+            break;
+        }
+        case RuntimeFilterType::MINMAX_FILTER: {
+            _minmax_func.reset(MinMaxFuncBase::create_minmax_filter(_column_return_type));
+            if (_minmax_func == nullptr) {
+                return Status::InvalidArgument("unsupported column type for minmax filter");
+            }
+            break;
+        }
+        case RuntimeFilterType::BLOOM_FILTER: {
+            _bloomfilter_func.reset(
+                    BloomFilterFuncBase::create_bloom_filter(_tracker, _column_return_type));
+            // the factory result is dereferenced right away, so reject a null one
+            if (_bloomfilter_func == nullptr) {
+                return Status::InvalidArgument("unsupported column type for bloom filter");
+            }
+            return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
+        }
+        default:
+            DCHECK(false);
+            return Status::InvalidArgument("Unknown Filter type");
+        }
+        return Status::OK();
+    }
+
+    // Feeds one build-side value into the active filter function. For the IN
+    // filter null pointers are skipped here; the other functions receive the
+    // pointer as is.
+    void insert(void* data) {
+        switch (_filter_type) {
+        case RuntimeFilterType::IN_FILTER: {
+            if (data != nullptr) {
+                _hybrid_set->insert(data);
+            }
+            break;
+        }
+        case RuntimeFilterType::MINMAX_FILTER: {
+            _minmax_func->insert(data);
+            break;
+        }
+        case RuntimeFilterType::BLOOM_FILTER: {
+            DCHECK(_bloomfilter_func != nullptr);
+            _bloomfilter_func->insert(data);
+            break;
+        }
+        default:
+            DCHECK(false);
+            break;
+        }
+    }
+
+    // Appends to container the expression context(s) that apply this filter to
+    // prob_expr: one IN predicate, one bloom predicate, or a "<= max" / ">= min"
+    // pair. All created objects are registered with _pool.
+    // For IN and BLOOM filters the filter function is released to the created
+    // predicate, so this wrapper no longer holds it afterwards.
+    template <class T>
+    Status get_push_context(T* container, RuntimeState* state, ExprContext* prob_expr) {
+        DCHECK(state != nullptr);
+        DCHECK(container != nullptr);
+        DCHECK(_pool != nullptr);
+        DCHECK(prob_expr->root()->type().type == _column_return_type);
+
+        switch (_filter_type) {
+        case RuntimeFilterType::IN_FILTER: {
+            TTypeDesc type_desc = create_type_desc(_column_return_type);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(to_in_opcode(_column_return_type));
+            auto in_pred = _pool->add(new InPredicate(node));
+            RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release()));
+            in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            ExprContext* ctx = _pool->add(new ExprContext(in_pred));
+            container->push_back(ctx);
+            break;
+        }
+        case RuntimeFilterType::MINMAX_FILTER: {
+            // create max filter
+            auto max_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::LE);
+            auto max_literal = create_literal(_pool, _column_return_type, _minmax_func->get_max());
+            max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            max_pred->add_child(max_literal);
+            container->push_back(_pool->add(new ExprContext(max_pred)));
+            // create min filter
+            auto min_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::GE);
+            auto min_literal = create_literal(_pool, _column_return_type, _minmax_func->get_min());
+            min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            min_pred->add_child(min_literal);
+            container->push_back(_pool->add(new ExprContext(min_pred)));
+            break;
+        }
+        case RuntimeFilterType::BLOOM_FILTER: {
+            // create a bloom filter
+            TTypeDesc type_desc = create_type_desc(_column_return_type);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::BLOOM_PRED);
+            node.__set_opcode(TExprOpcode::RT_FILTER);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(to_in_opcode(_column_return_type));
+            auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
+            RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func.release()));
+            bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+            ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
+            container->push_back(ctx);
+            break;
+        }
+        default:
+            DCHECK(false);
+            break;
+        }
+        return Status::OK();
+    }
+
+    // Folds the content of another wrapper of the same filter type into this
+    // one. IN filters cannot be merged.
+    Status merge(const RuntimePredicateWrapper* wrapper) {
+        DCHECK(_filter_type == wrapper->_filter_type);
+        if (_filter_type != wrapper->_filter_type) {
+            return Status::InvalidArgument("invalid filter type");
+        }
+        switch (_filter_type) {
+        case RuntimeFilterType::IN_FILTER: {
+            DCHECK(false) << "in filter should't apply in shuffle join";
+            return Status::InternalError("in filter should't apply in shuffle join");
+        }
+        case RuntimeFilterType::MINMAX_FILTER: {
+            // propagate a failed merge instead of reporting success
+            RETURN_IF_ERROR(_minmax_func->merge(wrapper->_minmax_func.get(), _pool));
+            break;
+        }
+        case RuntimeFilterType::BLOOM_FILTER: {
+            // NOTE(review): the result of the bloom merge is not inspected; if
+            // BloomFilterFuncBase::merge reports failures through its return
+            // value it should be propagated here as well - confirm.
+            _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+            break;
+        }
+        default:
+            DCHECK(false);
+            return Status::InternalError("unknown runtime filter");
+        }
+        return Status::OK();
+    }
+
+    // used by shuffle runtime filter
+    // assign this filter by protobuf
+    // data points to the raw bloom filter bytes; their length is taken from
+    // bloom_filter->filter_length().
+    Status assign(const PBloomFilter* bloom_filter, const char* data) {
+        DCHECK(_tracker != nullptr);
+        // we won't use this class to insert or find any data
+        // so any type is ok
+        _bloomfilter_func.reset(
+                BloomFilterFuncBase::create_bloom_filter(_tracker, PrimitiveType::TYPE_INT));
+        return _bloomfilter_func->assign(data, bloom_filter->filter_length());
+    }
+
+    // used by shuffle runtime filter
+    // assign this filter by protobuf
+    // Decodes the min/max bounds according to the column type carried in the
+    // message. Returns InvalidArgument for column types that are not decoded
+    // here or when a bound cannot be parsed.
+    Status assign(const PMinMaxFilter* minmax_filter) {
+        DCHECK(_tracker != nullptr);
+        PrimitiveType type = to_primitive_type(minmax_filter->column_type());
+        // remember the decoded type so column_type() is meaningful for wrappers
+        // created through the 'tmp' constructor
+        _column_return_type = type;
+        _minmax_func.reset(MinMaxFuncBase::create_minmax_filter(type));
+        if (_minmax_func == nullptr) {
+            return Status::InvalidArgument("not support!");
+        }
+        switch (type) {
+        case TYPE_BOOLEAN: {
+            bool min_val;
+            bool max_val;
+            min_val = minmax_filter->min_val().boolval();
+            max_val = minmax_filter->max_val().boolval();
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_TINYINT: {
+            int8_t min_val;
+            int8_t max_val;
+            min_val = static_cast<int8_t>(minmax_filter->min_val().intval());
+            max_val = static_cast<int8_t>(minmax_filter->max_val().intval());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_SMALLINT: {
+            int16_t min_val;
+            int16_t max_val;
+            min_val = static_cast<int16_t>(minmax_filter->min_val().intval());
+            max_val = static_cast<int16_t>(minmax_filter->max_val().intval());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_INT: {
+            int32_t min_val;
+            int32_t max_val;
+            min_val = minmax_filter->min_val().intval();
+            max_val = minmax_filter->max_val().intval();
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_BIGINT: {
+            int64_t min_val;
+            int64_t max_val;
+            min_val = minmax_filter->min_val().longval();
+            max_val = minmax_filter->max_val().longval();
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_LARGEINT: {
+            // 128-bit bounds are transported as decimal strings
+            int128_t min_val;
+            int128_t max_val;
+            auto min_string_val = minmax_filter->min_val().stringval();
+            auto max_string_val = minmax_filter->max_val().stringval();
+            StringParser::ParseResult result;
+            min_val = StringParser::string_to_int<int128_t>(min_string_val.c_str(),
+                                                            min_string_val.length(), &result);
+            if (result != StringParser::PARSE_SUCCESS) {
+                return Status::InvalidArgument("invalid largeint min value");
+            }
+            max_val = StringParser::string_to_int<int128_t>(max_string_val.c_str(),
+                                                            max_string_val.length(), &result);
+            if (result != StringParser::PARSE_SUCCESS) {
+                return Status::InvalidArgument("invalid largeint max value");
+            }
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_FLOAT: {
+            float min_val;
+            float max_val;
+            min_val = static_cast<float>(minmax_filter->min_val().doubleval());
+            max_val = static_cast<float>(minmax_filter->max_val().doubleval());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_DOUBLE: {
+            double min_val;
+            double max_val;
+            min_val = static_cast<double>(minmax_filter->min_val().doubleval());
+            max_val = static_cast<double>(minmax_filter->max_val().doubleval());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_DATETIME:
+        case TYPE_DATE: {
+            auto& min_val_ref = minmax_filter->min_val().stringval();
+            auto& max_val_ref = minmax_filter->max_val().stringval();
+            DateTimeValue min_val;
+            DateTimeValue max_val;
+            // NOTE(review): the outcome of from_date_str is not checked - confirm
+            // whether it reports parse failures.
+            min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
+            max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        case TYPE_VARCHAR:
+        case TYPE_CHAR: {
+            // copy the bytes into pool-registered strings so the StringValues
+            // do not point into the protobuf message
+            auto& min_val_ref = minmax_filter->min_val().stringval();
+            auto& max_val_ref = minmax_filter->max_val().stringval();
+            auto min_val_ptr = _pool->add(new std::string(min_val_ref));
+            auto max_val_ptr = _pool->add(new std::string(max_val_ref));
+            StringValue min_val(const_cast<char*>(min_val_ptr->c_str()), min_val_ptr->length());
+            StringValue max_val(const_cast<char*>(max_val_ptr->c_str()), max_val_ptr->length());
+            return _minmax_func->assign(&min_val, &max_val);
+        }
+        default:
+            // NOTE(review): the decimal types are serialized by
+            // IRuntimeFilter::to_protobuf but are not decoded here.
+            DCHECK(false) << "unknown type";
+            break;
+        }
+        return Status::InvalidArgument("not support!");
+    }
+
+    // Exposes the raw bloom filter bytes and their length.
+    Status get_bloom_filter_desc(char** data, int* filter_length) {
+        return _bloomfilter_func->get_data(data, filter_length);
+    }
+
+    // Exposes pointers to the stored min and max values.
+    Status get_minmax_filter_desc(void** min_data, void** max_data) {
+        *min_data = _minmax_func->get_min();
+        *max_data = _minmax_func->get_max();
+        return Status::OK();
+    }
+
+    PrimitiveType column_type() { return _column_return_type; }
+
+    // For string min/max filters: replaces the stored bounds with copies kept in
+    // pool-registered strings, so they no longer reference the buffers the
+    // values were inserted from.
+    void ready_for_publish() {
+        if (_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+            switch (_column_return_type) {
+            case TYPE_VARCHAR:
+            case TYPE_CHAR: {
+                StringValue* min_value = static_cast<StringValue*>(_minmax_func->get_min());
+                StringValue* max_value = static_cast<StringValue*>(_minmax_func->get_max());
+                // StringValue is pointer + length (see create_literal / merge),
+                // so the length has to be passed explicitly when copying.
+                auto min_val_ptr = _pool->add(new std::string(min_value->ptr, min_value->len));
+                auto max_val_ptr = _pool->add(new std::string(max_value->ptr, max_value->len));
+                StringValue min_val(const_cast<char*>(min_val_ptr->c_str()), min_val_ptr->length());
+                StringValue max_val(const_cast<char*>(max_val_ptr->c_str()), max_val_ptr->length());
+                _minmax_func->assign(&min_val, &max_val);
+                break;
+            }
+            default:
+                break;
+            }
+        }
+    }
+
+private:
+    MemTracker* _tracker;
+    ObjectPool* _pool;
+    // column type; value-initialized for 'tmp' wrappers until assign() sets it
+    PrimitiveType _column_return_type{};
+    RuntimeFilterType _filter_type;
+    std::unique_ptr<MinMaxFuncBase> _minmax_func;
+    std::unique_ptr<HybridSetBase> _hybrid_set;
+    std::unique_ptr<BloomFilterFuncBase> _bloomfilter_func;
+};
+
+// Creates a runtime filter registered with `pool`, stores it in *res, assigns
+// its role and initializes it from the thrift descriptor. *res is set even when
+// initialization fails; the returned Status is that of init_with_desc.
+Status IRuntimeFilter::create(RuntimeState* state, MemTracker* tracker, ObjectPool* pool,
+                              const TRuntimeFilterDesc* desc, const RuntimeFilterRole role,
+                              int node_id, IRuntimeFilter** res) {
+    IRuntimeFilter* created = pool->add(new IRuntimeFilter(state, tracker, pool));
+    *res = created;
+    created->set_role(role);
+    return created->init_with_desc(desc, node_id);
+}
+
+// Producer side only: forwards one build-side value to the wrapped filter
+// function.
+void IRuntimeFilter::insert(void* data) {
+ DCHECK(is_producer());
+ _wrapper->insert(data);
+}
+
+// Producer side only: makes the built filter available to its consumer.
+// With a local target the wrapper is swapped into the consumer filter found in
+// the runtime filter manager and the consumer is signalled; otherwise the
+// filter is pushed to the merge address. Neither parameter is read here.
+// Returns the lookup error if the consumer filter cannot be found.
+Status IRuntimeFilter::publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx) {
+    DCHECK(is_producer());
+    if (_has_local_target) {
+        IRuntimeFilter* consumer_filter = nullptr;
+        // A failed lookup leaves consumer_filter null; report it instead of
+        // dereferencing it (the former DCHECK vanishes in release builds).
+        RETURN_IF_ERROR(
+                _state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter));
+        if (consumer_filter == nullptr) {
+            return Status::InternalError("consumer runtime filter not found");
+        }
+        // push down
+        std::swap(this->_wrapper, consumer_filter->_wrapper);
+        consumer_filter->signal();
+        return Status::OK();
+    } else {
+        TNetworkAddress addr;
+        RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr));
+        return push_to_remote(_state, &addr);
+    }
+}
+
+// Producer side only: calls join_rpc(); presumably this waits for an
+// outstanding push_to_remote RPC - confirm against join_rpc's definition.
+void IRuntimeFilter::publish_finally() {
+ DCHECK(is_producer());
+ join_rpc();
+}
+
+// Consumer side: appends the expression contexts implementing this filter on
+// the stored probe expression. An ignored filter contributes nothing and
+// reports success.
+Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs) {
+    DCHECK(is_consumer());
+    if (_is_ignored) {
+        return Status::OK();
+    }
+    return _wrapper->get_push_context(push_expr_ctxs, _state, _probe_ctx);
+}
+
+// Producer side: appends the expression contexts implementing this filter on
+// the probe expression supplied by the caller.
+Status IRuntimeFilter::get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs,
+ ExprContext* probe_ctx) {
+ DCHECK(is_producer());
+ return _wrapper->get_push_context(push_expr_ctxs, _state, probe_ctx);
+}
+
+// Consumer side: appends the prepared and opened push-down expression contexts
+// to push_expr_ctxs. They are built once under _inner_mutex and cached in
+// _push_down_ctxs; every call, including the one that builds them, receives the
+// cached contexts.
+Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>* push_expr_ctxs,
+                                            const RowDescriptor& desc,
+                                            const std::shared_ptr<MemTracker>& tracker) {
+    DCHECK(_is_ready);
+    DCHECK(is_consumer());
+    std::lock_guard<std::mutex> guard(_inner_mutex);
+
+    if (_push_down_ctxs.empty()) {
+        // push expr
+        // NOTE(review): if prepare/open fails, _push_down_ctxs stays populated
+        // and later calls hand out those contexts - confirm that is acceptable.
+        RETURN_IF_ERROR(_wrapper->get_push_context(&_push_down_ctxs, _state, _probe_ctx));
+        RETURN_IF_ERROR(Expr::prepare(_push_down_ctxs, _state, desc, tracker));
+        RETURN_IF_ERROR(Expr::open(_push_down_ctxs, _state));
+    }
+    // Also hand the contexts to the caller that triggered the build; previously
+    // only subsequent callers received them.
+    push_expr_ctxs->insert(push_expr_ctxs->end(), _push_down_ctxs.begin(),
+                           _push_down_ctxs.end());
+    return Status::OK();
+}
+
+// Consumer side: waits until the filter is ready or the configured wait time
+// (runtime_filter_wait_time_ms of the runtime state) has elapsed. Returns true
+// when the filter is ready, false on timeout. The elapsed time is accounted to
+// _await_time_cost.
+bool IRuntimeFilter::await() {
+    DCHECK(is_consumer());
+    SCOPED_TIMER(_await_time_cost);
+    int64_t timeout_ms = _state->runtime_filter_wait_time_ms();
+    // already published: nothing to wait for
+    if (_is_ready) {
+        return true;
+    }
+    std::unique_lock<std::mutex> guard(_inner_mutex);
+    const auto timeout = std::chrono::milliseconds(timeout_ms);
+    return _inner_cv.wait_for(guard, timeout, [this] { return this->_is_ready; });
+}
+
+// Consumer side: marks the filter as ready and wakes every thread blocked in
+// await().
+// NOTE(review): _is_ready is written here without holding _inner_mutex, while
+// await() evaluates it under that mutex before blocking - confirm a wake-up
+// cannot be missed (a missed one would make await() run into its timeout).
+void IRuntimeFilter::signal() {
+ DCHECK(is_consumer());
+ _is_ready = true;
+ _inner_cv.notify_all();
+ // releases the timer object created in init_profile(); presumably this is
+ // what stops the "EffectTimeCost" measurement - confirm
+ _effect_timer.reset();
+}
+
+// Initializes this filter from its thrift descriptor: maps the filter type,
+// copies the descriptor flags, builds the source (build side) expression to
+// learn the column type and, for consumers (node_id >= 0), the target
+// expression registered for node_id. Finally creates and initializes the
+// predicate wrapper.
+// Returns InvalidArgument for an unknown filter type and InternalError when the
+// descriptor has no target expression for node_id.
+Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, int node_id) {
+    // if node_id == -1 , it shouldn't be a consumer
+    DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
+
+    if (desc->type == TRuntimeFilterType::BLOOM) {
+        _runtime_filter_type = RuntimeFilterType::BLOOM_FILTER;
+    } else if (desc->type == TRuntimeFilterType::MIN_MAX) {
+        _runtime_filter_type = RuntimeFilterType::MINMAX_FILTER;
+    } else if (desc->type == TRuntimeFilterType::IN) {
+        _runtime_filter_type = RuntimeFilterType::IN_FILTER;
+    } else {
+        return Status::InvalidArgument("unknown filter type");
+    }
+
+    _is_broadcast_join = desc->is_broadcast_join;
+    _has_local_target = desc->has_local_targets;
+    _has_remote_target = desc->has_remote_targets;
+    _expr_order = desc->expr_order;
+    _filter_id = desc->filter_id;
+
+    // the build expression is only needed for its result type
+    ExprContext* build_ctx = nullptr;
+    RETURN_IF_ERROR(Expr::create_expr_tree(_pool, desc->src_expr, &build_ctx));
+
+    RuntimeFilterParams params;
+    params.filter_type = _runtime_filter_type;
+    params.column_return_type = build_ctx->root()->type().type;
+    // NOTE(review): when the descriptor carries no size, bloom_filter_size keeps
+    // whatever RuntimeFilterParams defaults to - confirm it has a default.
+    if (desc->__isset.bloom_filter_size_bytes) {
+        params.bloom_filter_size = desc->bloom_filter_size_bytes;
+    }
+
+    if (node_id >= 0) {
+        DCHECK(is_consumer());
+        const auto iter = desc->planId_to_target_expr.find(node_id);
+        if (iter == desc->planId_to_target_expr.end()) {
+            DCHECK(false) << "runtime filter not found node_id:" << node_id;
+            return Status::InternalError("not found a node id");
+        }
+        RETURN_IF_ERROR(Expr::create_expr_tree(_pool, iter->second, &_probe_ctx));
+    }
+
+    _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _mem_tracker, _pool, &params));
+    return _wrapper->init(&params);
+}
+
+// Serializes this filter into a merge request; delegates to _serialize.
+Status IRuntimeFilter::serialize(PMergeFilterRequest* request, void** data, int* len) {
+ return _serialize(request, data, len);
+}
+
+// Serializes this filter into a publish request; delegates to _serialize.
+Status IRuntimeFilter::serialize(PPublishFilterRequest* request, void** data, int* len) {
+ return _serialize(request, data, len);
+}
+
+// Builds a temporary predicate wrapper from the request carried by merge
+// params; delegates to _create_wrapper.
+Status IRuntimeFilter::create_wrapper(const MergeRuntimeFilterParams* param, MemTracker* tracker,
+ ObjectPool* pool,
+ std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
+ return _create_wrapper(param, tracker, pool, wrapper);
+}
+
+// Builds a temporary predicate wrapper from the request carried by update
+// params; delegates to _create_wrapper.
+Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParams* param, MemTracker* tracker,
+ ObjectPool* pool,
+ std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
+ return _create_wrapper(param, tracker, pool, wrapper);
+}
+
+// Builds a temporary predicate wrapper of the filter type named in
+// param->request and loads it from the request: bloom filters from
+// param->data plus the length in the message, min/max filters from the message
+// itself. Returns InvalidArgument for any other filter type.
+template <class T>
+Status IRuntimeFilter::_create_wrapper(const T* param, MemTracker* tracker, ObjectPool* pool,
+                                       std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
+    int filter_type = param->request->filter_type();
+    wrapper->reset(new RuntimePredicateWrapper(tracker, pool, get_type(filter_type)));
+
+    switch (filter_type) {
+    case PFilterType::BLOOM_FILTER: {
+        DCHECK(param->request->has_bloom_filter());
+        return (*wrapper)->assign(&param->request->bloom_filter(), param->data);
+    }
+    case PFilterType::MINMAX_FILTER: {
+        DCHECK(param->request->has_minmax_filter());
+        return (*wrapper)->assign(&param->request->minmax_filter());
+    }
+    default:
+        return Status::InvalidArgument("unknow filter type");
+    }
+}
+
+// Creates this filter's profile ("RuntimeFilter:<type>") as a child of
+// parent_profile, registers the "EffectTimeCost" and "AWaitTimeCost" timers and
+// starts the effect timer.
+void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
+    DCHECK(parent_profile != nullptr);
+    auto profile_name = "RuntimeFilter:" + ::doris::to_string(_runtime_filter_type);
+    _profile.reset(new RuntimeProfile(profile_name));
+    parent_profile->add_child(_profile.get(), true, nullptr);
+
+    _effect_time_cost = ADD_TIMER(_profile, "EffectTimeCost");
+    _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
+
+    auto* effect_timer = new ScopedTimer<MonotonicStopWatch>(_effect_time_cost);
+    _effect_timer.reset(effect_timer);
+    _effect_timer->start();
+}
+
+// Adds the info string "HasPushDownToEngine" = "true" to this filter's profile.
+// Dereferences _profile, which is created in init_profile().
+void IRuntimeFilter::set_push_down_profile() {
+ _profile->add_info_string("HasPushDownToEngine", "true");
+}
+
+// Delegates to the wrapper's ready_for_publish().
+void IRuntimeFilter::ready_for_publish() {
+ _wrapper->ready_for_publish();
+}
+
+// Merges the content of another wrapper into this filter's wrapper and returns
+// the wrapper's merge status.
+Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
+ return _wrapper->merge(wrapper);
+}
+
+// Fills request with this filter's type and payload.
+// Bloom filter: *data / *len receive the raw filter bytes and their length, and
+// the length is also recorded in the message. Min/max filter: the bounds are
+// written into the message. Any other filter type yields InvalidArgument.
+template <class T>
+Status IRuntimeFilter::_serialize(T* request, void** data, int* len) {
+    request->set_filter_type(get_type(_runtime_filter_type));
+
+    switch (_runtime_filter_type) {
+    case RuntimeFilterType::BLOOM_FILTER: {
+        RETURN_IF_ERROR(_wrapper->get_bloom_filter_desc((char**)data, len));
+        DCHECK(data != nullptr);
+        auto* bloom_msg = request->mutable_bloom_filter();
+        bloom_msg->set_filter_length(*len);
+        bloom_msg->set_always_true(false);
+        return Status::OK();
+    }
+    case RuntimeFilterType::MINMAX_FILTER: {
+        to_protobuf(request->mutable_minmax_filter());
+        return Status::OK();
+    }
+    default:
+        return Status::InvalidArgument("not implemented !");
+    }
+}
+
+// Writes the wrapper's min/max bounds and column type into filter.
+// Integers up to 32 bit use intval, 64 bit uses longval, float/double use
+// doubleval; large integers, dates, decimals and strings are written as
+// stringval. Unknown column types trip a DCHECK and leave the bounds unset.
+void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
+    void* min_data = nullptr;
+    void* max_data = nullptr;
+    _wrapper->get_minmax_filter_desc(&min_data, &max_data);
+    DCHECK(min_data != nullptr);
+    DCHECK(max_data != nullptr);
+    filter->set_column_type(to_proto(_wrapper->column_type()));
+
+    switch (_wrapper->column_type()) {
+    case TYPE_BOOLEAN: {
+        // The bounds are stored as bool (MinMaxNumFunc<bool>); reading them
+        // through a wider integer pointer would pick up neighbouring bytes.
+        filter->mutable_min_val()->set_boolval(*reinterpret_cast<const bool*>(min_data));
+        filter->mutable_max_val()->set_boolval(*reinterpret_cast<const bool*>(max_data));
+        return;
+    }
+    case TYPE_TINYINT: {
+        filter->mutable_min_val()->set_intval(*reinterpret_cast<const int8_t*>(min_data));
+        filter->mutable_max_val()->set_intval(*reinterpret_cast<const int8_t*>(max_data));
+        return;
+    }
+    case TYPE_SMALLINT: {
+        filter->mutable_min_val()->set_intval(*reinterpret_cast<const int16_t*>(min_data));
+        filter->mutable_max_val()->set_intval(*reinterpret_cast<const int16_t*>(max_data));
+        return;
+    }
+    case TYPE_INT: {
+        filter->mutable_min_val()->set_intval(*reinterpret_cast<const int32_t*>(min_data));
+        filter->mutable_max_val()->set_intval(*reinterpret_cast<const int32_t*>(max_data));
+        return;
+    }
+    case TYPE_BIGINT: {
+        filter->mutable_min_val()->set_longval(*reinterpret_cast<const int64_t*>(min_data));
+        filter->mutable_max_val()->set_longval(*reinterpret_cast<const int64_t*>(max_data));
+        return;
+    }
+    case TYPE_LARGEINT: {
+        filter->mutable_min_val()->set_stringval(
+                LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(min_data)));
+        filter->mutable_max_val()->set_stringval(
+                LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(max_data)));
+        return;
+    }
+    case TYPE_FLOAT: {
+        filter->mutable_min_val()->set_doubleval(*reinterpret_cast<const float*>(min_data));
+        filter->mutable_max_val()->set_doubleval(*reinterpret_cast<const float*>(max_data));
+        return;
+    }
+    case TYPE_DOUBLE: {
+        filter->mutable_min_val()->set_doubleval(*reinterpret_cast<const double*>(min_data));
+        filter->mutable_max_val()->set_doubleval(*reinterpret_cast<const double*>(max_data));
+        return;
+    }
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        // the buffer is reused: each bound is copied into the message before
+        // the next one is formatted
+        char convert_buffer[30];
+        reinterpret_cast<const DateTimeValue*>(min_data)->to_string(convert_buffer);
+        filter->mutable_min_val()->set_stringval(convert_buffer);
+        reinterpret_cast<const DateTimeValue*>(max_data)->to_string(convert_buffer);
+        filter->mutable_max_val()->set_stringval(convert_buffer);
+        return;
+    }
+    case TYPE_DECIMAL: {
+        filter->mutable_min_val()->set_stringval(
+                reinterpret_cast<const DecimalValue*>(min_data)->to_string());
+        filter->mutable_max_val()->set_stringval(
+                reinterpret_cast<const DecimalValue*>(max_data)->to_string());
+        return;
+    }
+    case TYPE_DECIMALV2: {
+        filter->mutable_min_val()->set_stringval(
+                reinterpret_cast<const DecimalV2Value*>(min_data)->to_string());
+        filter->mutable_max_val()->set_stringval(
+                reinterpret_cast<const DecimalV2Value*>(max_data)->to_string());
+        return;
+    }
+    case TYPE_CHAR:
+    case TYPE_VARCHAR: {
+        const StringValue* min_string_value = reinterpret_cast<const StringValue*>(min_data);
+        filter->mutable_min_val()->set_stringval(
+                std::string(min_string_value->ptr, min_string_value->len));
+        const StringValue* max_string_value = reinterpret_cast<const StringValue*>(max_data);
+        filter->mutable_max_val()->set_stringval(
+                std::string(max_string_value->ptr, max_string_value->len));
+        return;
+    }
+    default: {
+        DCHECK(false) << "unknown type";
+        break;
+    }
+    }
+}
+
+// Builds a temporary wrapper from the published filter payload in `param`, merges it
+// into this filter's own wrapper and then calls signal().
+// Returns the first error reported by wrapper creation or by the merge.
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
+    std::unique_ptr<RuntimePredicateWrapper> incoming;
+    RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _mem_tracker, _pool, &incoming));
+    RETURN_IF_ERROR(_wrapper->merge(incoming.get()));
+    signal();
+    return Status::OK();
+}
+
+// Closes every expression context stored in _push_down_ctxs.
+// Must only be called on a consumer-side filter (checked with DCHECK); always returns OK.
+Status IRuntimeFilter::consumer_close() {
+ DCHECK(is_consumer());
+ Expr::close(_push_down_ctxs, _state);
+ return Status::OK();
+}
+
+// The constructor and destructor are defaulted in this .cpp file instead of the header.
+// NOTE(review): presumably because the header only forward-declares
+// RuntimePredicateWrapper, so the unique_ptr member has to be destroyed where the
+// complete type is visible -- confirm.
+RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default;
+RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default;
+
+// Decides which producer runtime filters of this join are actually built.
+//
+// For every descriptor in _runtime_filter_descs the producer filter is looked up in the
+// runtime filter manager and either registered in _runtime_filters (keyed by its
+// expr_order) or marked as ignored on the consumer side.
+//
+// @param state            runtime state that owns the runtime filter manager
+// @param pool             unused by this function
+// @param tracker          unused by this function
+// @param hash_table_size  number of rows in the build-side hash table
+// @return the error from get_producer_filter() if a producer filter cannot be found,
+//         OK otherwise
+Status RuntimeFilterSlots::init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker,
+                                int64_t hash_table_size) {
+    DCHECK(_probe_expr_context.size() == _build_expr_context.size());
+
+    // runtime filter effect strategy
+    // 1. we will ignore IN filter when hash_table_size is too big
+    // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size
+    // is too small and IN filter has effect
+
+    // expr_order -> whether the most recently accepted filter for that expr was an IN filter
+    std::map<int, bool> has_in_filter;
+
+    // Marks the consumer side of `filter_id` as ignored and wakes up anyone waiting on it.
+    auto ignore_filter = [state](int filter_id) {
+        IRuntimeFilter* consumer_filter = nullptr;
+        state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter);
+        DCHECK(consumer_filter != nullptr);
+        // DCHECK is compiled out in release builds; without this guard a failed lookup
+        // would dereference a null pointer.
+        if (consumer_filter == nullptr) {
+            return;
+        }
+        consumer_filter->set_ignored();
+        consumer_filter->signal();
+    };
+
+    // Loop-invariant: is the hash table too large for an IN filter to be worthwhile?
+    const bool exceeds_in_limit = hash_table_size >= state->runtime_filter_max_in_num();
+
+    for (auto& filter_desc : _runtime_filter_descs) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
+                                                                         &runtime_filter));
+        DCHECK(runtime_filter != nullptr);
+        const int expr_order = runtime_filter->expr_order();
+        DCHECK(expr_order >= 0);
+        DCHECK(static_cast<size_t>(expr_order) < _probe_expr_context.size());
+
+        const bool is_in_filter = runtime_filter->type() == RuntimeFilterType::IN_FILTER;
+
+        // Rule 1: an IN filter over too many values is not built.
+        if (is_in_filter && exceeds_in_limit) {
+            ignore_filter(filter_desc.filter_id);
+            continue;
+        }
+        // Rule 2: a local-only bloom/minmax filter is redundant when an IN filter on the
+        // same expression is already in effect.
+        if (has_in_filter[expr_order] && !runtime_filter->has_remote_target() && !is_in_filter &&
+            !exceeds_in_limit) {
+            ignore_filter(filter_desc.filter_id);
+            continue;
+        }
+        has_in_filter[expr_order] = is_in_filter;
+        _runtime_filters[expr_order].push_back(runtime_filter);
+    }
+
+    return Status::OK();
+}
+
+// Calls ready_for_publish() on every registered runtime filter, in expr_order order.
+void RuntimeFilterSlots::ready_for_publish() {
+    for (auto slot = _runtime_filters.begin(); slot != _runtime_filters.end(); ++slot) {
+        for (IRuntimeFilter* runtime_filter : slot->second) {
+            runtime_filter->ready_for_publish();
+        }
+    }
+}
+
+// Publishes all registered runtime filters in two passes: first publish() is called for
+// each filter with the probe expression of its slot, then publish_finally() is called
+// for every filter.
+// The Status returned by IRuntimeFilter::publish() is not inspected here.
+void RuntimeFilterSlots::publish(HashJoinNode* hash_join_node) {
+    for (int order = 0; order < _probe_expr_context.size(); ++order) {
+        auto slot = _runtime_filters.find(order);
+        if (slot == _runtime_filters.end()) {
+            continue;
+        }
+        for (IRuntimeFilter* runtime_filter : slot->second) {
+            runtime_filter->publish(hash_join_node, _probe_expr_context[order]);
+        }
+    }
+    for (auto slot = _runtime_filters.begin(); slot != _runtime_filters.end(); ++slot) {
+        for (IRuntimeFilter* runtime_filter : slot->second) {
+            runtime_filter->publish_finally();
+        }
+    }
+}
+
+} // namespace doris
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
new file mode 100644
index 0000000..61ef65d
--- /dev/null
+++ b/be/src/exprs/runtime_filter.h
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_QUERY_EXPRS_RUNTIME_PREDICATE_H
+#define DORIS_BE_SRC_QUERY_EXPRS_RUNTIME_PREDICATE_H
+
+#include <condition_variable>
+#include <list>
+#include <map>
+#include <mutex>
+
+#include "exprs/expr_context.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/types.h"
+#include "util/runtime_profile.h"
+#include "util/uid_util.h"
+
+namespace doris {
+class Predicate;
+class ObjectPool;
+class ExprContext;
+class RuntimeState;
+class RuntimePredicateWrapper;
+class MemTracker;
+class TupleRow;
+class PPublishFilterRequest;
+class PMergeFilterRequest;
+class TRuntimeFilterDesc;
+class RowDescriptor;
+class PMinMaxFilter;
+class HashJoinNode;
+class RuntimeProfile;
+
+// Kind of predicate a runtime filter carries.
+// UNKNOWN_FILTER is the value a filter has before its type is set.
+enum class RuntimeFilterType {
+ UNKNOWN_FILTER = -1,
+ IN_FILTER = 0,
+ MINMAX_FILTER = 1,
+ BLOOM_FILTER = 2
+};
+
+// Returns a short display name for `type`: "in", "bloomfilter" or "minmax";
+// any other value yields "UNKNOWN".
+inline std::string to_string(RuntimeFilterType type) {
+    if (type == RuntimeFilterType::IN_FILTER) {
+        return std::string("in");
+    }
+    if (type == RuntimeFilterType::BLOOM_FILTER) {
+        return std::string("bloomfilter");
+    }
+    if (type == RuntimeFilterType::MINMAX_FILTER) {
+        return std::string("minmax");
+    }
+    return std::string("UNKNOWN");
+}
+
+// Which side of a runtime filter an IRuntimeFilter instance represents.
+enum class RuntimeFilterRole { PRODUCER = 0, CONSUMER = 1 };
+
+// Parameters describing the runtime filter to build.
+struct RuntimeFilterParams {
+    // Every field gets a defined default; column_return_type was previously left
+    // uninitialized by this constructor.
+    RuntimeFilterParams()
+            : filter_type(RuntimeFilterType::UNKNOWN_FILTER),
+              column_return_type(INVALID_TYPE),
+              bloom_filter_size(-1) {}
+
+    // kind of filter to build
+    RuntimeFilterType filter_type;
+    // primitive type of the filtered column
+    PrimitiveType column_return_type;
+    // used in bloom filter
+    int64_t bloom_filter_size;
+};
+
+// Input for IRuntimeFilter::update_filter() / create_wrapper(): a publish request plus
+// a raw data pointer.
+// NOTE(review): `data` presumably points at the RPC attachment bytes that accompany
+// `request`; neither pointer is owned by this struct as far as this header shows -- confirm.
+struct UpdateRuntimeFilterParams {
+ const PPublishFilterRequest* request;
+ const char* data;
+};
+
+// Input for IRuntimeFilter::create_wrapper(): a merge request plus a raw data pointer.
+// NOTE(review): `data` presumably points at the RPC attachment bytes that accompany
+// `request`; neither pointer is owned by this struct as far as this header shows -- confirm.
+struct MergeRuntimeFilterParams {
+ const PMergeFilterRequest* request;
+ const char* data;
+};
+
+/// The runtime filter is built in the join node.
+/// Its main purpose is to reduce the amount of left-table data that is scanned,
+/// based on the scan results of the right table during the join.
+/// The runtime filter builds filter conditions from the right-table results
+/// that can be pushed down to the scan node.
+class IRuntimeFilter {
+public:
+    IRuntimeFilter(RuntimeState* state, MemTracker* mem_tracker, ObjectPool* pool)
+            : _state(state),
+              _mem_tracker(mem_tracker),
+              _pool(pool),
+              // start from a defined null value instead of an indeterminate pointer
+              _wrapper(nullptr),
+              _runtime_filter_type(RuntimeFilterType::UNKNOWN_FILTER),
+              _filter_id(-1),
+              _is_broadcast_join(true),
+              _has_remote_target(false),
+              _has_local_target(false),
+              _is_ready(false),
+              _role(RuntimeFilterRole::PRODUCER),
+              _expr_order(-1),
+              _always_true(false),
+              _probe_ctx(nullptr),
+              _is_ignored(false) {}
+
+    ~IRuntimeFilter() = default;
+
+    // create a runtime filter described by `desc` with the given role;
+    // the result is returned through `res`
+    static Status create(RuntimeState* state, MemTracker* tracker, ObjectPool* pool,
+                         const TRuntimeFilterDesc* desc, const RuntimeFilterRole role, int node_id,
+                         IRuntimeFilter** res);
+
+    // insert data to build filter
+    // only used for producer
+    void insert(void* data);
+
+    // publish filter
+    // push filter to remote node or push down it to scan_node
+    Status publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx);
+
+    void publish_finally();
+
+    RuntimeFilterType type() const { return _runtime_filter_type; }
+
+    // get push down expr context
+    // This function can only be called once
+    // _wrapper's function will be cleared
+    // only consumer could call this
+    Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs);
+
+    // This function is used by UT and producer
+    Status get_push_expr_ctxs(std::list<ExprContext*>* push_expr_ctxs, ExprContext* probe_ctx);
+
+    // This function can be called multiple times
+    Status get_prepared_context(std::vector<ExprContext*>* push_expr_ctxs,
+                                const RowDescriptor& desc,
+                                const std::shared_ptr<MemTracker>& tracker);
+
+    bool is_broadcast_join() const { return _is_broadcast_join; }
+
+    bool has_remote_target() const { return _has_remote_target; }
+
+    bool is_ready() const { return _is_ready; }
+
+    bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; }
+    bool is_consumer() const { return _role == RuntimeFilterRole::CONSUMER; }
+    void set_role(const RuntimeFilterRole role) { _role = role; }
+    // index of the join expression this filter belongs to (-1 until set)
+    int expr_order() const { return _expr_order; }
+
+    // only used for consumer
+    // if the filter is not ready to filter data, scan_node
+    // will wait until it is ready or the wait times out
+    // This function will wait at most config::runtime_filter_shuffle_wait_time_ms
+    // if return true , filter is ready to use
+    bool await();
+    // this function will be called if a runtime filter is sent by rpc
+    // it will notify all waiting threads
+    void signal();
+
+    // init filter with desc
+    Status init_with_desc(const TRuntimeFilterDesc* desc, int node_id = -1);
+
+    // serialize _wrapper to protobuf
+    Status serialize(PMergeFilterRequest* request, void** data, int* len);
+    Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
+
+    Status merge_from(const RuntimePredicateWrapper* wrapper);
+
+    static Status create_wrapper(const MergeRuntimeFilterParams* param, MemTracker* tracker,
+                                 ObjectPool* pool,
+                                 std::unique_ptr<RuntimePredicateWrapper>* wrapper);
+    static Status create_wrapper(const UpdateRuntimeFilterParams* param, MemTracker* tracker,
+                                 ObjectPool* pool,
+                                 std::unique_ptr<RuntimePredicateWrapper>* wrapper);
+
+    Status update_filter(const UpdateRuntimeFilterParams* param);
+
+    void set_ignored() { _is_ignored = true; }
+
+    // consumer should call before released
+    Status consumer_close();
+
+    // async push runtimefilter to remote node
+    Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
+    Status join_rpc();
+
+    void init_profile(RuntimeProfile* parent_profile);
+
+    void set_push_down_profile();
+
+    void ready_for_publish();
+
+protected:
+    // serialize _wrapper to protobuf
+    void to_protobuf(PMinMaxFilter* filter);
+
+    template <class T>
+    Status _serialize(T* request, void** data, int* len);
+
+    template <class T>
+    static Status _create_wrapper(const T* param, MemTracker* tracker, ObjectPool* pool,
+                                  std::unique_ptr<RuntimePredicateWrapper>* wrapper);
+
+protected:
+    RuntimeState* _state;
+    MemTracker* _mem_tracker;
+    ObjectPool* _pool;
+    // _wrapper is a runtime filter function wrapper
+    // _wrapper should alloc from _pool
+    RuntimePredicateWrapper* _wrapper;
+    // runtime filter type
+    RuntimeFilterType _runtime_filter_type;
+    // runtime filter id
+    int _filter_id;
+    // Specific types Broadcast or Shuffle
+    bool _is_broadcast_join;
+    // will apply to remote node
+    bool _has_remote_target;
+    // will apply to local node
+    bool _has_local_target;
+    // filter is ready for consumer
+    bool _is_ready;
+    // role consumer or producer
+    RuntimeFilterRole _role;
+    // expr index
+    int _expr_order;
+    // used for await or signal
+    std::mutex _inner_mutex;
+    std::condition_variable _inner_cv;
+
+    // if set always_true = true
+    // this filter won't filter any data
+    bool _always_true;
+
+    // build expr_context
+    // ExprContext* _build_ctx;
+    // probe expr_context
+    // it is only used in consumer to generate runtime_filter expr_context
+    // we don't have to prepare it or close it
+    ExprContext* _probe_ctx;
+
+    // Indicate whether runtime filter expr has been ignored
+    bool _is_ignored;
+
+    // some runtime filters generate
+    // multiple contexts, such as the minmax filter
+    // these contexts are prepared by this object,
+    // consumer_close should be called before release
+    std::vector<ExprContext*> _push_down_ctxs;
+
+    // state of an in-flight push_to_remote() call; defined in runtime_filter_rpc.cpp
+    struct rpc_context;
+    std::shared_ptr<rpc_context> _rpc_context;
+
+    // parent profile
+    // only effect on consumer
+    std::unique_ptr<RuntimeProfile> _profile;
+    // unix millis
+    RuntimeProfile::Counter* _await_time_cost = nullptr;
+    RuntimeProfile::Counter* _effect_time_cost = nullptr;
+    std::unique_ptr<ScopedTimer<MonotonicStopWatch>> _effect_timer;
+};
+
+// Owns a RuntimePredicateWrapper through a unique_ptr without exposing the wrapper's
+// definition to users of this header (only a forward declaration is visible here).
+// The constructor and destructor are declared here and defined out of line.
+class RuntimeFilterWrapperHolder {
+public:
+ using WrapperPtr = std::unique_ptr<RuntimePredicateWrapper>;
+ RuntimeFilterWrapperHolder();
+ ~RuntimeFilterWrapperHolder();
+ // Returns a pointer to the owning unique_ptr so a caller can fill it in.
+ WrapperPtr* getHandle() { return &_wrapper; }
+
+private:
+ WrapperPtr _wrapper;
+};
+
+/// Used by a hash join node to drive all of its runtime filters through one interface:
+/// init() selects the filters, insert() feeds build rows into them and publish() sends
+/// them out.
+class RuntimeFilterSlots {
+public:
+    // The three vectors are stored by reference, not copied.
+    RuntimeFilterSlots(const std::vector<ExprContext*>& prob_expr_ctxs,
+                       const std::vector<ExprContext*>& build_expr_ctxs,
+                       const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
+            : _probe_expr_context(prob_expr_ctxs),
+              _build_expr_context(build_expr_ctxs),
+              _runtime_filter_descs(runtime_filter_descs) {}
+
+    Status init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker,
+                int64_t hash_table_size);
+
+    // Evaluates each build expression that has registered filters against `row`
+    // and inserts the resulting value into every filter of that expression.
+    void insert(TupleRow* row) {
+        for (int order = 0; order < _build_expr_context.size(); ++order) {
+            auto slot = _runtime_filters.find(order);
+            if (slot == _runtime_filters.end()) {
+                continue;
+            }
+            void* build_value = _build_expr_context[order]->get_value(row);
+            for (IRuntimeFilter* runtime_filter : slot->second) {
+                runtime_filter->insert(build_value);
+            }
+        }
+    }
+
+    // should call this method after insert
+    void ready_for_publish();
+    // publish runtime filter
+    void publish(HashJoinNode* hash_join_node);
+
+private:
+    const std::vector<ExprContext*>& _probe_expr_context;
+    const std::vector<ExprContext*>& _build_expr_context;
+    const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
+    // probe condition index -> [IRuntimeFilter]
+    std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
+};
+
+} // namespace doris
+
+#endif
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
new file mode 100644
index 0000000..87bb579
--- /dev/null
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/config.h"
+#include "common/status.h"
+#include "exprs/runtime_filter.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+
+// for rpc
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "service/brpc.h"
+#include "util/brpc_stub_cache.h"
+
+namespace doris {
+
+// Everything one merge_filter RPC needs to keep alive while it is in flight:
+// the request and response messages, the brpc controller and the call id that
+// join_rpc() waits on.
+struct IRuntimeFilter::rpc_context {
+ PMergeFilterRequest request;
+ PMergeFilterResponse response;
+ brpc::Controller cntl;
+ brpc::CallId cid;
+};
+
+// Serializes this producer filter and sends it to the backend at `addr` with the
+// merge_filter RPC.
+//
+// When config::runtime_filter_use_async_rpc is set the call is issued asynchronously and
+// _rpc_context stays alive until join_rpc(); otherwise the call is made synchronously and
+// the context is released before returning.
+//
+// @param state  runtime state providing the exec env and the fragment instance id
+// @param addr   address of the backend that merges the filter
+// @return an error if no brpc stub is available or serialization fails; a failed RPC is
+//         only logged, matching join_rpc()
+Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) {
+    DCHECK(is_producer());
+    DCHECK(_rpc_context == nullptr);
+    PBackendService_Stub* stub = state->exec_env()->brpc_stub_cache()->get_stub(*addr);
+    if (stub == nullptr) {
+        // without a stub there is nothing to call; fail instead of dereferencing null below
+        return Status::InternalError("failed to get brpc stub for runtime filter target");
+    }
+    _rpc_context = std::make_shared<IRuntimeFilter::rpc_context>();
+    void* data = nullptr;
+    int len = 0;
+
+    auto pquery_id = _rpc_context->request.mutable_query_id();
+    pquery_id->set_hi(_state->query_id().hi);
+    pquery_id->set_lo(_state->query_id().lo);
+
+    auto pfragment_instance_id = _rpc_context->request.mutable_fragment_id();
+    pfragment_instance_id->set_hi(state->fragment_instance_id().hi);
+    pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
+
+    _rpc_context->request.set_filter_id(_filter_id);
+    _rpc_context->cntl.set_timeout_ms(1000);
+    _rpc_context->cid = _rpc_context->cntl.call_id();
+
+    Status serialize_status = serialize(&_rpc_context->request, &data, &len);
+    if (!serialize_status.ok()) {
+        // nothing was sent, so drop the context again
+        _rpc_context.reset();
+        return serialize_status;
+    }
+
+    LOG(INFO) << "Producer:" << _rpc_context->request.ShortDebugString() << addr->hostname
+              << ":" << addr->port;
+    if (len > 0) {
+        // the serialized filter payload travels as the request attachment
+        DCHECK(data != nullptr);
+        _rpc_context->cntl.request_attachment().append(data, len);
+    }
+    if (config::runtime_filter_use_async_rpc) {
+        stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
+                           brpc::DoNothing());
+    } else {
+        stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
+                           nullptr);
+        // the synchronous call has finished here; report a failure before the controller
+        // is destroyed, otherwise it would be lost silently
+        if (_rpc_context->cntl.Failed()) {
+            LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText();
+        }
+        _rpc_context.reset();
+    }
+    return serialize_status;
+}
+
+// Waits for the RPC recorded in _rpc_context (if any) to finish and logs a warning
+// when it failed. Always returns OK.
+Status IRuntimeFilter::join_rpc() {
+    DCHECK(is_producer());
+    if (_rpc_context == nullptr) {
+        return Status::OK();
+    }
+    brpc::Join(_rpc_context->cid);
+    if (_rpc_context->cntl.Failed()) {
+        LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText();
+    }
+    return Status::OK();
+}
+} // namespace doris
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index ffa4112..7da0983 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -48,6 +48,7 @@ add_library(Olap STATIC
generic_iterators.cpp
hll.cpp
in_list_predicate.cpp
+ bloom_filter_predicate.cpp
in_stream.cpp
key_coder.cpp
lru_cache.cpp
diff --git a/be/src/olap/bloom_filter_predicate.cpp b/be/src/olap/bloom_filter_predicate.cpp
new file mode 100644
index 0000000..06e4947
--- /dev/null
+++ b/be/src/olap/bloom_filter_predicate.cpp
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/bloom_filter_predicate.h"
+
+#include "olap/field.h"
+#include "runtime/string_value.hpp"
+#include "runtime/vectorized_row_batch.h"
+
+namespace doris {
+
+// Binds the predicate to column `column_id` and keeps a shared reference to `filter`.
+BloomFilterColumnPredicate::BloomFilterColumnPredicate(
+ uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& filter)
+ : ColumnPredicate(column_id), _filter(filter) {}
+
+// The bloom filter column predicate is not supported in segment v1.
+// This overload never consults _filter: when the batch has no selection vector in use
+// it only fills the selection vector with the identity mapping 0..size-1.
+void BloomFilterColumnPredicate::evaluate(VectorizedRowBatch* batch) const {
+    const uint16_t row_count = batch->size();
+    uint16_t* selected = batch->selected();
+    if (batch->selected_in_use()) {
+        return;
+    }
+    for (uint16_t row = 0; row < row_count; ++row) {
+        selected[row] = row;
+    }
+}
+
+// Compacts the selection vector `sel` in place so that it keeps only the rows whose
+// cell passes _filter->find_olap_engine(); `*size` is updated to the number kept.
+// In a nullable block a null cell never passes and the filter is not queried for it.
+void BloomFilterColumnPredicate::evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const {
+    const bool nullable = block->is_nullable();
+    uint16_t kept = 0;
+    for (uint16_t pos = 0; pos < *size; ++pos) {
+        const uint16_t row = sel[pos];
+        // write the candidate first; it only stays if `kept` is advanced below
+        sel[kept] = row;
+        const auto* value = reinterpret_cast<const void*>(block->cell(row).cell_ptr());
+        if (nullable) {
+            kept += (!block->cell(row).is_null() && _filter->find_olap_engine(value));
+        } else {
+            kept += _filter->find_olap_engine(value);
+        }
+    }
+    *size = kept;
+}
+
+// Bitmap-index overload: does nothing and returns OK, leaving `result` untouched.
+Status BloomFilterColumnPredicate::evaluate(const Schema& schema,
+ const std::vector<BitmapIndexIterator*>& iterators,
+ uint32_t num_rows, Roaring* result) const {
+ return Status::OK();
+}
+
+} //namespace doris
diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h
new file mode 100644
index 0000000..191bfae
--- /dev/null
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_BLOOM_FILTER_PREDICATE_H
+#define DORIS_BE_SRC_OLAP_BLOOM_FILTER_PREDICATE_H
+
+#include <stdint.h>
+
+#include <roaring/roaring.hh>
+
+#include "exprs/bloomfilter_predicate.h"
+#include "olap/column_predicate.h"
+
+namespace doris {
+
+class VectorizedRowBatch;
+
+// Column predicate that keeps the rows whose value is found in a bloom filter.
+// only used by runtime filter and segment v2
+class BloomFilterColumnPredicate : public ColumnPredicate {
+public:
+    BloomFilterColumnPredicate(uint32_t column_id,
+                               const std::shared_ptr<BloomFilterFuncBase>& filter);
+    ~BloomFilterColumnPredicate() override = default;
+
+    // segment v1 path
+    void evaluate(VectorizedRowBatch* batch) const override;
+
+    // filters the selection vector `sel` of `block` in place and updates `*size`
+    void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const override;
+
+    // Now BloomFilter is not a sub column predicate, so OR and AND are not supported:
+    // both overrides are intentionally empty.
+    // It should be supported in the future.
+    void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size,
+                     bool* flags) const override {}
+    void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size,
+                      bool* flags) const override {}
+
+    // bitmap index path
+    Status evaluate(const Schema& schema, const std::vector<BitmapIndexIterator*>& iterators,
+                    uint32_t num_rows, Roaring* roaring) const override;
+
+private:
+    std::shared_ptr<BloomFilterFuncBase> _filter;
+};
+
+} //namespace doris
+
+#endif //DORIS_BE_SRC_OLAP_BLOOM_FILTER_PREDICATE_H
diff --git a/be/src/olap/in_list_predicate.cpp b/be/src/olap/in_list_predicate.cpp
index f4d2d98..35b7b34 100644
--- a/be/src/olap/in_list_predicate.cpp
+++ b/be/src/olap/in_list_predicate.cpp
@@ -23,9 +23,9 @@
namespace doris {
-#define IN_LIST_PRED_CONSTRUCTOR(CLASS) \
- template <class type> \
- CLASS<type>::CLASS(uint32_t column_id, std::set<type>&& values, bool opposite) \
+#define IN_LIST_PRED_CONSTRUCTOR(CLASS) \
+ template <class type> \
+ CLASS<type>::CLASS(uint32_t column_id, std::unordered_set<type>&& values, bool opposite) \
: ColumnPredicate(column_id, opposite), _values(std::move(values)) {}
IN_LIST_PRED_CONSTRUCTOR(InListPredicate)
@@ -96,8 +96,8 @@ IN_LIST_PRED_EVALUATE(NotInListPredicate, ==)
const type* cell_value = \
reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
auto result = (!block->cell(idx).is_null() && _values.find(*cell_value) \
- OP _values.end()); \
- new_size += _opposite ? !result : result; \
+ OP _values.end()); \
+ new_size += _opposite ? !result : result; \
} \
} else { \
for (uint16_t i = 0; i < *size; ++i) { \
@@ -115,57 +115,59 @@ IN_LIST_PRED_EVALUATE(NotInListPredicate, ==)
IN_LIST_PRED_COLUMN_BLOCK_EVALUATE(InListPredicate, !=)
IN_LIST_PRED_COLUMN_BLOCK_EVALUATE(NotInListPredicate, ==)
-#define IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_OR(CLASS, OP) \
- template <class type> \
- void CLASS<type>::evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) const { \
- if (block->is_nullable()) { \
- for (uint16_t i = 0; i < size; ++i) { \
- if (flags[i]) continue; \
- uint16_t idx = sel[i]; \
- const type* cell_value = \
- reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
- auto result = (!block->cell(idx).is_null() && _values.find(*cell_value) \
- OP _values.end()); \
- flags[i] |= _opposite ? !result : result; \
- } \
- } else { \
- for (uint16_t i = 0; i < size; ++i) { \
- if (flags[i]) continue; \
- uint16_t idx = sel[i]; \
- const type* cell_value = \
- reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
- auto result = (_values.find(*cell_value) OP _values.end()); \
- flags[i] |= _opposite ? !result : result; \
- } \
- } \
+#define IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_OR(CLASS, OP) \
+ template <class type> \
+ void CLASS<type>::evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) \
+ const { \
+ if (block->is_nullable()) { \
+ for (uint16_t i = 0; i < size; ++i) { \
+ if (flags[i]) continue; \
+ uint16_t idx = sel[i]; \
+ const type* cell_value = \
+ reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
+ auto result = (!block->cell(idx).is_null() && _values.find(*cell_value) \
+ OP _values.end()); \
+ flags[i] |= _opposite ? !result : result; \
+ } \
+ } else { \
+ for (uint16_t i = 0; i < size; ++i) { \
+ if (flags[i]) continue; \
+ uint16_t idx = sel[i]; \
+ const type* cell_value = \
+ reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
+ auto result = (_values.find(*cell_value) OP _values.end()); \
+ flags[i] |= _opposite ? !result : result; \
+ } \
+ } \
}
IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_OR(InListPredicate, !=)
IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_OR(NotInListPredicate, ==)
-#define IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(CLASS, OP) \
- template <class type> \
- void CLASS<type>::evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) const { \
- if (block->is_nullable()) { \
- for (uint16_t i = 0; i < size; ++i) { \
- if (!flags[i]) continue; \
- uint16_t idx = sel[i]; \
- const type* cell_value = \
- reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
- auto result = (!block->cell(idx).is_null() && _values.find(*cell_value) \
- OP _values.end()); \
- flags[i] &= _opposite ? !result : result; \
- } \
- } else { \
- for (uint16_t i = 0; i < size; ++i) { \
- if (!flags[i]) continue; \
- uint16_t idx = sel[i]; \
- const type* cell_value = \
- reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
- auto result = (_values.find(*cell_value) OP _values.end()); \
- flags[i] &= _opposite ? !result : result; \
- } \
- } \
+#define IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(CLASS, OP) \
+ template <class type> \
+ void CLASS<type>::evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) \
+ const { \
+ if (block->is_nullable()) { \
+ for (uint16_t i = 0; i < size; ++i) { \
+ if (!flags[i]) continue; \
+ uint16_t idx = sel[i]; \
+ const type* cell_value = \
+ reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
+ auto result = (!block->cell(idx).is_null() && _values.find(*cell_value) \
+ OP _values.end()); \
+ flags[i] &= _opposite ? !result : result; \
+ } \
+ } else { \
+ for (uint16_t i = 0; i < size; ++i) { \
+ if (!flags[i]) continue; \
+ uint16_t idx = sel[i]; \
+ const type* cell_value = \
+ reinterpret_cast<const type*>(block->cell(idx).cell_ptr()); \
+ auto result = (_values.find(*cell_value) OP _values.end()); \
+ flags[i] &= _opposite ? !result : result; \
+ } \
+ } \
}
IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(InListPredicate, !=)
@@ -208,18 +210,29 @@ IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(NotInListPredicate, ==)
IN_LIST_PRED_BITMAP_EVALUATE(InListPredicate, &=)
IN_LIST_PRED_BITMAP_EVALUATE(NotInListPredicate, -=)
-#define IN_LIST_PRED_CONSTRUCTOR_DECLARATION(CLASS) \
- template CLASS<int8_t>::CLASS(uint32_t column_id, std::set<int8_t>&& values, bool opposite); \
- template CLASS<int16_t>::CLASS(uint32_t column_id, std::set<int16_t>&& values, bool opposite); \
- template CLASS<int32_t>::CLASS(uint32_t column_id, std::set<int32_t>&& values, bool opposite); \
- template CLASS<int64_t>::CLASS(uint32_t column_id, std::set<int64_t>&& values, bool opposite); \
- template CLASS<int128_t>::CLASS(uint32_t column_id, std::set<int128_t>&& values, bool opposite); \
- template CLASS<float>::CLASS(uint32_t column_id, std::set<float>&& values, bool opposite); \
- template CLASS<double>::CLASS(uint32_t column_id, std::set<double>&& values, bool opposite); \
- template CLASS<decimal12_t>::CLASS(uint32_t column_id, std::set<decimal12_t>&& values, bool opposite); \
- template CLASS<StringValue>::CLASS(uint32_t column_id, std::set<StringValue>&& values, bool opposite); \
- template CLASS<uint24_t>::CLASS(uint32_t column_id, std::set<uint24_t>&& values, bool opposite); \
- template CLASS<uint64_t>::CLASS(uint32_t column_id, std::set<uint64_t>&& values, bool opposite);
+#define IN_LIST_PRED_CONSTRUCTOR_DECLARATION(CLASS) \
+ template CLASS<int8_t>::CLASS(uint32_t column_id, std::unordered_set<int8_t>&& values, \
+ bool opposite); \
+ template CLASS<int16_t>::CLASS(uint32_t column_id, std::unordered_set<int16_t>&& values, \
+ bool opposite); \
+ template CLASS<int32_t>::CLASS(uint32_t column_id, std::unordered_set<int32_t>&& values, \
+ bool opposite); \
+ template CLASS<int64_t>::CLASS(uint32_t column_id, std::unordered_set<int64_t>&& values, \
+ bool opposite); \
+ template CLASS<int128_t>::CLASS(uint32_t column_id, std::unordered_set<int128_t>&& values, \
+ bool opposite); \
+ template CLASS<float>::CLASS(uint32_t column_id, std::unordered_set<float>&& values, \
+ bool opposite); \
+ template CLASS<double>::CLASS(uint32_t column_id, std::unordered_set<double>&& values, \
+ bool opposite); \
+ template CLASS<decimal12_t>::CLASS(uint32_t column_id, \
+ std::unordered_set<decimal12_t>&& values, bool opposite); \
+ template CLASS<StringValue>::CLASS(uint32_t column_id, \
+ std::unordered_set<StringValue>&& values, bool opposite); \
+ template CLASS<uint24_t>::CLASS(uint32_t column_id, std::unordered_set<uint24_t>&& values, \
+ bool opposite); \
+ template CLASS<uint64_t>::CLASS(uint32_t column_id, std::unordered_set<uint64_t>&& values, \
+ bool opposite);
IN_LIST_PRED_CONSTRUCTOR_DECLARATION(InListPredicate)
IN_LIST_PRED_CONSTRUCTOR_DECLARATION(NotInListPredicate)
diff --git a/be/src/olap/in_list_predicate.h b/be/src/olap/in_list_predicate.h
index 8f96a04..b5bc73e 100644
--- a/be/src/olap/in_list_predicate.h
+++ b/be/src/olap/in_list_predicate.h
@@ -22,28 +22,78 @@
#include <roaring/roaring.hh>
#include <set>
+#include <unordered_set>
+#include "decimal12.h"
#include "olap/column_predicate.h"
+#include "uint24.h"
+#include "util/murmur_hash3.h"
+
+// Hash and equality support that lets doris::StringValue, doris::decimal12_t and
+// doris::uint24_t be used as std::unordered_set keys by the in-list predicates.
+// Every hasher returns size_t, the result type required of a standard Hash.
+namespace std {
+// for string value
+template <>
+struct hash<doris::StringValue> {
+    size_t operator()(const doris::StringValue& rhs) const { return hash_value(rhs); }
+};
+
+template <>
+struct equal_to<doris::StringValue> {
+    bool operator()(const doris::StringValue& lhs, const doris::StringValue& rhs) const {
+        return lhs == rhs;
+    }
+};
+// for decimal12_t
+template <>
+struct hash<doris::decimal12_t> {
+    // Combines the hashes of the integer and fraction parts with XOR.
+    size_t operator()(const doris::decimal12_t& rhs) const {
+        return hash<int64_t>()(rhs.integer) ^ hash<int32_t>()(rhs.fraction);
+    }
+};
+
+template <>
+struct equal_to<doris::decimal12_t> {
+    bool operator()(const doris::decimal12_t& lhs, const doris::decimal12_t& rhs) const {
+        return lhs == rhs;
+    }
+};
+// for uint24_t
+template <>
+struct hash<doris::uint24_t> {
+    // uint24_t is converted to a 32-bit value first and hashed as a plain int.
+    size_t operator()(const doris::uint24_t& rhs) const {
+        uint32_t val(rhs);
+        return hash<int>()(val);
+    }
+};
+
+template <>
+struct equal_to<doris::uint24_t> {
+    bool operator()(const doris::uint24_t& lhs, const doris::uint24_t& rhs) const {
+        return lhs == rhs;
+    }
+};
+} // namespace std
namespace doris {
class VectorizedRowBatch;
-#define IN_LIST_PRED_CLASS_DEFINE(CLASS) \
- template <class type> \
- class CLASS : public ColumnPredicate { \
- public: \
- CLASS(uint32_t column_id, std::set<type>&& values, bool is_opposite = false); \
- virtual void evaluate(VectorizedRowBatch* batch) const override; \
- void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const override; \
- void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) const override;\
- void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size, bool* flags) const override;\
- virtual Status evaluate(const Schema& schema, \
- const std::vector<BitmapIndexIterator*>& iterators, \
- uint32_t num_rows, Roaring* bitmap) const override; \
- \
- private: \
- std::set<type> _values; \
+// Declares a ColumnPredicate subclass template named CLASS that keeps its candidate
+// values in a std::unordered_set<type> (_values) and overrides the evaluate* entry points
+// declared below.
+#define IN_LIST_PRED_CLASS_DEFINE(CLASS) \
+ template <class type> \
+ class CLASS : public ColumnPredicate { \
+ public: \
+ CLASS(uint32_t column_id, std::unordered_set<type>&& values, bool is_opposite = false); \
+ virtual void evaluate(VectorizedRowBatch* batch) const override; \
+ void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const override; \
+ void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size, \
+ bool* flags) const override; \
+ void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size, \
+ bool* flags) const override; \
+ virtual Status evaluate(const Schema& schema, \
+ const std::vector<BitmapIndexIterator*>& iterators, \
+ uint32_t num_rows, Roaring* bitmap) const override; \
+ \
+ private: \
+ std::unordered_set<type> _values; \
+ };
IN_LIST_PRED_CLASS_DEFINE(InListPredicate)
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index ce88f23..23ce803 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -19,7 +19,9 @@
#include <boost/algorithm/string/case_conv.hpp>
#include <sstream>
+#include <unordered_set>
+#include "olap/bloom_filter_predicate.h"
#include "olap/collect_iterator.h"
#include "olap/comparison_predicate.h"
#include "olap/in_list_predicate.h"
@@ -27,8 +29,8 @@
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/column_data.h"
#include "olap/rowset/beta_rowset_reader.h"
+#include "olap/rowset/column_data.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "runtime/mem_pool.h"
@@ -94,8 +96,7 @@ std::string Reader::KeysParam::to_string() const {
return ss.str();
}
-Reader::Reader() : _collect_iter(new CollectIterator()) {
-}
+Reader::Reader() : _collect_iter(new CollectIterator()) {}
Reader::~Reader() {
close();
@@ -339,8 +340,8 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params,
if (_keys_param.range == "gt") {
if (end_key != nullptr && compare_row_key(*start_key, *end_key) >= 0) {
VLOG_NOTICE << "return EOF when range=" << _keys_param.range
- << ", start_key=" << start_key->to_string()
- << ", end_key=" << end_key->to_string();
+ << ", start_key=" << start_key->to_string()
+ << ", end_key=" << end_key->to_string();
eof = true;
break;
}
@@ -348,8 +349,8 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params,
} else if (_keys_param.range == "ge") {
if (end_key != nullptr && compare_row_key(*start_key, *end_key) > 0) {
VLOG_NOTICE << "return EOF when range=" << _keys_param.range
- << ", start_key=" << start_key->to_string()
- << ", end_key=" << end_key->to_string();
+ << ", start_key=" << start_key->to_string()
+ << ", end_key=" << end_key->to_string();
eof = true;
break;
}
@@ -610,6 +611,11 @@ void Reader::_init_conditions_param(const ReaderParams& read_params) {
}
}
}
+
+    // Only key column bloom filter will push down to storage engine
+    for (const auto& filter : read_params.bloom_filters) {
+        // _parse_to_predicate() returns nullptr when the named column does not exist in
+        // this tablet; a null predicate must never be stored in _col_predicates.
+        ColumnPredicate* predicate = _parse_to_predicate(filter);
+        if (predicate == nullptr) {
+            LOG(WARNING) << "skip bloom filter on unknown column:" << filter.first;
+            continue;
+        }
+        _col_predicates.emplace_back(predicate);
+    }
}
#define COMPARISON_PREDICATE_CONDITION_VALUE(NAME, PREDICATE) \
@@ -710,6 +716,35 @@ COMPARISON_PREDICATE_CONDITION_VALUE(le, LessEqualPredicate)
COMPARISON_PREDICATE_CONDITION_VALUE(gt, GreaterPredicate)
COMPARISON_PREDICATE_CONDITION_VALUE(ge, GreaterEqualPredicate)
+// Turns a (column name, bloom filter) pair into a BloomFilterColumnPredicate.
+// Returns nullptr when the tablet has no column with the given name.
+ColumnPredicate* Reader::_parse_to_predicate(
+        const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter) {
+    const int32_t index = _tablet->field_index(bloom_filter.first);
+    if (index < 0) {
+        return nullptr;
+    }
+    const TabletColumn& column = _tablet->tablet_schema().column(index);
+    const auto column_type = column.type();
+    const bool is_char = column_type == OLAP_FIELD_TYPE_CHAR;
+    const bool is_date = column_type == OLAP_FIELD_TYPE_DATE;
+    if (!is_char && !is_date) {
+        // Every other column type can use the incoming filter directly.
+        return new BloomFilterColumnPredicate(index, bloom_filter.second);
+    }
+    // Because FE regards CHAR as VARCHAR and Date as Datetime during query planning,
+    // direct use of the filter would give incorrect results due to inconsistent data
+    // structures. Build a filter of the storage engine's type and light-copy the
+    // incoming filter into it.
+    const auto storage_type = is_char ? TYPE_CHAR : TYPE_DATE;
+    std::shared_ptr<BloomFilterFuncBase> converted(
+            BloomFilterFuncBase::create_bloom_filter(bloom_filter.second->tracker(), storage_type));
+    converted->light_copy(bloom_filter.second.get());
+    return new BloomFilterColumnPredicate(index, converted);
+}
+
+
ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool opposite) const {
// TODO: not equal and not in predicate is not pushed down
int32_t index = _tablet->field_index(condition.column_name);
@@ -720,9 +755,12 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
const TabletColumn& column = _tablet->tablet_schema().column(index);
ColumnPredicate* predicate = nullptr;
- if ((condition.condition_op == "*=" || condition.condition_op == "!*=" || condition.condition_op == "=" || condition.condition_op == "!=") && condition.condition_values.size() == 1) {
- predicate = condition.condition_op == "*=" || condition.condition_op == "=" ? _new_eq_pred(column, index, condition.condition_values[0], opposite) :
- _new_ne_pred(column, index, condition.condition_values[0], opposite);
+ if ((condition.condition_op == "*=" || condition.condition_op == "!*=" ||
+ condition.condition_op == "=" || condition.condition_op == "!=") &&
+ condition.condition_values.size() == 1) {
+ predicate = condition.condition_op == "*=" || condition.condition_op == "="
+ ? _new_eq_pred(column, index, condition.condition_values[0], opposite)
+ : _new_ne_pred(column, index, condition.condition_values[0], opposite);
} else if (condition.condition_op == "<<") {
predicate = _new_lt_pred(column, index, condition.condition_values[0], opposite);
} else if (condition.condition_op == "<=") {
@@ -731,10 +769,11 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
predicate = _new_gt_pred(column, index, condition.condition_values[0], opposite);
} else if (condition.condition_op == ">=") {
predicate = _new_ge_pred(column, index, condition.condition_values[0], opposite);
- } else if ((condition.condition_op == "*=" || condition.condition_op == "!*=") && condition.condition_values.size() > 1) {
+ } else if ((condition.condition_op == "*=" || condition.condition_op == "!*=") &&
+ condition.condition_values.size() > 1) {
switch (column.type()) {
case OLAP_FIELD_TYPE_TINYINT: {
- std::set<int8_t> values;
+ std::unordered_set<int8_t> values;
for (auto& cond_val : condition.condition_values) {
int32_t value = 0;
std::stringstream ss(cond_val);
@@ -744,12 +783,12 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
if (condition.condition_op == "*=") {
predicate = new InListPredicate<int8_t>(index, std::move(values), opposite);
} else {
- predicate = new NotInListPredicate<int8_t>(index, std::move(values),opposite);
+ predicate = new NotInListPredicate<int8_t>(index, std::move(values), opposite);
}
break;
}
case OLAP_FIELD_TYPE_SMALLINT: {
- std::set<int16_t> values;
+ std::unordered_set<int16_t> values;
for (auto& cond_val : condition.condition_values) {
int16_t value = 0;
std::stringstream ss(cond_val);
@@ -764,7 +803,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
break;
}
case OLAP_FIELD_TYPE_INT: {
- std::set<int32_t> values;
+ std::unordered_set<int32_t> values;
for (auto& cond_val : condition.condition_values) {
int32_t value = 0;
std::stringstream ss(cond_val);
@@ -779,7 +818,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
break;
}
case OLAP_FIELD_TYPE_BIGINT: {
- std::set<int64_t> values;
+ std::unordered_set<int64_t> values;
for (auto& cond_val : condition.condition_values) {
int64_t value = 0;
std::stringstream ss(cond_val);
@@ -794,7 +833,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
break;
}
case OLAP_FIELD_TYPE_LARGEINT: {
- std::set<int128_t> values;
+ std::unordered_set<int128_t> values;
for (auto& cond_val : condition.condition_values) {
int128_t value = 0;
std::stringstream ss(cond_val);
@@ -809,7 +848,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
break;
}
case OLAP_FIELD_TYPE_DECIMAL: {
- std::set<decimal12_t> values;
+ std::unordered_set<decimal12_t> values;
for (auto& cond_val : condition.condition_values) {
decimal12_t value = {0, 0};
value.from_string(cond_val);
@@ -823,7 +862,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
break;
}
case OLAP_FIELD_TYPE_CHAR: {
- std::set<StringValue> values;
+ std::unordered_set<StringValue> values;
for (auto& cond_val : condition.condition_values) {
StringValue value;
size_t length = std::max(static_cast<size_t>(column.length()), cond_val.length());
@@ -842,7 +881,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
break;
}
case OLAP_FIELD_TYPE_VARCHAR: {
- std::set<StringValue> values;
+ std::unordered_set<StringValue> values;
for (auto& cond_val : condition.condition_values) {
StringValue value;
int32_t length = cond_val.length();
@@ -860,7 +899,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
break;
}
case OLAP_FIELD_TYPE_DATE: {
- std::set<uint24_t> values;
+ std::unordered_set<uint24_t> values;
for (auto& cond_val : condition.condition_values) {
uint24_t value = timestamp_from_date(cond_val);
values.insert(value);
@@ -873,7 +912,7 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
break;
}
case OLAP_FIELD_TYPE_DATETIME: {
- std::set<uint64_t> values;
+ std::unordered_set<uint64_t> values;
for (auto& cond_val : condition.condition_values) {
uint64_t value = timestamp_from_datetime(cond_val);
values.insert(value);
@@ -887,10 +926,11 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
}
// OLAP_FIELD_TYPE_BOOL is not valid in this case.
default:
- break;
+ break;
}
} else if (boost::to_lower_copy(condition.condition_op) == "is") {
- predicate = new NullPredicate(index, boost::to_lower_copy(condition.condition_values[0]) == "null", opposite);
+ predicate = new NullPredicate(
+ index, boost::to_lower_copy(condition.condition_values[0]) == "null", opposite);
}
return predicate;
}
@@ -953,8 +993,8 @@ OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) {
}
_tablet->obtain_header_rdlock();
- OLAPStatus ret = _delete_handler.init(
- _tablet->tablet_schema(), _tablet->delete_predicates(), read_params.version.second, this);
+ OLAPStatus ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
+ read_params.version.second, this);
_tablet->release_header_lock();
if (read_params.reader_type == READER_BASE_COMPACTION) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 766f8ec..72d5588 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -30,6 +30,7 @@
#include <utility>
#include <vector>
+#include "exprs/bloomfilter_predicate.h"
#include "olap/column_predicate.h"
#include "olap/delete_handler.h"
#include "olap/olap_cond.h"
@@ -67,7 +68,10 @@ struct ReaderParams {
std::string end_range;
std::vector<OlapTuple> start_key;
std::vector<OlapTuple> end_key;
+
std::vector<TCondition> conditions;
+ std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters;
+
// The ColumnData will be set when using Merger, eg Cumulative, BE.
std::vector<RowsetReaderSharedPtr> rs_readers;
std::vector<uint32_t> return_columns;
@@ -101,7 +105,8 @@ public:
uint64_t merged_rows() const { return _merged_rows; }
uint64_t filtered_rows() const {
- return _stats.rows_del_filtered + _stats.rows_conditions_filtered + _stats.rows_vec_del_cond_filtered;
+ return _stats.rows_del_filtered + _stats.rows_conditions_filtered +
+ _stats.rows_vec_del_cond_filtered;
}
const OlapReaderStatistics& stats() const { return _stats; }
@@ -124,7 +129,8 @@ private:
OLAPStatus _init_params(const ReaderParams& read_params);
- OLAPStatus _capture_rs_readers(const ReaderParams& read_params, std::vector<RowsetReaderSharedPtr>* valid_rs_readers);
+ OLAPStatus _capture_rs_readers(const ReaderParams& read_params,
+ std::vector<RowsetReaderSharedPtr>* valid_rs_readers);
bool _optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers);
@@ -132,15 +138,24 @@ private:
void _init_conditions_param(const ReaderParams& read_params);
- ColumnPredicate* _new_eq_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
- ColumnPredicate* _new_ne_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
- ColumnPredicate* _new_lt_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
- ColumnPredicate* _new_le_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
- ColumnPredicate* _new_gt_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
- ColumnPredicate* _new_ge_pred(const TabletColumn& column, int index, const std::string& cond, bool opposite) const;
+ ColumnPredicate* _new_eq_pred(const TabletColumn& column, int index, const std::string& cond,
+ bool opposite) const;
+ ColumnPredicate* _new_ne_pred(const TabletColumn& column, int index, const std::string& cond,
+ bool opposite) const;
+ ColumnPredicate* _new_lt_pred(const TabletColumn& column, int index, const std::string& cond,
+ bool opposite) const;
+ ColumnPredicate* _new_le_pred(const TabletColumn& column, int index, const std::string& cond,
+ bool opposite) const;
+ ColumnPredicate* _new_gt_pred(const TabletColumn& column, int index, const std::string& cond,
+ bool opposite) const;
+ ColumnPredicate* _new_ge_pred(const TabletColumn& column, int index, const std::string& cond,
+ bool opposite) const;
ColumnPredicate* _parse_to_predicate(const TCondition& condition, bool opposite = false) const;
+ ColumnPredicate* _parse_to_predicate(
+ const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter);
+
OLAPStatus _init_delete_condition(const ReaderParams& read_params);
OLAPStatus _init_return_columns(const ReaderParams& read_params);
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter.cpp b/be/src/olap/rowset/segment_v2/bloom_filter.cpp
index f12e3e3..d7396b4 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter.cpp
+++ b/be/src/olap/rowset/segment_v2/bloom_filter.cpp
@@ -37,7 +37,7 @@ Status BloomFilter::create(BloomFilterAlgorithmPB algorithm, std::unique_ptr<Blo
return Status::OK();
}
-uint32_t BloomFilter::_optimal_bit_num(uint64_t n, double fpp) {
+uint32_t BloomFilter::optimal_bit_num(uint64_t n, double fpp) {
// ref parquet bloom_filter branch(BlockSplitBloomFilter.java)
uint32_t num_bits = -8 * (double)n / log(1 - pow(fpp, 1.0 / 8));
uint32_t max_bits = MAXIMUM_BYTES << 3;
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter.h b/be/src/olap/rowset/segment_v2/bloom_filter.h
index 4da15f1..33726aa 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter.h
+++ b/be/src/olap/rowset/segment_v2/bloom_filter.h
@@ -59,13 +59,16 @@ public:
// for write
Status init(uint64_t n, double fpp, HashStrategyPB strategy) {
+ return this->init(optimal_bit_num(n, fpp) / 8, strategy);
+ }
+
+ Status init(uint64_t filter_size, HashStrategyPB strategy) {
if (strategy == HASH_MURMUR3_X64_64) {
_hash_func = murmur_hash3_x64_64;
} else {
return Status::InvalidArgument(strings::Substitute("invalid strategy:$0", strategy));
}
- _num_bytes = _optimal_bit_num(n, fpp) / 8;
- // make sure _num_bytes is power of 2
+ _num_bytes = filter_size;
DCHECK((_num_bytes & (_num_bytes - 1)) == 0);
_size = _num_bytes + 1;
// reserve last byte for null flag
@@ -78,7 +81,7 @@ public:
// for read
// use deep copy to acquire the data
- Status init(char* buf, uint32_t size, HashStrategyPB strategy) {
+ Status init(const char* buf, uint32_t size, HashStrategyPB strategy) {
DCHECK(size > 1);
if (strategy == HASH_MURMUR3_X64_64) {
_hash_func = murmur_hash3_x64_64;
@@ -92,6 +95,7 @@ public:
memcpy(_data, buf, size);
_size = size;
_num_bytes = _size - 1;
+ DCHECK((_num_bytes & (_num_bytes - 1)) == 0);
_has_null = (bool*)(_data + _num_bytes);
return Status::OK();
}
@@ -134,13 +138,20 @@ public:
virtual void add_hash(uint64_t hash) = 0;
virtual bool test_hash(uint64_t hash) const = 0;
-private:
+    // Merges `other` into this filter by OR-ing the two buffers byte by byte over the whole
+    // _size range (i.e. including the last byte that is reserved for the null flag).
+    // Returns InvalidArgument instead of touching memory when `other` is null or when the
+    // two filters do not have the same size.
+    Status merge(const BloomFilter* other) {
+        if (other == nullptr) {
+            return Status::InvalidArgument("bloom filter to merge is null");
+        }
+        // A DCHECK alone would vanish in release builds and allow an out-of-bounds access.
+        if (other->size() != _size) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "bloom filter size mismatch, expected:$0, actual:$1", _size, other->size()));
+        }
+        for (uint32_t i = 0; i < _size; i++) {
+            _data[i] |= other->_data[i];
+        }
+        return Status::OK();
+    }
+
+
// Compute the optimal bit number according to the following rule:
// m = -n * ln(fpp) / (ln(2) ^ 2)
// n: expected distinct record number
// fpp: false positive probability
// the result will be power of 2
- uint32_t _optimal_bit_num(uint64_t n, double fpp);
+ static uint32_t optimal_bit_num(uint64_t n, double fpp);
protected:
// bloom filter data
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 47bea84..8f7bd9e 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -44,6 +44,7 @@ set(RUNTIME_FILES
result_writer.cpp
row_batch.cpp
runtime_state.cpp
+ runtime_filter_mgr.cpp
string_value.cpp
thread_resource_mgr.cpp
# timestamp_value.cpp
diff --git a/be/src/runtime/decimal_value.h b/be/src/runtime/decimal_value.h
index 54bbb51..1eb6f3a 100644
--- a/be/src/runtime/decimal_value.h
+++ b/be/src/runtime/decimal_value.h
@@ -23,6 +23,7 @@
#include <cstdlib>
#include <cstring>
#include <iostream>
+#include <limits>
#include <sstream>
#include <string>
#include <string_view>
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index da676aa..a7832f8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -39,6 +39,7 @@
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
+#include "runtime/runtime_filter_mgr.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
@@ -95,10 +96,17 @@ public:
TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
+ // Returns a copy of the stored _query_id.
+ TUniqueId query_id() const { return _query_id; }
+
PlanFragmentExecutor* executor() { return &_executor; }
const DateTimeValue& start_time() const { return _start_time; }
+    // Stores a shared reference to the runtime filter merge controller entity in
+    // _merge_controller_handler. The argument is only copied, so it is taken by const
+    // reference; this also allows temporaries and const handles to be passed.
+    void set_merge_controller_handler(
+            const std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
+        _merge_controller_handler = handler;
+    }
+
// Update status of this fragment execute
Status update_status(Status status) {
std::lock_guard<std::mutex> l(_status_lock);
@@ -156,6 +164,8 @@ private:
// This context is shared by all fragments of this host in a query
std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
+
+ std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
};
FragmentExecState::FragmentExecState(const TUniqueId& query_id,
@@ -520,6 +530,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
params.backend_num, _exec_env, fragments_ctx));
}
+ std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
+ _runtimefilter_controller.add_entity(params, &handler);
+ exec_state->set_merge_controller_handler(handler);
+
RETURN_IF_ERROR(exec_state->prepare(params));
{
std::lock_guard<std::mutex> lock(_lock);
@@ -718,4 +732,34 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
return exec_plan_fragment(exec_fragment_params);
}
+// Looks up the fragment instance named in `request` and hands the published filter
+// (request + attachment bytes) to that instance's RuntimeFilterMgr.
+// Returns InvalidArgument when the fragment instance is not registered.
+Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const char* data) {
+    UniqueId instance_id = request->fragment_id();
+    std::shared_ptr<FragmentExecState> exec_state;
+    {
+        // Hold the map lock only for the lookup; the filter update runs outside of it.
+        std::lock_guard<std::mutex> guard(_lock);
+        auto found = _fragment_map.find(instance_id.to_thrift());
+        if (found == _fragment_map.end()) {
+            LOG(WARNING) << "unknown.... fragment-id:" << instance_id;
+            return Status::InvalidArgument("fragment-id");
+        }
+        exec_state = found->second;
+    }
+    DCHECK(exec_state != nullptr);
+
+    RuntimeFilterMgr* filter_mgr = exec_state->executor()->runtime_state()->runtime_filter_mgr();
+    return filter_mgr->update_filter(request, data);
+}
+
+
+// Forwards a merge request to the merge controller entity registered for the request's
+// query id. Fails when no such entity can be acquired.
+Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* attach_data) {
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> entity;
+    UniqueId query_id = request->query_id();
+    RETURN_IF_ERROR(_runtimefilter_controller.acquire(query_id, &entity));
+    return entity->merge(request, attach_data);
+}
+
+
} // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 7d16074..a6329cb 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -31,6 +31,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "gutil/ref_counted.h"
#include "http/rest_monitor_iface.h"
+#include "runtime_filter_mgr.h"
#include "util/countdown_latch.h"
#include "util/hash_util.hpp"
#include "util/metrics.h"
@@ -46,6 +47,7 @@ class ThreadPool;
class TExecPlanFragmentParams;
class TExecPlanFragmentParamsList;
class TUniqueId;
+class RuntimeFilterMergeController;
std::string to_load_error_http_path(const std::string& file_name);
@@ -80,6 +82,12 @@ public:
const TUniqueId& fragment_instance_id,
std::vector<TScanColumnDesc>* selected_columns);
+ RuntimeFilterMergeController& runtimefilter_controller() { return _runtimefilter_controller; }
+
+ Status apply_filter(const PPublishFilterRequest* request, const char* attach_data);
+
+ Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
+
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb);
@@ -100,6 +108,8 @@ private:
std::shared_ptr<MetricEntity> _entity = nullptr;
UIntGauge* timeout_canceled_fragment_count = nullptr;
+
+ RuntimeFilterMergeController _runtimefilter_controller;
};
} // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
new file mode 100644
index 0000000..58cec23
--- /dev/null
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -0,0 +1,304 @@
+#include "runtime/runtime_filter_mgr.h"
+
+#include <string>
+
+#include "client_cache.h"
+#include "exprs/runtime_filter.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_state.h"
+#include "service/brpc.h"
+#include "util/brpc_stub_cache.h"
+#include "util/time.h"
+
+namespace doris {
+
+// Bundles everything one brpc call needs to keep alive: the request and response
+// messages, the brpc controller and a call id.
+// NOTE(review): `cid` is not assigned anywhere in the code visible here.
+template <class RPCRequest, class RPCResponse>
+struct async_rpc_context {
+ RPCRequest request;
+ RPCResponse response;
+ brpc::Controller cntl;
+ brpc::CallId cid;
+};
+
+// Only `state` is stored; `query_id` is accepted but not used by this constructor.
+RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state) : _state(state) {}
+
+// Nothing to release by hand; members clean up after themselves.
+RuntimeFilterMgr::~RuntimeFilterMgr() = default;
+
+// Caches the raw pointer of the runtime state's instance memory tracker in _tracker.
+Status RuntimeFilterMgr::init() {
+    auto* instance_tracker = _state->instance_mem_tracker().get();
+    DCHECK(instance_tracker != nullptr);
+    _tracker = instance_tracker;
+    return Status::OK();
+}
+
+// Finds the filter registered under `filter_id` in the consumer map (role == CONSUMER)
+// or the producer map (any other role) and stores it in *target.
+// Returns InvalidArgument when no such filter is registered.
+Status RuntimeFilterMgr::get_filter_by_role(const int filter_id, const RuntimeFilterRole role,
+                                            IRuntimeFilter** target) {
+    const int32_t key = filter_id;
+    auto& registry = (role == RuntimeFilterRole::CONSUMER) ? _consumer_map : _producer_map;
+
+    const auto found = registry.find(key);
+    if (found != registry.end()) {
+        *target = found->second.filter;
+        return Status::OK();
+    }
+    LOG(WARNING) << "unknown filter...:" << key << ",role:" << (int)role;
+    return Status::InvalidArgument("unknown filter");
+}
+
+// Convenience wrapper: looks up `filter_id` with the CONSUMER role.
+Status RuntimeFilterMgr::get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter) {
+ return get_filter_by_role(filter_id, RuntimeFilterRole::CONSUMER, consumer_filter);
+}
+
+// Convenience wrapper: looks up `filter_id` with the PRODUCER role.
+Status RuntimeFilterMgr::get_producer_filter(const int filter_id,
+ IRuntimeFilter** producer_filter) {
+ return get_filter_by_role(filter_id, RuntimeFilterRole::PRODUCER, producer_filter);
+}
+
+// Creates a runtime filter from `desc` and registers it under desc.filter_id in the map
+// that matches `role`. Consumers are expected to pass a non-negative node_id.
+// Returns InvalidArgument when a filter with the same id is already registered for that
+// role, or the error reported by IRuntimeFilter::create().
+Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
+                                       int node_id) {
+    DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) ||
+           role != RuntimeFilterRole::CONSUMER);
+
+    auto& registry = (role == RuntimeFilterRole::CONSUMER) ? _consumer_map : _producer_map;
+    const int32_t key = desc.filter_id;
+    if (registry.find(key) != registry.end()) {
+        return Status::InvalidArgument("filter has registed");
+    }
+
+    RuntimeFilterMgrVal entry;
+    entry.role = role;
+    RETURN_IF_ERROR(
+            IRuntimeFilter::create(_state, _tracker, &_pool, &desc, role, node_id, &entry.filter));
+
+    registry.emplace(key, entry);
+    return Status::OK();
+}
+
+// Applies a published filter to the consumer filter registered under request->filter_id().
+// `data` is the attachment that travels with the request.
+// Returns the lookup error when no consumer filter with that id exists.
+Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, const char* data) {
+    UpdateRuntimeFilterParams params;
+    params.request = request;
+    params.data = data;
+    int filter_id = request->filter_id();
+    IRuntimeFilter* real_filter = nullptr;
+    RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter));
+    // Pass the address of the parameter bundle built above.
+    return real_filter->update_filter(&params);
+}
+
+// Remembers the merge address carried by `runtime_filter_params` and marks it as set.
+void RuntimeFilterMgr::set_runtime_filter_params(
+        const TRuntimeFilterParams& runtime_filter_params) {
+    _merge_addr = runtime_filter_params.runtime_filter_merge_addr;
+    _has_merge_addr = true;
+}
+
+// Copies the stored merge address into *addr.
+// Returns InternalError when no merge address has been set.
+Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) {
+    DCHECK(_has_merge_addr);
+    if (!_has_merge_addr) {
+        return Status::InternalError("not found merge addr");
+    }
+    *addr = _merge_addr;
+    return Status::OK();
+}
+
+// Builds the controller value (descriptor copy, target list, producer count, object pool,
+// memory tracker and the merge filter itself) for one runtime filter and registers it in
+// _filter_map under the decimal string of its filter id.
+// Returns the error of the filter initialisation, in which case nothing is registered.
+Status RuntimeFilterMergeControllerEntity::_init_with_desc(
+        const TRuntimeFilterDesc* runtime_filter_desc,
+        const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
+        const int producer_size) {
+    std::lock_guard<std::mutex> guard(_filter_map_mutex);
+    std::shared_ptr<RuntimeFilterCntlVal> cntVal = std::make_shared<RuntimeFilterCntlVal>();
+    // runtime_filter_desc and target will be released,
+    // so we need to copy to cntVal
+    // TODO: tracker should add a name
+    cntVal->producer_size = producer_size;
+    cntVal->runtime_filter_desc = *runtime_filter_desc;
+    cntVal->target_info = *target_info;
+    cntVal->pool.reset(new ObjectPool());
+    cntVal->tracker = MemTracker::CreateTracker();
+    cntVal->filter = cntVal->pool->add(
+            new IRuntimeFilter(nullptr, cntVal->tracker.get(), cntVal->pool.get()));
+
+    std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
+    // LOG(INFO) << "entity filter id:" << filter_id;
+    // Do not register a filter whose initialisation failed: propagate the error instead
+    // of silently dropping it.
+    RETURN_IF_ERROR(cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc));
+    _filter_map.emplace(filter_id, cntVal);
+    return Status::OK();
+}
+
+// Records the query id and sets up one merge slot for every runtime filter listed in
+// runtime_filter_params.rid_to_runtime_filter.
+// Returns InternalError when a filter has no target list or no builder count, and
+// propagates any failure reported while initialising a single filter.
+Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id,
+                                                const TRuntimeFilterParams& runtime_filter_params) {
+    _query_id = query_id;
+    for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
+        int filter_id = filterid_to_desc.first;
+        const auto target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
+        if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
+            return Status::InternalError("runtime filter params meet error");
+        }
+        const auto build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+        if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) {
+            return Status::InternalError("runtime filter params meet error");
+        }
+        // The per-filter setup returns a Status; it must not be dropped.
+        RETURN_IF_ERROR(_init_with_desc(&filterid_to_desc.second, &target_iter->second,
+                                        build_iter->second));
+    }
+    return Status::OK();
+}
+
+// merge data
+Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
+ const char* data) {
+ std::shared_ptr<RuntimeFilterCntlVal> cntVal;
+ int merged_size = 0;
+ {
+ std::lock_guard<std::mutex> guard(_filter_map_mutex);
+ auto iter = _filter_map.find(std::to_string(request->filter_id()));
+ VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString();
+ if (iter == _filter_map.end()) {
+ LOG(WARNING) << "unknown filter id:" << std::to_string(request->filter_id());
+ return Status::InvalidArgument("unknown filter id");
+ }
+ cntVal = iter->second;
+ MergeRuntimeFilterParams params;
+ params.data = data;
+ params.request = request;
+ std::shared_ptr<MemTracker> tracker = iter->second->tracker;
+ ObjectPool* pool = iter->second->pool.get();
+ RuntimeFilterWrapperHolder holder;
+ RETURN_IF_ERROR(
+ IRuntimeFilter::create_wrapper(¶ms, tracker.get(), pool, holder.getHandle()));
+ RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get()));
+ cntVal->arrive_id.insert(UniqueId(request->fragment_id()).to_string());
+ merged_size = cntVal->arrive_id.size();
+ // TODO: avoid log when we had acquired a lock
+ VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size;
+ DCHECK_LE(merged_size, cntVal->producer_size);
+ if (merged_size < cntVal->producer_size) {
+ return Status::OK();
+ }
+ }
+
+ if (merged_size == cntVal->producer_size) {
+ // prepare rpc context
+ using PPublishFilterRpcContext =
+ async_rpc_context<PPublishFilterRequest, PPublishFilterResponse>;
+ std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
+ rpc_contexts.reserve(cntVal->target_info.size());
+
+ butil::IOBuf request_attachment;
+
+ PPublishFilterRequest apply_request;
+ // serialize filter
+ void* data = nullptr;
+ int len = 0;
+ bool has_attachment = false;
+ RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len));
+ if (data != nullptr && len > 0) {
+ request_attachment.append(data, len);
+ has_attachment = true;
+ }
+
+ // TODO: async send publish rpc
+ std::vector<TRuntimeFilterTargetParams>& targets = cntVal->target_info;
+ for (size_t i = 0; i < targets.size(); i++) {
+ rpc_contexts.emplace_back(new PPublishFilterRpcContext);
+ rpc_contexts[i]->request = apply_request;
+ rpc_contexts[i]->request.set_filter_id(request->filter_id());
+ *rpc_contexts[i]->request.mutable_query_id() = request->query_id();
+ if (has_attachment) {
+ rpc_contexts[i]->cntl.request_attachment().append(request_attachment);
+ }
+
+ // set fragment-id
+ auto request_fragment_id = rpc_contexts[i]->request.mutable_fragment_id();
+ request_fragment_id->set_hi(targets[i].target_fragment_instance_id.hi);
+ request_fragment_id->set_lo(targets[i].target_fragment_instance_id.lo);
+
+ PBackendService_Stub* stub = ExecEnv::GetInstance()->brpc_stub_cache()->get_stub(
+ targets[i].target_fragment_instance_addr);
+ LOG(INFO) << "send filter " << rpc_contexts[i]->request.filter_id()
+ << " to:" << targets[i].target_fragment_instance_addr.hostname << ":"
+ << targets[i].target_fragment_instance_addr.port
+ << rpc_contexts[i]->request.ShortDebugString();
+ if (stub == nullptr) {
+ rpc_contexts.pop_back();
+ continue;
+ }
+ stub->apply_filter(&rpc_contexts[i]->cntl, &rpc_contexts[i]->request,
+ &rpc_contexts[i]->response, nullptr);
+ }
+ /// TODO: use async and join rpc
+ }
+ return Status::OK();
+}
+
+// Hands out the merge controller entity of the query that `params` belongs to.
+// If the query id is not in the map yet, a new entity is created (with
+// runtime_filter_merge_entity_close bound as its deleter), stored as a weak reference and
+// initialised when runtime filter params are set. Otherwise the stored weak reference is
+// locked and returned.
+Status RuntimeFilterMergeController::add_entity(
+        const TExecPlanFragmentParams& params,
+        std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
+    runtime_filter_merge_entity_closer entity_closer =
+            std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1);
+
+    std::lock_guard<std::mutex> guard(_controller_mutex);
+    UniqueId query_id(params.params.query_id);
+    const std::string query_key = query_id.to_string();
+
+    auto existing = _filter_controller_map.find(query_key);
+    if (existing != _filter_controller_map.end()) {
+        *handle = existing->second.lock();
+        return Status::OK();
+    }
+
+    *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
+            new RuntimeFilterMergeControllerEntity(), entity_closer);
+    _filter_controller_map[query_key] = *handle;
+    if (params.params.__isset.runtime_filter_params) {
+        RETURN_IF_ERROR((*handle)->init(query_id, params.params.runtime_filter_params));
+    }
+    return Status::OK();
+}
+
+// Looks up the entity stored for `query_id` and returns a strong reference in *handle.
+// Returns InvalidArgument when the query id is unknown or when the stored weak reference
+// can no longer be locked.
+Status RuntimeFilterMergeController::acquire(
+        UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
+    std::lock_guard<std::mutex> guard(_controller_mutex);
+    const std::string query_key = query_id.to_string();
+    const auto found = _filter_controller_map.find(query_key);
+    if (found == _filter_controller_map.end()) {
+        LOG(WARNING) << "not found entity, query-id:" << query_key;
+        return Status::InvalidArgument("not found entity");
+    }
+    *handle = found->second.lock();
+    if (*handle != nullptr) {
+        return Status::OK();
+    }
+    return Status::InvalidArgument("entity is closed");
+}
+
+// Erases the map entry keyed by the string form of `queryId` while holding
+// _controller_mutex. Always returns OK, whether or not an entry existed.
+Status RuntimeFilterMergeController::remove_entity(UniqueId queryId) {
+ std::lock_guard<std::mutex> guard(_controller_mutex);
+ _filter_controller_map.erase(queryId.to_string());
+ return Status::OK();
+}
+
+// Deleter bound in RuntimeFilterMergeController::add_entity: it is called automatically
+// when the last std::shared_ptr<RuntimeFilterMergeControllerEntity> is destroyed.
+// It first removes the entity's query id from the controller, then deletes the entity.
+void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller,
+ RuntimeFilterMergeControllerEntity* entity) {
+ controller->remove_entity(entity->query_id());
+ delete entity;
+}
+
+} // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
new file mode 100644
index 0000000..cfcbbec
--- /dev/null
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_set>
+#include <vector>
+
+#include "common/object_pool.h"
+#include "common/status.h"
+#include "exprs/runtime_filter.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+// definition for TRuntimeFilterDesc
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+
+namespace doris {
+class TUniqueId;
+class RuntimeFilter;
+class FragmentExecState;
+class PlanFragmentExecutor;
+class PPublishFilterRequest;
+class PMergeFilterRequest;
+
+/// producer:
+/// Filter filter;
+/// get_filter(filter_id, &filter);
+/// filter->merge(origin_filter)
+
+/// consumer:
+/// get_filter(filter_id, &filter)
+/// filter->wait
+/// if filter->ready().ok(), use filter
+
+// Registry of the runtime filters known to one RuntimeState, split into a
+// consumer map and a producer map that are both keyed by filter id.
+// Owned by RuntimeState.
+// RuntimeFilterMgr will be destroyed when RuntimeState is destroyed.
+class RuntimeFilterMgr {
+public:
+ RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state);
+
+ ~RuntimeFilterMgr();
+
+ // Called from RuntimeState::init_mem_trackers(); the manager depends on the
+ // instance mem tracker that is created there.
+ Status init();
+
+ // get a consumer filter by filter-id
+ Status get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter);
+
+ // get a producer filter by filter-id
+ Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter);
+ // register a filter described by `desc` under the given role
+ // NOTE(review): the meaning of node_id == -1 is not visible here — confirm in the .cpp
+ Status regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
+ int node_id = -1);
+
+ // update filter by remote
+ Status update_filter(const PPublishFilterRequest* request, const char* data);
+
+ void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params);
+
+ Status get_merge_addr(TNetworkAddress* addr);
+
+private:
+ // Shared lookup used for both roles; writes the filter found to *target.
+ Status get_filter_by_role(const int filter_id, const RuntimeFilterRole role,
+ IRuntimeFilter** target);
+
+ struct RuntimeFilterMgrVal {
+ RuntimeFilterRole role; // consumer or producer
+ IRuntimeFilter* filter;
+ };
+ // RuntimeFilterMgr is owned by RuntimeState, so we only
+ // use filter_id as key
+ // key: "filter-id"
+ /// TODO: do these maps need to be protected by a mutex?
+ std::map<int32_t, RuntimeFilterMgrVal> _consumer_map;
+ std::map<int32_t, RuntimeFilterMgrVal> _producer_map;
+
+ RuntimeState* _state;
+ MemTracker* _tracker;
+ ObjectPool _pool;
+
+ TNetworkAddress _merge_addr;
+
+ // NOTE(review): presumably true once _merge_addr has been set — confirm in the .cpp
+ bool _has_merge_addr;
+};
+
+// controller -> <query-id, entity>
+// RuntimeFilterMergeControllerEntity is the context used by runtime filters for merging.
+// During a query, only the last sink node owns this class; at the end of the query
+// it is destroyed together with the last fragment_exec.
+class RuntimeFilterMergeControllerEntity {
+public:
+    RuntimeFilterMergeControllerEntity() : _query_id(0, 0) {}
+    ~RuntimeFilterMergeControllerEntity() = default;
+
+    Status init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params);
+
+    // handle merge rpc
+    Status merge(const PMergeFilterRequest* request, const char* data);
+
+    // Id of the query this entity belongs to. const-qualified so that it can
+    // also be read through a const entity.
+    UniqueId query_id() const { return _query_id; }
+
+private:
+    Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
+                           const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
+                           const int producer_size);
+
+    // Per-filter merge state stored in _filter_map.
+    struct RuntimeFilterCntlVal {
+        int64_t create_time;
+        int producer_size;
+        TRuntimeFilterDesc runtime_filter_desc;
+        std::vector<doris::TRuntimeFilterTargetParams> target_info;
+        IRuntimeFilter* filter;
+        std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
+        std::shared_ptr<MemTracker> tracker;
+        std::shared_ptr<ObjectPool> pool;
+    };
+    UniqueId _query_id;
+    // protect _filter_map
+    std::mutex _filter_map_mutex;
+    // TODO: convert filter id to i32
+    // filter-id -> val
+    std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
+};
+
+// RuntimeFilterMergeController has a map query-id -> entity.
+// The map holds weak pointers; _controller_mutex guards the map.
+class RuntimeFilterMergeController {
+public:
+ RuntimeFilterMergeController() = default;
+ ~RuntimeFilterMergeController() = default;
+
+ // thread safe
+ // add a query-id -> entity
+ // If a query-id -> entity mapping already exists,
+ // add_entity will return the existing entity
+ Status add_entity(const TExecPlanFragmentParams& params,
+ std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle);
+ // thread safe
+ // increase the reference count of an existing entity
+ // if the query-id does not exist (or its entity is already closed),
+ // a non-OK Status is returned
+ Status acquire(UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle);
+
+ // thread safe
+ // remove an entity by query-id
+ // remove_entity will be called automatically by the entity when the entity is destroyed
+ Status remove_entity(UniqueId queryId);
+
+private:
+ std::mutex _controller_mutex;
+ // We store the weak pointer here.
+ // When the external object is destroyed, we need to clear this record
+ using FilterControllerMap =
+ std::map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
+ // str(query-id) -> entity
+ FilterControllerMap _filter_controller_map;
+};
+
+// Callable that receives a raw entity pointer.
+// NOTE(review): presumably the deleter type handed to the entity's shared_ptr — confirm at the call site.
+using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMergeControllerEntity*)>;
+
+// Removes the entity's query id from `controller` and then deletes `entity`.
+void runtime_filter_merge_entity_close(RuntimeFilterMergeController* controller,
+ RuntimeFilterMergeControllerEntity* entity);
+
+} // namespace doris
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 5237646..441046e 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -35,6 +35,7 @@
#include "runtime/initial_reservations.h"
#include "runtime/load_path_mgr.h"
#include "runtime/mem_tracker.h"
+#include "runtime/runtime_filter_mgr.h"
#include "util/cpu_info.h"
#include "util/disk_info.h"
#include "util/file_utils.h"
@@ -53,6 +54,7 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id,
: _fragment_mem_tracker(nullptr),
_profile("Fragment " + print_id(fragment_instance_id)),
_obj_pool(new ObjectPool()),
+ _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_is_cancelled(false),
@@ -78,6 +80,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
: _fragment_mem_tracker(nullptr),
_profile("Fragment " + print_id(fragment_exec_params.fragment_instance_id)),
_obj_pool(new ObjectPool()),
+ _runtime_filter_mgr(new RuntimeFilterMgr(fragment_exec_params.query_id, this)),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(fragment_exec_params.query_id),
@@ -93,6 +96,9 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
_error_log_file_path(""),
_error_log_file(nullptr),
_instance_buffer_reservation(new ReservationTracker) {
+ if (fragment_exec_params.__isset.runtime_filter_params) {
+ _runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
+ }
Status status =
init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env);
DCHECK(status.ok());
@@ -216,8 +222,8 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
_query_mem_tracker =
MemTracker::CreateTracker(bytes_limit, "RuntimeState:query:" + print_id(query_id),
_exec_env->process_mem_tracker(), true, false);
- _instance_mem_tracker = MemTracker::CreateTracker(
- &_profile, -1, "RuntimeState:instance:", _query_mem_tracker);
+ _instance_mem_tracker =
+ MemTracker::CreateTracker(&_profile, -1, "RuntimeState:instance:", _query_mem_tracker);
/*
// TODO: this is a stopgap until we implement ExprContext
@@ -241,6 +247,8 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
std::numeric_limits<int64_t>::max());
}
+ // filter manager depends _instance_mem_tracker
+ _runtime_filter_mgr->init();
return Status::OK();
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f62e63f..2152dfd 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -56,6 +56,7 @@ class LoadErrorHub;
class ReservationTracker;
class InitialReservations;
class RowDescriptor;
+class RuntimeFilterMgr;
// A collection of items that are part of the global state of a
// query and shared across all execution nodes of that query.
@@ -341,6 +342,10 @@ public:
bool enable_spill() const { return _query_options.enable_spilling; }
+    // Value of the runtime_filter_wait_time_ms query option.
+    // const-qualified like the neighbouring query-option accessors.
+    int32_t runtime_filter_wait_time_ms() const {
+        return _query_options.runtime_filter_wait_time_ms;
+    }
+
+    // Value of the runtime_filter_max_in_num query option.
+    // const-qualified like the neighbouring query-option accessors.
+    int32_t runtime_filter_max_in_num() const {
+        return _query_options.runtime_filter_max_in_num;
+    }
+
bool enable_exchange_node_parallel_merge() const {
return _query_options.enable_enable_exchange_node_parallel_merge;
}
@@ -363,6 +368,8 @@ public:
// if load mem limit is not set, or is zero, using query mem limit instead.
int64_t get_load_mem_limit();
+ // Non-owning pointer to the runtime filter manager held by this RuntimeState.
+ RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
+
private:
// Use a custom block manager for the query for testing purposes.
void set_block_mgr2(const boost::shared_ptr<BufferedBlockMgr2>& block_mgr) {
@@ -393,6 +400,9 @@ private:
DescriptorTbl* _desc_tbl;
std::shared_ptr<ObjectPool> _obj_pool;
+ // runtime filter
+ std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
+
// Protects _data_stream_recvrs_pool
std::mutex _data_stream_recvrs_lock;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 8023751..1141d7d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -105,8 +105,8 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
- auto st = _exec_env->load_channel_mgr()->add_batch(
- *request, response->mutable_tablet_vec());
+ auto st = _exec_env->load_channel_mgr()->add_batch(*request,
+ response->mutable_tablet_vec());
if (!st.ok()) {
LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
<< ", id=" << request->id() << ", index_id=" << request->index_id()
@@ -177,7 +177,8 @@ void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_b
const PFetchDataRequest* request, PFetchDataResult* result,
google::protobuf::Closure* done) {
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- bool resp_in_attachment = request->has_resp_in_attachment() ? request->resp_in_attachment() : true;
+ bool resp_in_attachment =
+ request->has_resp_in_attachment() ? request->resp_in_attachment() : true;
GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, resp_in_attachment, result, done);
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
@@ -196,8 +197,9 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
if (!kafka_request.offset_times().empty()) {
// if offset_times() has elements, which means this request is to get offset by timestamp.
std::vector<PIntegerPair> partition_offsets;
- Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
- request->kafka_meta_request(), &partition_offsets);
+ Status st =
+ _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
+ request->kafka_meta_request(), &partition_offsets);
if (st.ok()) {
PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
for (const auto& entry : partition_offsets) {
@@ -222,7 +224,7 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
st.to_protobuf(response->mutable_status());
return;
}
- }
+ }
Status::OK().to_protobuf(response->mutable_status());
}
@@ -253,6 +255,36 @@ void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* contr
_exec_env->result_cache()->clear(request, response);
}
+// RPC handler: passes the merge request and the bytes of the request attachment
+// to FragmentMgr::merge_filter and writes the resulting status into the response.
+// `done` is run by the ClosureGuard when the function returns.
+template <typename T>
+void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* controller,
+                                           const ::doris::PMergeFilterRequest* request,
+                                           ::doris::PMergeFilterResponse* response,
+                                           ::google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    // Bind by reference; a plain `auto` would copy the IOBuf.
+    const auto& attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+    // Flatten once into a named local whose buffer is handed to merge_filter().
+    const std::string data = attachment.to_string();
+    Status st = _exec_env->fragment_mgr()->merge_filter(request, data.data());
+    if (!st.ok()) {
+        LOG(WARNING) << "merge meet error" << st.to_string();
+    }
+    st.to_protobuf(response->mutable_status());
+}
+
+// RPC handler: passes the publish request and the bytes of the request attachment
+// to FragmentMgr::apply_filter and writes the resulting status into the response.
+// `done` is run by the ClosureGuard when the function returns.
+template <typename T>
+void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* controller,
+                                           const ::doris::PPublishFilterRequest* request,
+                                           ::doris::PPublishFilterResponse* response,
+                                           ::google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    // Bind by reference; a plain `auto` would copy the IOBuf.
+    const auto& attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+    // TODO: avoid copying the attachment into a std::string
+    LOG(INFO) << "rpc apply_filter recv";
+    const std::string data = attachment.to_string();
+    Status st = _exec_env->fragment_mgr()->apply_filter(request, data.data());
+    if (!st.ok()) {
+        LOG(WARNING) << "apply filter meet error" << st.to_string();
+    }
+    st.to_protobuf(response->mutable_status());
+}
template class PInternalServiceImpl<PBackendService>;
template class PInternalServiceImpl<palo::PInternalService>;
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 1d794dc..247f985 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -83,6 +83,15 @@ public:
void clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request,
PCacheResponse* response, google::protobuf::Closure* done) override;
+ // RPC entry point for PMergeFilterRequest; the implementation forwards the
+ // request and its attachment to FragmentMgr::merge_filter.
+ void merge_filter(::google::protobuf::RpcController* controller,
+ const ::doris::PMergeFilterRequest* request,
+ ::doris::PMergeFilterResponse* response,
+ ::google::protobuf::Closure* done) override;
+ // RPC entry point for PPublishFilterRequest; the implementation forwards the
+ // request and its attachment to FragmentMgr::apply_filter.
+ void apply_filter(::google::protobuf::RpcController* controller,
+ const ::doris::PPublishFilterRequest* request,
+ ::doris::PPublishFilterResponse* response,
+ ::google::protobuf::Closure* done) override;
+
private:
Status _exec_plan_fragment(const std::string& s_request);
diff --git a/be/test/exec/hash_table_test.cpp b/be/test/exec/hash_table_test.cpp
index b2c1f72..fae151b 100644
--- a/be/test/exec/hash_table_test.cpp
+++ b/be/test/exec/hash_table_test.cpp
@@ -37,10 +37,10 @@
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/test_env.h"
+#include "test_util/test_util.h"
#include "util/cpu_info.h"
#include "util/runtime_profile.h"
#include "util/time.h"
-#include "test_util/test_util.h"
namespace doris {
@@ -383,6 +383,12 @@ TEST_F(HashTableTest, GrowTableTest2) {
}
LOG(INFO) << time(NULL);
+
+ size_t counter = 0;
+ auto func = [&](TupleRow* row) { counter++; };
+ hash_table.for_each_row(func);
+ ASSERT_EQ(counter, hash_table.size());
+
hash_table.close();
}
diff --git a/be/test/exprs/CMakeLists.txt b/be/test/exprs/CMakeLists.txt
index 8c393aa..b24cd17 100644
--- a/be/test/exprs/CMakeLists.txt
+++ b/be/test/exprs/CMakeLists.txt
@@ -35,4 +35,5 @@ ADD_BE_TEST(encryption_functions_test)
#ADD_BE_TEST(in-predicate-test)
ADD_BE_TEST(math_functions_test)
ADD_BE_TEST(topn_function_test)
-
+ADD_BE_TEST(runtime_filter_test)
+ADD_BE_TEST(bloom_filter_predicate_test)
diff --git a/be/test/exprs/bloom_filter_predicate_test.cpp b/be/test/exprs/bloom_filter_predicate_test.cpp
new file mode 100644
index 0000000..ea54a90
--- /dev/null
+++ b/be/test/exprs/bloom_filter_predicate_test.cpp
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "exprs/bloomfilter_predicate.h"
+#include "gtest/gtest.h"
+#include "runtime/string_value.h"
+
+namespace doris {
+// Stateless fixture for the BloomFilterFuncBase tests in this file.
+class BloomFilterPredicateTest : public testing::Test {
+public:
+    BloomFilterPredicateTest() = default;
+    // `override` lets the compiler check that these really replace the gtest hooks.
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+// Inserts the ints 0..1023 into an INT bloom filter, checks that each one is
+// found again, and checks that a value which was never inserted is not found.
+TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) {
+    auto tracker = MemTracker::CreateTracker();
+    std::unique_ptr<BloomFilterFuncBase> func(
+            BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_INT));
+    ASSERT_TRUE(func->init().ok());
+    const int data_size = 1024;
+    int data[data_size];
+    for (int i = 0; i < data_size; i++) {
+        data[i] = i;
+        func->insert(static_cast<const void*>(&data[i]));
+    }
+    for (int i = 0; i < data_size; i++) {
+        ASSERT_TRUE(func->find(static_cast<const void*>(&data[i])));
+    }
+    // test not exist val
+    int not_exist_val = 0x3355ff;
+    ASSERT_FALSE(func->find(static_cast<const void*>(&not_exist_val)));
+}
+
+// Part 1: inserts the decimal strings "0".."1023" into a VARCHAR bloom filter,
+// checks that each is found and that a string never inserted is not found.
+// Part 2: inserts "true"/"false" into a CHAR bloom filter and looks them up via
+// find_olap_engine() using length-10 values whose tail bytes are '\0'.
+TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) {
+    auto tracker = MemTracker::CreateTracker();
+    std::unique_ptr<BloomFilterFuncBase> func(
+            BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_VARCHAR));
+    ASSERT_TRUE(func->init().ok());
+    ObjectPool obj_pool;
+    const int data_size = 1024;
+    StringValue data[data_size];
+    for (int i = 0; i < data_size; i++) {
+        // The pool keeps the backing std::string alive for the StringValue.
+        auto str = obj_pool.add(new std::string(std::to_string(i)));
+        data[i] = StringValue(*str);
+        func->insert(static_cast<const void*>(&data[i]));
+    }
+    for (int i = 0; i < data_size; i++) {
+        ASSERT_TRUE(func->find(static_cast<const void*>(&data[i])));
+    }
+    // test not exist value
+    std::string not_exist_str = "0x3355ff";
+    StringValue not_exist_val(not_exist_str);
+    ASSERT_FALSE(func->find(static_cast<const void*>(&not_exist_val)));
+
+    // test fixed char
+    func.reset(BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_CHAR));
+    ASSERT_TRUE(func->init().ok());
+
+    auto varchar_true_str = obj_pool.add(new std::string("true"));
+    StringValue varchar_true(*varchar_true_str);
+    func->insert(static_cast<const void*>(&varchar_true));
+
+    auto varchar_false_str = obj_pool.add(new std::string("false"));
+    StringValue varchar_false(*varchar_false_str);
+    func->insert(static_cast<const void*>(&varchar_false));
+
+    // Initialising a char array from a string literal zero-fills the rest of
+    // the array, so no explicit memset is needed for the padding bytes.
+    StringValue fixed_char_true;
+    char true_buf[100] = "true";
+    fixed_char_true.ptr = true_buf;
+    fixed_char_true.len = 10;
+
+    StringValue fixed_char_false;
+    char false_buf[100] = "false";
+    fixed_char_false.ptr = false_buf;
+    fixed_char_false.len = 10;
+
+    ASSERT_TRUE(func->find_olap_engine(static_cast<const void*>(&fixed_char_true)));
+    ASSERT_TRUE(func->find_olap_engine(static_cast<const void*>(&fixed_char_false)));
+}
+
+} // namespace doris
+
+// Entry point of the test binary: hands the command line to googletest and
+// returns the result of running every registered test.
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    const int exit_code = RUN_ALL_TESTS();
+    return exit_code;
+}
\ No newline at end of file
diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp
new file mode 100644
index 0000000..3cd858e
--- /dev/null
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/runtime_filter.h"
+
+#include <array>
+#include <memory>
+
+#include "exprs/expr_context.h"
+#include "exprs/slot_ref.h"
+#include "gen_cpp/Planner_types.h"
+#include "gen_cpp/Types_types.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "runtime/runtime_state.h"
+
+namespace doris {
+TTypeDesc create_type_desc(PrimitiveType type);
+
+// Fixture that creates a fresh RuntimeState before every test and clears the
+// shared object pool afterwards.
+class RuntimeFilterTest : public testing::Test {
+public:
+    RuntimeFilterTest() {}
+    void SetUp() override {
+        // NOTE(review): the ExecEnv instance is fetched and then discarded, so the
+        // RuntimeState is built with a null ExecEnv — confirm whether the
+        // GetInstance() call is still needed.
+        ExecEnv* exec_env = ExecEnv::GetInstance();
+        exec_env = nullptr;
+        _runtime_stat.reset(
+                new RuntimeState(_fragment_id, _query_options, _query_globals, exec_env));
+        _runtime_stat->init_instance_mem_tracker();
+    }
+    void TearDown() override { _obj_pool.clear(); }
+
+protected:
+    // protected rather than private: each TEST_F body is a subclass of the
+    // fixture and reads these members directly.
+    ObjectPool _obj_pool;
+    TUniqueId _fragment_id;
+    TQueryOptions _query_options;
+    TQueryGlobals _query_globals;
+
+    std::unique_ptr<RuntimeState> _runtime_stat;
+    // std::unique_ptr<IRuntimeFilter> _runtime_filter;
+};
+
+// Builds the INT slot-ref node (slot 0 of tuple 0, output column 0) that the
+// build expression and the target expression of the test below both use.
+static TExprNode make_int_slot_ref_node() {
+    TExprNode expr_node;
+    expr_node.__set_node_type(TExprNodeType::SLOT_REF);
+    expr_node.__set_type(create_type_desc(TYPE_INT));
+    expr_node.__set_num_children(0);
+    expr_node.__isset.slot_ref = true;
+    TSlotRef slot_ref;
+    slot_ref.__set_slot_id(0);
+    slot_ref.__set_tuple_id(0);
+    expr_node.__set_slot_ref(slot_ref);
+    expr_node.__isset.output_column = true;
+    expr_node.__set_output_column(0);
+    return expr_node;
+}
+
+// Creates a producer-side BLOOM runtime filter, feeds it 1024 rows, and then
+// checks the pushed-down expression contexts: inserted rows evaluate to true,
+// 1024 rows that were not inserted evaluate to false, and a row whose tuple is
+// null evaluates to false.
+TEST_F(RuntimeFilterTest, runtime_filter_basic_test) {
+    TRuntimeFilterDesc desc;
+    desc.__set_filter_id(0);
+    desc.__set_expr_order(0);
+    desc.__set_has_local_targets(true);
+    desc.__set_has_remote_targets(false);
+    desc.__set_is_broadcast_join(true);
+    desc.__set_type(TRuntimeFilterType::BLOOM);
+    desc.__set_bloom_filter_size_bytes(4096);
+
+    // build src expr context
+    {
+        TExpr build_expr;
+        build_expr.nodes.push_back(make_int_slot_ref_node());
+        desc.__set_src_expr(build_expr);
+    }
+    // build dst expr
+    {
+        TExpr target_expr;
+        target_expr.nodes.push_back(make_int_slot_ref_node());
+        std::map<int, TExpr> planid_to_target_expr = {{0, target_expr}};
+        desc.__set_planId_to_target_expr(planid_to_target_expr);
+    }
+
+    // size_t prob_index = 0;
+    SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0));
+    ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr));
+    ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr));
+
+    IRuntimeFilter* runtime_filter = nullptr;
+
+    Status status = IRuntimeFilter::create(_runtime_stat.get(),
+                                           _runtime_stat->instance_mem_tracker().get(), &_obj_pool,
+                                           &desc, RuntimeFilterRole::PRODUCER, -1, &runtime_filter);
+
+    ASSERT_TRUE(status.ok());
+
+    // generate data: every call yields a row whose tuple holds the next
+    // counter value, so the two arrays below contain disjoint values.
+    std::array<TupleRow, 1024> tuple_rows;
+    int generator_index = 0;
+    auto generator = [&]() {
+        std::array<int, 2>* data = _obj_pool.add(new std::array<int, 2>());
+        data->at(0) = data->at(1) = generator_index++;
+        TupleRow row;
+        row._tuples[0] = (Tuple*)data->data();
+        return row;
+    };
+    std::generate(tuple_rows.begin(), tuple_rows.end(), generator);
+
+    std::array<TupleRow, 1024> not_exist_data;
+    // generate not exist data
+    std::generate(not_exist_data.begin(), not_exist_data.end(), generator);
+
+    // build runtime filter
+    for (TupleRow& row : tuple_rows) {
+        void* val = build_expr_ctx->get_value(&row);
+        runtime_filter->insert(val);
+    }
+
+    // get expr context from filter
+    std::list<ExprContext*> expr_context_list;
+    ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, prob_expr_ctx).ok());
+    ASSERT_TRUE(!expr_context_list.empty());
+
+    // test data in
+    for (TupleRow& row : tuple_rows) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_TRUE(ctx->get_boolean_val(&row).val);
+        }
+    }
+    // test not exist data
+    for (TupleRow& row : not_exist_data) {
+        for (ExprContext* ctx : expr_context_list) {
+            ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+        }
+    }
+
+    // test null: a row whose only tuple pointer is null must be filtered out
+    for (ExprContext* ctx : expr_context_list) {
+        TupleRow row;
+        row._tuples[0] = nullptr;
+        ASSERT_FALSE(ctx->get_boolean_val(&row).val);
+    }
+}
+
+} // namespace doris
+
+// Entry point of the test binary: hands the command line to googletest and
+// returns the result of running every registered test.
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    const int exit_code = RUN_ALL_TESTS();
+    return exit_code;
+}
\ No newline at end of file
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 5256cb3..442314c 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -31,6 +31,7 @@ ADD_BE_TEST(run_length_integer_test)
ADD_BE_TEST(stream_index_test)
ADD_BE_TEST(lru_cache_test)
ADD_BE_TEST(bloom_filter_test)
+ADD_BE_TEST(bloom_filter_column_predicate_test)
ADD_BE_TEST(bloom_filter_index_test)
ADD_BE_TEST(comparison_predicate_test)
ADD_BE_TEST(in_list_predicate_test)
diff --git a/be/test/olap/bloom_filter_column_predicate_test.cpp b/be/test/olap/bloom_filter_column_predicate_test.cpp
new file mode 100644
index 0000000..feafafe
--- /dev/null
+++ b/be/test/olap/bloom_filter_column_predicate_test.cpp
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <google/protobuf/stubs/common.h>
+#include <gtest/gtest.h>
+#include <time.h>
+
+#include "olap/bloom_filter_predicate.h"
+#include "olap/column_predicate.h"
+#include "olap/field.h"
+#include "olap/row_block2.h"
+#include "runtime/mem_pool.h"
+#include "runtime/string_value.hpp"
+#include "runtime/vectorized_row_batch.h"
+#include "util/logging.h"
+
+namespace doris {
+
+// Fixture for the BloomFilterColumnPredicate tests. It owns a mem tracker and
+// mem pool, plus the VectorizedRowBatch / RowBlockV2 the tests evaluate against.
+class TestBloomFilterColumnPredicate : public testing::Test {
+public:
+    TestBloomFilterColumnPredicate() : _vectorized_batch(nullptr), _row_block(nullptr) {
+        _mem_tracker.reset(new MemTracker(-1));
+        _mem_pool.reset(new MemPool(_mem_tracker.get()));
+    }
+
+    ~TestBloomFilterColumnPredicate() {
+        // Deleting a null pointer is a no-op, so no guard is needed.
+        delete _vectorized_batch;
+    }
+
+    // Fills `tablet_schema` with a single column built from the arguments.
+    // Each call assigns the next value of a function-local counter as the
+    // column's unique id.
+    void SetTabletSchema(std::string name, const std::string& type, const std::string& aggregation,
+                         uint32_t length, bool is_allow_null, bool is_key,
+                         TabletSchema* tablet_schema) {
+        TabletSchemaPB tablet_schema_pb;
+        static int id = 0;
+        ColumnPB* column = tablet_schema_pb.add_column();
+        column->set_unique_id(++id);
+        column->set_name(name);
+        column->set_type(type);
+        column->set_is_key(is_key);
+        column->set_is_nullable(is_allow_null);
+        column->set_length(length);
+        column->set_aggregation(aggregation);
+        column->set_precision(1000);
+        column->set_frac(1000);
+        column->set_is_bf_column(false);
+
+        tablet_schema->init_from_pb(tablet_schema_pb);
+    }
+
+    // (Re)creates the vectorized batch with `size` rows.
+    void InitVectorizedBatch(const TabletSchema* tablet_schema, const std::vector<uint32_t>& ids,
+                             int size) {
+        // Free a batch left over from an earlier call so that calling this
+        // twice does not leak; null the pointer first so the destructor never
+        // sees a dangling value if the allocation below throws.
+        delete _vectorized_batch;
+        _vectorized_batch = nullptr;
+        _vectorized_batch = new VectorizedRowBatch(tablet_schema, ids, size);
+        _vectorized_batch->set_size(size);
+    }
+
+    // (Re)creates the row block for `size` rows from the tablet schema.
+    void init_row_block(const TabletSchema* tablet_schema, int size) {
+        Schema schema(*tablet_schema);
+        _row_block.reset(new RowBlockV2(schema, size));
+    }
+
+    std::shared_ptr<MemTracker> _mem_tracker;
+    std::unique_ptr<MemPool> _mem_pool;
+    // Owned by the fixture; released in the destructor.
+    VectorizedRowBatch* _vectorized_batch;
+    std::unique_ptr<RowBlockV2> _row_block;
+};
+
+// Evaluates a BloomFilterColumnPredicate holding {4.1, 5.1, 6.1} against a
+// float column containing i + 0.1 for i in [0, 10), through both the
+// VectorizedRowBatch and the ColumnBlock code paths, with and without nulls.
+TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) {
+    TabletSchema tablet_schema;
+    SetTabletSchema(std::string("FLOAT_COLUMN"), "FLOAT", "REPLACE", 1, true, true, &tablet_schema);
+    const int size = 10;
+    std::vector<uint32_t> return_columns;
+    for (int i = 0; i < tablet_schema.num_columns(); ++i) {
+        return_columns.push_back(i);
+    }
+
+    std::shared_ptr<BloomFilterFuncBase> bloom_filter =
+            std::make_shared<BloomFilterFunc<float>>(_mem_tracker.get());
+    // Fail right away if the filter cannot be initialised.
+    ASSERT_TRUE(bloom_filter->init().ok());
+    float value = 4.1;
+    bloom_filter->insert(reinterpret_cast<void*>(&value));
+    value = 5.1;
+    bloom_filter->insert(reinterpret_cast<void*>(&value));
+    value = 6.1;
+    bloom_filter->insert(reinterpret_cast<void*>(&value));
+    // unique_ptr so the predicate is released even when an ASSERT_* below
+    // returns from the test early.
+    std::unique_ptr<ColumnPredicate> pred(new BloomFilterColumnPredicate(0, bloom_filter));
+
+    // for VectorizedBatch no null
+    InitVectorizedBatch(&tablet_schema, return_columns, size);
+    ColumnVector* col_vector = _vectorized_batch->column(0);
+    col_vector->set_no_nulls(true);
+    auto* col_data = reinterpret_cast<float*>(_mem_pool->allocate(size * sizeof(float)));
+    col_vector->set_col_data(col_data);
+    for (int i = 0; i < size; ++i) {
+        *(col_data + i) = i + 0.1f;
+    }
+    pred->evaluate(_vectorized_batch);
+    // The assertions below expect the batch to be left unfiltered by this call.
+    ASSERT_EQ(_vectorized_batch->size(), 10);
+    uint16_t* sel = _vectorized_batch->selected();
+    ASSERT_FLOAT_EQ(*(col_data + sel[0]), 0.1);
+    ASSERT_FLOAT_EQ(*(col_data + sel[1]), 1.1);
+    ASSERT_FLOAT_EQ(*(col_data + sel[2]), 2.1);
+
+    // for ColumnBlock no null
+    init_row_block(&tablet_schema, size);
+    ColumnBlock col_block = _row_block->column_block(0);
+    auto select_size = _row_block->selected_size();
+    ColumnBlockView col_block_view(&col_block);
+    for (int i = 0; i < size; ++i, col_block_view.advance(1)) {
+        col_block_view.set_null_bits(1, false);
+        *reinterpret_cast<float*>(col_block_view.data()) = i + 0.1f;
+    }
+    pred->evaluate(&col_block, _row_block->selection_vector(), &select_size);
+    ASSERT_EQ(select_size, 3);
+    ASSERT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[0]).cell_ptr(), 4.1);
+    ASSERT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[1]).cell_ptr(), 5.1);
+    ASSERT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[2]).cell_ptr(), 6.1);
+
+    // for VectorizedBatch has nulls
+    col_vector->set_no_nulls(false);
+    bool* is_null = reinterpret_cast<bool*>(_mem_pool->allocate(size));
+    memset(is_null, 0, size);
+    col_vector->set_is_null(is_null);
+    for (int i = 0; i < size; ++i) {
+        if (i % 2 == 0) {
+            is_null[i] = true;
+        } else {
+            *(col_data + i) = i + 0.1;
+        }
+    }
+    _vectorized_batch->set_size(size);
+    _vectorized_batch->set_selected_in_use(false);
+    pred->evaluate(_vectorized_batch);
+    ASSERT_EQ(_vectorized_batch->size(), 10);
+    sel = _vectorized_batch->selected();
+    ASSERT_FLOAT_EQ(*(col_data + sel[0]), 0.1);
+    ASSERT_FLOAT_EQ(*(col_data + sel[1]), 1.1);
+    ASSERT_FLOAT_EQ(*(col_data + sel[2]), 2.1);
+
+    // for ColumnBlock has nulls: even rows are null, so of {4.1, 5.1, 6.1}
+    // only 5.1 is still present in the column.
+    col_block_view = ColumnBlockView(&col_block);
+    for (int i = 0; i < size; ++i, col_block_view.advance(1)) {
+        if (i % 2 == 0) {
+            col_block_view.set_null_bits(1, true);
+        } else {
+            col_block_view.set_null_bits(1, false);
+            *reinterpret_cast<float*>(col_block_view.data()) = i + 0.1;
+        }
+    }
+    _row_block->clear();
+    select_size = _row_block->selected_size();
+    pred->evaluate(&col_block, _row_block->selection_vector(), &select_size);
+    ASSERT_EQ(select_size, 1);
+    ASSERT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[0]).cell_ptr(), 5.1);
+}
+
+} // namespace doris
+
+// Entry point of the test binary: loads $DORIS_HOME/conf/be.conf, initialises
+// logging, googletest and CpuInfo, runs all tests, and shuts protobuf down.
+// Returns -1 when DORIS_HOME is unset or the config file cannot be read.
+int main(int argc, char** argv) {
+    // getenv() returns a null pointer for an unset variable, and constructing
+    // a std::string from a null pointer is undefined behaviour.
+    const char* doris_home = getenv("DORIS_HOME");
+    if (doris_home == nullptr) {
+        fprintf(stderr, "environment variable DORIS_HOME is not set. \n");
+        return -1;
+    }
+    std::string conffile = std::string(doris_home) + "/conf/be.conf";
+    if (!doris::config::init(conffile.c_str(), false)) {
+        fprintf(stderr, "error read config file. \n");
+        return -1;
+    }
+    doris::init_glog("be-test");
+    int ret = doris::OLAP_SUCCESS;
+    testing::InitGoogleTest(&argc, argv);
+    doris::CpuInfo::init();
+    ret = RUN_ALL_TESTS();
+    google::protobuf::ShutdownProtobufLibrary();
+    return ret;
+}
diff --git a/be/test/olap/in_list_predicate_test.cpp b/be/test/olap/in_list_predicate_test.cpp
index 1f314a9..2d6a38a 100644
--- a/be/test/olap/in_list_predicate_test.cpp
+++ b/be/test/olap/in_list_predicate_test.cpp
@@ -156,7 +156,7 @@ public:
*(col_data + i) = i; \
} \
\
- std::set<TYPE> values; \
+ std::unordered_set<TYPE> values; \
values.insert(4); \
values.insert(5); \
values.insert(6); \
@@ -203,7 +203,7 @@ TEST_IN_LIST_PREDICATE(int128_t, LARGEINT, "LARGEINT")
int size = 10; \
Schema schema(tablet_schema); \
RowBlockV2 block(schema, size); \
- std::set<TYPE> values; \
+ std::unordered_set<TYPE> values; \
values.insert(4); \
values.insert(5); \
values.insert(6); \
@@ -268,7 +268,7 @@ TEST_F(TestInListPredicate, FLOAT_COLUMN) {
for (int i = 0; i < tablet_schema.num_columns(); ++i) {
return_columns.push_back(i);
}
- std::set<float> values;
+ std::unordered_set<float> values;
values.insert(4.1);
values.insert(5.1);
values.insert(6.1);
@@ -352,7 +352,7 @@ TEST_F(TestInListPredicate, DOUBLE_COLUMN) {
for (int i = 0; i < tablet_schema.num_columns(); ++i) {
return_columns.push_back(i);
}
- std::set<double> values;
+ std::unordered_set<double> values;
values.insert(4.1);
values.insert(5.1);
values.insert(6.1);
@@ -437,7 +437,7 @@ TEST_F(TestInListPredicate, DECIMAL_COLUMN) {
for (int i = 0; i < tablet_schema.num_columns(); ++i) {
return_columns.push_back(i);
}
- std::set<decimal12_t> values;
+ std::unordered_set<decimal12_t> values;
decimal12_t value1 = {4, 4};
values.insert(value1);
@@ -530,7 +530,7 @@ TEST_F(TestInListPredicate, CHAR_COLUMN) {
for (int i = 0; i < tablet_schema.num_columns(); ++i) {
return_columns.push_back(i);
}
- std::set<StringValue> values;
+ std::unordered_set<StringValue> values;
StringValue value1;
const char* value1_buffer = "aaaaa";
value1.ptr = const_cast<char*>(value1_buffer);
@@ -658,7 +658,7 @@ TEST_F(TestInListPredicate, VARCHAR_COLUMN) {
for (int i = 0; i < tablet_schema.num_columns(); ++i) {
return_columns.push_back(i);
}
- std::set<StringValue> values;
+ std::unordered_set<StringValue> values;
StringValue value1;
const char* value1_buffer = "a";
value1.ptr = const_cast<char*>(value1_buffer);
@@ -783,7 +783,7 @@ TEST_F(TestInListPredicate, DATE_COLUMN) {
for (int i = 0; i < tablet_schema.num_columns(); ++i) {
return_columns.push_back(i);
}
- std::set<uint24_t> values;
+ std::unordered_set<uint24_t> values;
uint24_t value1 = datetime::timestamp_from_date("2017-09-09");
values.insert(value1);
@@ -892,7 +892,7 @@ TEST_F(TestInListPredicate, DATETIME_COLUMN) {
for (int i = 0; i < tablet_schema.num_columns(); ++i) {
return_columns.push_back(i);
}
- std::set<uint64_t> values;
+ std::unordered_set<uint64_t> values;
uint64_t value1 = datetime::timestamp_from_datetime("2017-09-09 00:00:01");
values.insert(value1);
diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp
index cbb3ac8..cb4be94 100644
--- a/be/test/olap/rowset/segment_v2/segment_test.cpp
+++ b/be/test/olap/rowset/segment_v2/segment_test.cpp
@@ -40,8 +40,8 @@
#include "olap/types.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
-#include "util/file_utils.h"
#include "test_util/test_util.h"
+#include "util/file_utils.h"
namespace doris {
namespace segment_v2 {
@@ -1087,7 +1087,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
// test where v1 in (10,20,1)
{
std::vector<ColumnPredicate*> column_predicates;
- std::set<int32_t> values;
+ std::unordered_set<int32_t> values;
values.insert(10);
values.insert(20);
values.insert(1);
@@ -1111,7 +1111,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
// test where v1 not in (10,20)
{
std::vector<ColumnPredicate*> column_predicates;
- std::set<int32_t> values;
+ std::unordered_set<int32_t> values;
values.insert(10);
values.insert(20);
std::unique_ptr<ColumnPredicate> predicate(
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 6aaf40d..95b1be4 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -256,6 +256,75 @@ message PProxyResult {
optional PKafkaPartitionOffsets partition_offsets = 3;
};
+message PBloomFilter {
+ required bool always_true = 2;
+ required int32 filter_length = 1;
+};
+
+message PColumnValue {
+ optional bool boolVal = 1;
+ optional int32 intVal = 2;
+ optional int64 longVal = 3;
+ optional double doubleVal = 4;
+ optional bytes stringVal = 5;
+}
+
+ // TODO: check all types
+enum PColumnType {
+ COLUMN_TYPE_BOOL = 0;
+ COLUMN_TYPE_INT = 1;
+ COLUMN_TYPE_TINY_INT = 2;
+ COLUMN_TYPE_SMALL_INT = 3;
+ COLUMN_TYPE_BIGINT = 4;
+ COLUMN_TYPE_LARGEINT = 5;
+ COLUMN_TYPE_VARCHAR = 6;
+ COLUMN_TYPE_CHAR = 7;
+ COLUMN_TYPE_DATE = 8;
+ COLUMN_TYPE_DATETIME = 9;
+ COLUMN_TYPE_DOUBLE = 10;
+ COLUMN_TYPE_FLOAT = 11;
+ COLUMN_TYPE_DECIMAL = 12;
+ COLUMN_TYPE_DECIMALV2 = 13;
+}
+
+message PMinMaxFilter {
+ required PColumnType column_type = 1;
+ required PColumnValue min_val = 2;
+ required PColumnValue max_val = 3;
+};
+
+enum PFilterType {
+ UNKNOW_FILTER = 0;
+ BLOOM_FILTER = 1;
+ MINMAX_FILTER = 2;
+};
+
+message PMergeFilterRequest {
+ required int32 filter_id = 1;
+ required PUniqueId query_id = 2;
+ required PUniqueId fragment_id = 3;
+ required PFilterType filter_type = 4;
+ optional PMinMaxFilter minmax_filter = 5;
+ optional PBloomFilter bloom_filter = 6;
+};
+
+message PMergeFilterResponse {
+ required PStatus status = 1;
+};
+
+message PPublishFilterRequest {
+ required int32 filter_id = 1;
+ required PUniqueId query_id = 2;
+ required PUniqueId fragment_id = 3;
+ required PFilterType filter_type = 4;
+ optional PMinMaxFilter minmax_filter = 5;
+ optional PBloomFilter bloom_filter = 6;
+};
+
+message PPublishFilterResponse {
+ required PStatus status = 1;
+};
+
// NOTE(zc): If you want to add new method here,
// you MUST add same method to palo_internal_service.proto
service PBackendService {
@@ -270,5 +339,7 @@ service PBackendService {
rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
rpc fetch_cache(PFetchCacheRequest) returns (PFetchCacheResult);
rpc clear_cache(PClearCacheRequest) returns (PCacheResponse);
+ rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
+ rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
};
diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto
index c57ca24..592fe58 100644
--- a/gensrc/proto/palo_internal_service.proto
+++ b/gensrc/proto/palo_internal_service.proto
@@ -38,4 +38,6 @@ service PInternalService {
rpc update_cache(doris.PUpdateCacheRequest) returns (doris.PCacheResponse);
rpc fetch_cache(doris.PFetchCacheRequest) returns (doris.PFetchCacheResult);
rpc clear_cache(doris.PClearCacheRequest) returns (doris.PCacheResponse);
+ rpc merge_filter(doris.PMergeFilterRequest) returns (doris.PMergeFilterResponse);
+ rpc apply_filter(doris.PPublishFilterRequest) returns (doris.PPublishFilterResponse);
};
diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift
index 281e37a..e91573a 100644
--- a/gensrc/thrift/Exprs.thrift
+++ b/gensrc/thrift/Exprs.thrift
@@ -47,6 +47,9 @@ enum TExprNodeType {
// TODO: old style compute functions. this will be deprecated
COMPUTE_FUNCTION_CALL,
LARGE_INT_LITERAL,
+
+ // only used in runtime filter
+ BLOOM_PRED,
}
//enum TAggregationOp {
diff --git a/gensrc/thrift/Opcodes.thrift b/gensrc/thrift/Opcodes.thrift
index 09db137..d55989f 100644
--- a/gensrc/thrift/Opcodes.thrift
+++ b/gensrc/thrift/Opcodes.thrift
@@ -83,4 +83,5 @@ enum TExprOpcode {
FACTORIAL,
LAST_OPCODE,
EQ_FOR_NULL,
+ RT_FILTER,
}
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index c320a98..888af72 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -140,6 +140,30 @@ struct TQueryOptions {
31: optional bool enable_spilling = false;
// whether enable parallel merge in exchange node
32: optional bool enable_enable_exchange_node_parallel_merge = false;
+
+ // runtime filter run mode
+ 33: optional string runtime_filter_mode = "GLOBAL";
+
+ // Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will
+ // be rounded up to the nearest power of two.
+ 34: optional i32 runtime_bloom_filter_size = 1048576
+
+ // Minimum runtime bloom filter size, in bytes
+ 35: optional i32 runtime_bloom_filter_min_size = 1048576
+
+ // Maximum runtime bloom filter size, in bytes
+ 36: optional i32 runtime_bloom_filter_max_size = 16777216
+
+ // Time in ms to wait until runtime filters are delivered.
+ 37: optional i32 runtime_filter_wait_time_ms = 1000
+
+ // Maximum number of bloom runtime filters allowed per query
+ 38: optional i32 runtime_filters_max_num = 10
+
+ // Runtime filter type used (for testing); corresponds to TRuntimeFilterType
+ 39: optional i32 runtime_filter_type = 1;
+
+ 40: optional i32 runtime_filter_max_in_num = 1024;
}
@@ -159,6 +183,27 @@ struct TPlanFragmentDestination {
3: optional Types.TNetworkAddress brpc_server
}
+struct TRuntimeFilterTargetParams {
+ 1: required Types.TUniqueId target_fragment_instance_id
+ // The address of the instance where the fragment is expected to run
+ 2: required Types.TNetworkAddress target_fragment_instance_addr
+}
+
+struct TRuntimeFilterParams {
+ // Runtime filter merge instance address
+ 1: optional Types.TNetworkAddress runtime_filter_merge_addr
+
+ // Runtime filter ID to the instance address of the fragment,
+ // that is expected to use this runtime filter
+ 2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param
+
+ // Map from runtime filter ID to its runtime filter descriptor
+ 3: optional map<i32, PlanNodes.TRuntimeFilterDesc> rid_to_runtime_filter
+
+ // Number of Runtime filter producers
+ 4: optional map<i32, i32> runtime_filter_builder_num
+}
+
// Parameters for a single execution instance of a particular TPlanFragment
// TODO: for range partitioning, we also need to specify the range boundaries
struct TPlanFragmentExecParams {
@@ -191,6 +236,8 @@ struct TPlanFragmentExecParams {
9: optional i32 sender_id
10: optional i32 num_senders
11: optional bool send_query_statistics_with_every_batch
+ // Used to merge and send runtime filter
+ 12: optional TRuntimeFilterParams runtime_filter_params
}
// Global query parameters assigned by the coordinator.
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ba1c8c6..320b9e0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -687,6 +687,47 @@ struct TAssertNumRowsNode {
3: optional TAssertion assertion;
}
+enum TRuntimeFilterType {
+ IN = 1
+ BLOOM = 2
+ MIN_MAX = 4
+}
+
+// Specification of a runtime filter.
+struct TRuntimeFilterDesc {
+ // Filter unique id (within a query)
+ 1: required i32 filter_id
+
+ // Expr on which the filter is built on a hash join.
+ 2: required Exprs.TExpr src_expr
+
+ // The order of Expr in join predicate
+ 3: required i32 expr_order
+
+ // Map of target node id to the target expr
+ 4: required map<Types.TPlanNodeId, Exprs.TExpr> planId_to_target_expr
+
+ // Indicates if the source join node of this filter is a broadcast or
+ // a partitioned join.
+ 5: required bool is_broadcast_join
+
+ // Indicates if there is at least one target scan node that is in the
+ // same fragment as the broadcast join that produced the runtime filter
+ 6: required bool has_local_targets
+
+ // Indicates if there is at least one target scan node that is not in the same
+ // fragment as the broadcast join that produced the runtime filter
+ 7: required bool has_remote_targets
+
+ // The type of runtime filter to build.
+ 8: required TRuntimeFilterType type
+
+ // The size of the filter based on the ndv estimate and the min/max limit specified in
+ // the query options. Should be greater than zero for bloom filters, zero otherwise.
+ 9: optional i64 bloom_filter_size_bytes
+}
+
+
// This is essentially a union of all messages corresponding to subclasses
// of PlanNode.
struct TPlanNode {
@@ -728,6 +769,8 @@ struct TPlanNode {
33: optional TIntersectNode intersect_node
34: optional TExceptNode except_node
35: optional TOdbcScanNode odbc_scan_node
+ // Runtime filters assigned to this plan node; they exist in HashJoinNode and ScanNode
+ 36: optional list<TRuntimeFilterDesc> runtime_filters
}
// A flattened representation of a tree of PlanNodes, obtained by depth-first
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 34ef5a2..544e29b 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -194,6 +194,21 @@ enum TExplainLevel {
VERBOSE
}
+enum TRuntimeFilterMode {
+ // No filters are computed in the FE or the BE.
+ OFF = 0
+
+ // Only broadcast filters are computed in the BE, and are only published to the local
+ // fragment.
+ LOCAL = 1
+
+ // Only shuffle filters are computed in the BE, and are only published globally.
+ REMOTE = 2
+
+ // All filters are computed in the BE, and are published globally.
+ GLOBAL = 3
+}
+
struct TColumnType {
1: required TPrimitiveType type
// Only set if type == CHAR_ARRAY
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org