You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/20 01:46:44 UTC
[doris] branch master updated: [feature] support runtime filter on vectorized engine (#10103)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 588634ddf6 [feature] support runtime filter on vectorized engine (#10103)
588634ddf6 is described below
commit 588634ddf6d1c6260119dadbf426a52213caca7d
Author: Gabriel <ga...@gmail.com>
AuthorDate: Mon Jun 20 09:46:38 2022 +0800
[feature] support runtime filter on vectorized engine (#10103)
---
be/src/exprs/expr.h | 82 ++++++
be/src/exprs/hybrid_set.h | 37 +++
be/src/exprs/literal.h | 6 +-
be/src/exprs/runtime_filter.cpp | 388 +++++++++++++++++++---------
be/src/exprs/runtime_filter.h | 11 +
be/src/runtime/types.cpp | 14 +
be/src/runtime/types.h | 2 +
be/src/util/simd/bits.h | 20 ++
be/src/vec/CMakeLists.txt | 2 +
be/src/vec/exec/volap_scan_node.cpp | 64 ++++-
be/src/vec/exec/volap_scan_node.h | 2 +
be/src/vec/exprs/vbloom_predicate.cpp | 96 +++++++
be/src/vec/exprs/vbloom_predicate.h | 48 ++++
be/src/vec/exprs/vexpr.cpp | 10 +
be/src/vec/exprs/vexpr.h | 1 +
be/src/vec/exprs/vruntimefilter_wrapper.cpp | 111 ++++++++
be/src/vec/exprs/vruntimefilter_wrapper.h | 63 +++++
17 files changed, 822 insertions(+), 135 deletions(-)
diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index 4e95c53237..e4cb02ea1b 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -28,6 +28,10 @@
#include "exprs/expr_value.h"
#include "gen_cpp/Opcodes_types.h"
#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/string_value.h"
+#include "runtime/string_value.hpp"
+#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
#include "udf/udf.h"
@@ -465,4 +469,82 @@ inline bool Expr::evaluate(VectorizedRowBatch* batch) {
}
}
+// Populates `node` with the Thrift literal that represents the value stored at
+// `data`, reading that value as a `T`.
+//
+// Handled `T`: bool, int8_t/int16_t/int32_t/int64_t, __int128_t, DateTimeValue,
+// DecimalV2Value, float/double and StringValue. Each branch sets the node type,
+// the matching literal payload and the type descriptor of `node`.
+// Any other `T` returns InvalidArgument without touching `node`.
+template <typename T>
+Status create_texpr_literal_node(const void* data, TExprNode* node) {
+    if constexpr (std::is_same_v<bool, T>) {
+        const auto* value = static_cast<const T*>(data);
+        TBoolLiteral literal;
+        node->__set_node_type(TExprNodeType::BOOL_LITERAL);
+        literal.__set_value(*value);
+        node->__set_bool_literal(literal);
+        node->__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+    } else if constexpr (std::is_same_v<int8_t, T> || std::is_same_v<int16_t, T> ||
+                         std::is_same_v<int32_t, T> || std::is_same_v<int64_t, T>) {
+        // All fixed-width integers share TIntLiteral; only the declared type differs.
+        constexpr PrimitiveType int_type =
+                std::is_same_v<int8_t, T>    ? PrimitiveType::TYPE_TINYINT
+                : std::is_same_v<int16_t, T> ? PrimitiveType::TYPE_SMALLINT
+                : std::is_same_v<int32_t, T> ? PrimitiveType::TYPE_INT
+                                             : PrimitiveType::TYPE_BIGINT;
+        const auto* value = static_cast<const T*>(data);
+        node->__set_node_type(TExprNodeType::INT_LITERAL);
+        TIntLiteral literal;
+        literal.__set_value(*value);
+        node->__set_int_literal(literal);
+        node->__set_type(create_type_desc(int_type));
+    } else if constexpr (std::is_same_v<__int128_t, T>) {
+        // 128-bit integers travel as their decimal string representation.
+        const auto* value = static_cast<const T*>(data);
+        node->__set_node_type(TExprNodeType::LARGE_INT_LITERAL);
+        TLargeIntLiteral literal;
+        literal.__set_value(LargeIntValue::to_string(*value));
+        node->__set_large_int_literal(literal);
+        node->__set_type(create_type_desc(PrimitiveType::TYPE_LARGEINT));
+    } else if constexpr (std::is_same_v<DateTimeValue, T>) {
+        const auto* value = static_cast<const doris::DateTimeValue*>(data);
+        TDateLiteral literal;
+        char text[30];
+        value->to_string(text);
+        literal.__set_value(text);
+        node->__set_date_literal(literal);
+        node->__set_node_type(TExprNodeType::DATE_LITERAL);
+        // The declared type follows the runtime kind of the value; an
+        // unrecognised kind leaves the type descriptor unset.
+        const auto time_type = value->type();
+        if (time_type == TimeType::TIME_DATE) {
+            node->__set_type(create_type_desc(PrimitiveType::TYPE_DATE));
+        } else if (time_type == TimeType::TIME_DATETIME) {
+            node->__set_type(create_type_desc(PrimitiveType::TYPE_DATETIME));
+        } else if (time_type == TimeType::TIME_TIME) {
+            node->__set_type(create_type_desc(PrimitiveType::TYPE_TIME));
+        }
+    } else if constexpr (std::is_same_v<DecimalV2Value, T>) {
+        const auto* value = static_cast<const DecimalV2Value*>(data);
+        node->__set_node_type(TExprNodeType::DECIMAL_LITERAL);
+        TDecimalLiteral literal;
+        literal.__set_value(value->to_string());
+        node->__set_decimal_literal(literal);
+        node->__set_type(create_type_desc(PrimitiveType::TYPE_DECIMALV2));
+    } else if constexpr (std::is_same_v<float, T> || std::is_same_v<double, T>) {
+        constexpr PrimitiveType float_type = std::is_same_v<float, T>
+                                                     ? PrimitiveType::TYPE_FLOAT
+                                                     : PrimitiveType::TYPE_DOUBLE;
+        const auto* value = static_cast<const T*>(data);
+        node->__set_node_type(TExprNodeType::FLOAT_LITERAL);
+        TFloatLiteral literal;
+        literal.__set_value(*value);
+        node->__set_float_literal(literal);
+        node->__set_type(create_type_desc(float_type));
+    } else if constexpr (std::is_same_v<StringValue, T>) {
+        const auto* value = static_cast<const StringValue*>(data);
+        node->__set_node_type(TExprNodeType::STRING_LITERAL);
+        TStringLiteral literal;
+        literal.__set_value(value->to_string());
+        node->__set_string_literal(literal);
+        node->__set_type(create_type_desc(PrimitiveType::TYPE_STRING));
+    } else {
+        return Status::InvalidArgument("Invalid argument type!");
+    }
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 578c8d2e96..81f71d45f2 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -22,8 +22,14 @@
#include <cstring>
#include "common/object_pool.h"
+#include "common/status.h"
+#include "exprs/expr.h"
+#include "runtime/datetime_value.h"
#include "runtime/decimalv2_value.h"
+#include "runtime/large_int_value.h"
+#include "runtime/primitive_type.h"
#include "runtime/string_value.h"
+#include "vec/exprs/vliteral.h"
namespace doris {
@@ -41,6 +47,9 @@ public:
virtual bool find(void* data) = 0;
// use in vectorize execute engine
virtual bool find(void* data, size_t) = 0;
+
+ virtual Status to_vexpr_list(doris::ObjectPool* pool,
+ std::vector<doris::vectorized::VExpr*>* vexpr_list) = 0;
class IteratorBase {
public:
IteratorBase() {}
@@ -60,6 +69,20 @@ public:
~HybridSet() override = default;
+    // Appends one VLiteral (allocated in `pool`) to `vexpr_list` for every value
+    // the set iterator yields.
+    //
+    // If create_texpr_literal_node<T>() cannot describe a value, its error is
+    // returned immediately instead of being dropped and a VLiteral being built
+    // from an unpopulated TExprNode. Literals appended before the failure stay
+    // in `vexpr_list`.
+    Status to_vexpr_list(doris::ObjectPool* pool,
+                         std::vector<doris::vectorized::VExpr*>* vexpr_list) override {
+        HybridSetBase::IteratorBase* it = begin();
+        DCHECK(it != nullptr);
+        while (it->has_next()) {
+            TExprNode node;
+            RETURN_IF_ERROR(create_texpr_literal_node<T>(it->get_value(), &node));
+            vexpr_list->push_back(pool->add(new doris::vectorized::VLiteral(node)));
+            it->next();
+        }
+        return Status::OK();
+    }
+
void insert(const void* data) override {
if (data == nullptr) return;
@@ -119,6 +142,20 @@ public:
~StringValueSet() override = default;
+    // Appends one string VLiteral (allocated in `pool`) to `vexpr_list` for every
+    // value the set iterator yields.
+    //
+    // The status of create_texpr_literal_node<StringValue>() is propagated to
+    // the caller rather than ignored. Literals appended before a failure stay
+    // in `vexpr_list`.
+    Status to_vexpr_list(doris::ObjectPool* pool,
+                         std::vector<doris::vectorized::VExpr*>* vexpr_list) override {
+        HybridSetBase::IteratorBase* it = begin();
+        DCHECK(it != nullptr);
+        while (it->has_next()) {
+            TExprNode node;
+            RETURN_IF_ERROR(create_texpr_literal_node<StringValue>(it->get_value(), &node));
+            vexpr_list->push_back(pool->add(new doris::vectorized::VLiteral(node)));
+            it->next();
+        }
+        return Status::OK();
+    }
+
void insert(const void* data) override {
if (data == nullptr) return;
diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h
index ebccdd3b9e..fc77e06ad7 100644
--- a/be/src/exprs/literal.h
+++ b/be/src/exprs/literal.h
@@ -29,6 +29,7 @@ class TExprNode;
class Literal final : public Expr {
public:
+ Literal(const TExprNode& node);
virtual ~Literal();
virtual Expr* clone(ObjectPool* pool) const override { return pool->add(new Literal(*this)); }
@@ -49,11 +50,6 @@ public:
virtual Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
ExprContext* context) override;
-protected:
- friend class Expr;
- friend Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data);
- Literal(const TExprNode& node);
-
private:
ExprValue _value;
};
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1c25eaa152..2533c8c2d1 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -37,6 +37,9 @@
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/string_parser.hpp"
+#include "vec/exprs/vbloom_predicate.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
namespace doris {
// PrimitiveType->TExprNodeType
@@ -193,105 +196,71 @@ PFilterType get_type(RuntimeFilterType type) {
}
}
-TTypeDesc create_type_desc(PrimitiveType type) {
- TTypeDesc type_desc;
- std::vector<TTypeNode> node_type;
- node_type.emplace_back();
- TScalarType scalarType;
- scalarType.__set_type(to_thrift(type));
- scalarType.__set_len(-1);
- scalarType.__set_precision(-1);
- scalarType.__set_scale(-1);
- node_type.back().__set_scalar_type(scalarType);
- type_desc.__set_types(node_type);
- return type_desc;
-}
-
-// only used to push down to olap engine
-Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data) {
+template <bool is_vectorized = false>
+Status create_literal(ObjectPool* pool, PrimitiveType type, const void* data, void** expr) {
TExprNode node;
switch (type) {
case TYPE_BOOLEAN: {
- TBoolLiteral boolLiteral;
- boolLiteral.__set_value(*reinterpret_cast<const bool*>(data));
- node.__set_bool_literal(boolLiteral);
+ create_texpr_literal_node<bool>(data, &node);
break;
}
case TYPE_TINYINT: {
- TIntLiteral intLiteral;
- intLiteral.__set_value(*reinterpret_cast<const int8_t*>(data));
- node.__set_int_literal(intLiteral);
+ create_texpr_literal_node<int8_t>(data, &node);
break;
}
case TYPE_SMALLINT: {
- TIntLiteral intLiteral;
- intLiteral.__set_value(*reinterpret_cast<const int16_t*>(data));
- node.__set_int_literal(intLiteral);
+ create_texpr_literal_node<int16_t>(data, &node);
break;
}
case TYPE_INT: {
- TIntLiteral intLiteral;
- intLiteral.__set_value(*reinterpret_cast<const int32_t*>(data));
- node.__set_int_literal(intLiteral);
+ create_texpr_literal_node<int32_t>(data, &node);
break;
}
case TYPE_BIGINT: {
- TIntLiteral intLiteral;
- intLiteral.__set_value(*reinterpret_cast<const int64_t*>(data));
- node.__set_int_literal(intLiteral);
+ create_texpr_literal_node<int64_t>(data, &node);
break;
}
case TYPE_LARGEINT: {
- TLargeIntLiteral largeIntLiteral;
- largeIntLiteral.__set_value(
- LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(data)));
- node.__set_large_int_literal(largeIntLiteral);
+ create_texpr_literal_node<int128_t>(data, &node);
break;
}
case TYPE_FLOAT: {
- TFloatLiteral floatLiteral;
- floatLiteral.__set_value(*reinterpret_cast<const float*>(data));
- node.__set_float_literal(floatLiteral);
+ create_texpr_literal_node<float_t>(data, &node);
break;
}
case TYPE_DOUBLE: {
- TFloatLiteral floatLiteral;
- floatLiteral.__set_value(*reinterpret_cast<const double*>(data));
- node.__set_float_literal(floatLiteral);
+ create_texpr_literal_node<double_t>(data, &node);
break;
}
case TYPE_DATE:
case TYPE_DATETIME: {
- TDateLiteral dateLiteral;
- char convert_buffer[30];
- reinterpret_cast<const DateTimeValue*>(data)->to_string(convert_buffer);
- dateLiteral.__set_value(convert_buffer);
- node.__set_date_literal(dateLiteral);
+ create_texpr_literal_node<DateTimeValue>(data, &node);
break;
}
case TYPE_DECIMALV2: {
- TDecimalLiteral decimalLiteral;
- decimalLiteral.__set_value(reinterpret_cast<const DecimalV2Value*>(data)->to_string());
- node.__set_decimal_literal(decimalLiteral);
+ create_texpr_literal_node<DecimalV2Value>(data, &node);
break;
}
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
- const StringValue* string_value = reinterpret_cast<const StringValue*>(data);
- TStringLiteral tstringLiteral;
- tstringLiteral.__set_value(std::string(string_value->ptr, string_value->len));
- node.__set_string_literal(tstringLiteral);
+ create_texpr_literal_node<StringValue>(data, &node);
break;
}
default:
DCHECK(false);
- return nullptr;
+ return Status::InvalidArgument("Invalid type!");
+ }
+
+ if constexpr (is_vectorized) {
+ *reinterpret_cast<doris::vectorized::VExpr**>(expr) =
+ pool->add(new doris::vectorized::VLiteral(node));
+ } else {
+ *reinterpret_cast<Expr**>(expr) = pool->add(new Literal(node));
}
- node.__set_node_type(get_expr_node_type(type));
- node.__set_type(create_type_desc(type));
- return pool->add(new Literal(node));
+
+ return Status::OK();
}
BinaryPredicate* create_bin_predicate(ObjectPool* pool, PrimitiveType prim_type,
@@ -312,6 +281,64 @@ BinaryPredicate* create_bin_predicate(ObjectPool* pool, PrimitiveType prim_type,
node.__set_node_type(TExprNodeType::BINARY_PRED);
return (BinaryPredicate*)pool->add(BinaryPredicate::from_thrift(node));
}
+
+// Builds the vectorized binary predicate used by a min/max runtime filter.
+//
+// The Thrift node describes a builtin boolean function ("le" or "ge") taking
+// two arguments of `prim_type`. A copy of the node is stored in `*tnode`, and
+// VExpr::create_expr() is asked to instantiate it in `pool`, returning the
+// expression through `*expr`. No children are attached here.
+//
+// Only TExprOpcode::LE and TExprOpcode::GE are accepted. Any other opcode
+// returns InvalidArgument before `*tnode` or `*expr` is written. Previously
+// that status was constructed and dropped, so execution fell through and an
+// expression with an unnamed function was created.
+Status create_vbin_predicate(ObjectPool* pool, PrimitiveType prim_type, TExprOpcode::type opcode,
+                             doris::vectorized::VExpr** expr, TExprNode* tnode) {
+    TExprNode node;
+    // The predicate itself evaluates to BOOLEAN.
+    TScalarType tscalar_type;
+    tscalar_type.__set_type(TPrimitiveType::BOOLEAN);
+    TTypeNode ttype_node;
+    ttype_node.__set_type(TTypeNodeType::SCALAR);
+    ttype_node.__set_scalar_type(tscalar_type);
+    TTypeDesc t_type_desc;
+    t_type_desc.types.push_back(ttype_node);
+    node.__set_type(t_type_desc);
+    node.__set_opcode(opcode);
+    node.__set_vector_opcode(opcode);
+    node.__set_child_type(to_thrift(prim_type));
+    node.__set_num_children(2);
+    node.__set_output_scale(-1);
+    node.__set_node_type(TExprNodeType::BINARY_PRED);
+    TFunction fn;
+    TFunctionName fn_name;
+    fn_name.__set_db_name("");
+    switch (opcode) {
+    case TExprOpcode::LE:
+        fn_name.__set_function_name("le");
+        break;
+    case TExprOpcode::GE:
+        fn_name.__set_function_name("ge");
+        break;
+    default:
+        // Must be returned: without a function name the node is unusable.
+        return Status::InvalidArgument(
+                strings::Substitute("Invalid opcode for max_min_runtimefilter: '$0'", opcode));
+    }
+    fn.__set_name(fn_name);
+    fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+
+    // Both arguments of the comparison have the probe column's primitive type.
+    TTypeNode type_node;
+    type_node.__set_type(TTypeNodeType::SCALAR);
+    TScalarType scalar_type;
+    scalar_type.__set_type(to_thrift(prim_type));
+    type_node.__set_scalar_type(scalar_type);
+
+    std::vector<TTypeNode> type_nodes;
+    type_nodes.push_back(type_node);
+
+    TTypeDesc type_desc;
+    type_desc.__set_types(type_nodes);
+
+    std::vector<TTypeDesc> arg_types;
+    arg_types.push_back(type_desc);
+    arg_types.push_back(type_desc);
+    fn.__set_arg_types(arg_types);
+
+    fn.__set_ret_type(t_type_desc);
+    fn.__set_has_var_args(false);
+    node.__set_fn(fn);
+    *tnode = node;
+    return doris::vectorized::VExpr::create_expr(pool, node, expr);
+}
// This class is a wrapper of runtime predicate function
class RuntimePredicateWrapper {
public:
@@ -447,71 +474,10 @@ public:
}
template <class T>
- Status get_push_context(T* container, RuntimeState* state, ExprContext* prob_expr) {
- DCHECK(state != nullptr);
- DCHECK(container != nullptr);
- DCHECK(_pool != nullptr);
- DCHECK(prob_expr->root()->type().type == _column_return_type ||
- (is_string_type(prob_expr->root()->type().type) &&
- is_string_type(_column_return_type)));
-
- auto real_filter_type = get_real_type();
- switch (real_filter_type) {
- case RuntimeFilterType::IN_FILTER: {
- if (!_is_ignored_in_filter) {
- TTypeDesc type_desc = create_type_desc(_column_return_type);
- TExprNode node;
- node.__set_type(type_desc);
- node.__set_node_type(TExprNodeType::IN_PRED);
- node.in_predicate.__set_is_not_in(false);
- node.__set_opcode(TExprOpcode::FILTER_IN);
- node.__isset.vector_opcode = true;
- node.__set_vector_opcode(to_in_opcode(_column_return_type));
- auto in_pred = _pool->add(new InPredicate(node));
- RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release()));
- in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
- ExprContext* ctx = _pool->add(new ExprContext(in_pred));
- container->push_back(ctx);
- }
- break;
- }
- case RuntimeFilterType::MINMAX_FILTER: {
- // create max filter
- auto max_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::LE);
- auto max_literal = create_literal(_pool, _column_return_type, _minmax_func->get_max());
- max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
- max_pred->add_child(max_literal);
- container->push_back(_pool->add(new ExprContext(max_pred)));
- // create min filter
- auto min_pred = create_bin_predicate(_pool, _column_return_type, TExprOpcode::GE);
- auto min_literal = create_literal(_pool, _column_return_type, _minmax_func->get_min());
- min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
- min_pred->add_child(min_literal);
- container->push_back(_pool->add(new ExprContext(min_pred)));
- break;
- }
- case RuntimeFilterType::BLOOM_FILTER: {
- // create a bloom filter
- TTypeDesc type_desc = create_type_desc(_column_return_type);
- TExprNode node;
- node.__set_type(type_desc);
- node.__set_node_type(TExprNodeType::BLOOM_PRED);
- node.__set_opcode(TExprOpcode::RT_FILTER);
- node.__isset.vector_opcode = true;
- node.__set_vector_opcode(to_in_opcode(_column_return_type));
- auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
- RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func.release()));
- bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
- ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
- container->push_back(ctx);
- break;
- }
- default:
- DCHECK(false);
- break;
- }
- return Status::OK();
- }
+ Status get_push_context(T* container, RuntimeState* state, ExprContext* prob_expr);
+
+ Status get_push_vexprs(std::vector<doris::vectorized::VExpr*>* container, RuntimeState* state,
+ doris::vectorized::VExprContext* prob_expr);
Status merge(const RuntimePredicateWrapper* wrapper) {
bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -986,6 +952,21 @@ Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>* push_expr
return Status::OK();
}
+// Appends this filter's vectorized push-down expressions to `vexprs`.
+//
+// While holding `_inner_mutex`, the expressions are generated from `_wrapper`
+// whenever the `_push_down_vexprs` cache is empty, and the cached list is then
+// copied to the end of `vexprs`. `desc` and `tracker` are not used here.
+Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* vexprs,
+                                           const RowDescriptor& desc,
+                                           const std::shared_ptr<MemTracker>& tracker) {
+    DCHECK(_is_ready);
+    DCHECK(is_consumer());
+    std::lock_guard<std::mutex> lock(_inner_mutex);
+
+    const bool cache_is_empty = _push_down_vexprs.empty();
+    if (cache_is_empty) {
+        RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, _state, _vprobe_ctx));
+    }
+    // Hand the cached expressions to the caller.
+    for (doris::vectorized::VExpr* cached : _push_down_vexprs) {
+        vexprs->push_back(cached);
+    }
+    return Status::OK();
+}
+
bool IRuntimeFilter::await() {
DCHECK(is_consumer());
SCOPED_TIMER(_await_time_cost);
@@ -1054,6 +1035,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
return Status::InternalError("not found a node id");
}
RETURN_IF_ERROR(Expr::create_expr_tree(_pool, iter->second, &_probe_ctx));
+ RETURN_IF_ERROR(
+ doris::vectorized::VExpr::create_expr_tree(_pool, iter->second, &_vprobe_ctx));
}
_wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms));
@@ -1407,6 +1390,167 @@ Status IRuntimeFilter::consumer_close() {
return Status::OK();
}
+// Converts this runtime filter into row-based predicate contexts and appends
+// them to `container`.
+//
+//  - IN_FILTER:     one InPredicate over a copy of the probe expression, unless
+//                   the in-filter is marked as ignored.
+//  - MINMAX_FILTER: two binary predicates, `probe <= max` and `probe >= min`.
+//  - BLOOM_FILTER:  one BloomFilterPredicate over a copy of the probe expression.
+//
+// The IN and BLOOM branches hand `_hybrid_set` / `_bloomfilter_func` over to the
+// created predicate via release().
+template <class T>
+Status RuntimePredicateWrapper::get_push_context(T* container, RuntimeState* state,
+                                                 ExprContext* prob_expr) {
+    DCHECK(state != nullptr);
+    DCHECK(container != nullptr);
+    DCHECK(_pool != nullptr);
+    DCHECK(prob_expr->root()->type().type == _column_return_type ||
+           (is_string_type(prob_expr->root()->type().type) && is_string_type(_column_return_type)));
+
+    // Builds `probe <opcode> bound` for one side of the min/max range and
+    // appends its context to `container`.
+    auto push_bound = [&](TExprOpcode::type opcode, bool use_max) -> Status {
+        Expr* bound_literal = nullptr;
+        auto bound_pred = create_bin_predicate(_pool, _column_return_type, opcode);
+        RETURN_IF_ERROR(create_literal<false>(
+                _pool, _column_return_type,
+                use_max ? _minmax_func->get_max() : _minmax_func->get_min(),
+                (void**)&bound_literal));
+        bound_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+        bound_pred->add_child(bound_literal);
+        container->push_back(_pool->add(new ExprContext(bound_pred)));
+        return Status::OK();
+    };
+
+    switch (get_real_type()) {
+    case RuntimeFilterType::IN_FILTER: {
+        if (_is_ignored_in_filter) {
+            break;
+        }
+        TExprNode in_node;
+        in_node.__set_type(create_type_desc(_column_return_type));
+        in_node.__set_node_type(TExprNodeType::IN_PRED);
+        in_node.in_predicate.__set_is_not_in(false);
+        in_node.__set_opcode(TExprOpcode::FILTER_IN);
+        in_node.__isset.vector_opcode = true;
+        in_node.__set_vector_opcode(to_in_opcode(_column_return_type));
+        auto in_pred = _pool->add(new InPredicate(in_node));
+        RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release()));
+        in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+        container->push_back(_pool->add(new ExprContext(in_pred)));
+        break;
+    }
+    case RuntimeFilterType::MINMAX_FILTER: {
+        // Upper bound first, then lower bound.
+        RETURN_IF_ERROR(push_bound(TExprOpcode::LE, true));
+        RETURN_IF_ERROR(push_bound(TExprOpcode::GE, false));
+        break;
+    }
+    case RuntimeFilterType::BLOOM_FILTER: {
+        TExprNode bloom_node;
+        bloom_node.__set_type(create_type_desc(_column_return_type));
+        bloom_node.__set_node_type(TExprNodeType::BLOOM_PRED);
+        bloom_node.__set_opcode(TExprOpcode::RT_FILTER);
+        bloom_node.__isset.vector_opcode = true;
+        bloom_node.__set_vector_opcode(to_in_opcode(_column_return_type));
+        auto bloom_pred = _pool->add(new BloomFilterPredicate(bloom_node));
+        RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func.release()));
+        bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
+        container->push_back(_pool->add(new ExprContext(bloom_pred)));
+        break;
+    }
+    default:
+        DCHECK(false);
+        break;
+    }
+    return Status::OK();
+}
+
+// Converts this runtime filter into vectorized expressions and appends them to
+// `container`. Every generated expression is wrapped in a VRuntimeFilterWrapper.
+//
+//  - IN_FILTER:     an IN predicate whose first child is a clone of the probe
+//                   expression, followed by one literal per value in
+//                   `_hybrid_set`. Skipped when the in-filter is marked ignored.
+//  - MINMAX_FILTER: two binary predicates, `probe <= max` and `probe >= min`.
+//  - BLOOM_FILTER:  a VBloomPredicate sharing `_bloomfilter_func`.
+//
+// Returns the first error produced while building an expression. This now
+// includes a failure to turn the IN set into literals, which was previously
+// ignored.
+Status RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::VExpr*>* container,
+                                                RuntimeState* state,
+                                                doris::vectorized::VExprContext* vprob_expr) {
+    DCHECK(state != nullptr);
+    DCHECK(container != nullptr);
+    DCHECK(_pool != nullptr);
+    DCHECK(vprob_expr->root()->type().type == _column_return_type ||
+           (is_string_type(vprob_expr->root()->type().type) &&
+            is_string_type(_column_return_type)));
+
+    auto real_filter_type = get_real_type();
+    switch (real_filter_type) {
+    case RuntimeFilterType::IN_FILTER: {
+        if (!_is_ignored_in_filter) {
+            TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(to_in_opcode(_column_return_type));
+
+            // VInPredicate
+            doris::vectorized::VExpr* expr = nullptr;
+            RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr(_pool, node, &expr));
+            auto cloned_vexpr = vprob_expr->root()->clone(_pool);
+            expr->add_child(cloned_vexpr);
+
+            // The set's literals are appended straight into the predicate's
+            // child list; children() only exposes it as const.
+            auto& children = const_cast<std::vector<doris::vectorized::VExpr*>&>(expr->children());
+            RETURN_IF_ERROR(_hybrid_set->to_vexpr_list(_pool, &children));
+            container->push_back(
+                    _pool->add(new doris::vectorized::VRuntimeFilterWrapper(node, expr)));
+        }
+        break;
+    }
+    case RuntimeFilterType::MINMAX_FILTER: {
+        doris::vectorized::VExpr* max_pred = nullptr;
+        // create max filter
+        TExprNode max_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(_pool, _column_return_type, TExprOpcode::LE,
+                                              &max_pred, &max_pred_node));
+        doris::vectorized::VExpr* max_literal = nullptr;
+        RETURN_IF_ERROR(create_literal<true>(_pool, _column_return_type, _minmax_func->get_max(),
+                                             (void**)&max_literal));
+        auto cloned_vexpr = vprob_expr->root()->clone(_pool);
+        max_pred->add_child(cloned_vexpr);
+        max_pred->add_child(max_literal);
+        container->push_back(
+                _pool->add(new doris::vectorized::VRuntimeFilterWrapper(max_pred_node, max_pred)));
+
+        // create min filter
+        doris::vectorized::VExpr* min_pred = nullptr;
+        TExprNode min_pred_node;
+        RETURN_IF_ERROR(create_vbin_predicate(_pool, _column_return_type, TExprOpcode::GE,
+                                              &min_pred, &min_pred_node));
+        doris::vectorized::VExpr* min_literal = nullptr;
+        RETURN_IF_ERROR(create_literal<true>(_pool, _column_return_type, _minmax_func->get_min(),
+                                             (void**)&min_literal));
+        cloned_vexpr = vprob_expr->root()->clone(_pool);
+        min_pred->add_child(cloned_vexpr);
+        min_pred->add_child(min_literal);
+        container->push_back(
+                _pool->add(new doris::vectorized::VRuntimeFilterWrapper(min_pred_node, min_pred)));
+        break;
+    }
+    case RuntimeFilterType::BLOOM_FILTER: {
+        // create a bloom filter
+        TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+        TExprNode node;
+        node.__set_type(type_desc);
+        node.__set_node_type(TExprNodeType::BLOOM_PRED);
+        node.__set_opcode(TExprOpcode::RT_FILTER);
+        node.__isset.vector_opcode = true;
+        node.__set_vector_opcode(to_in_opcode(_column_return_type));
+        auto bloom_pred = _pool->add(new doris::vectorized::VBloomPredicate(node));
+        bloom_pred->set_filter(_bloomfilter_func);
+        auto cloned_vexpr = vprob_expr->root()->clone(_pool);
+        bloom_pred->add_child(cloned_vexpr);
+        auto wrapper = _pool->add(new doris::vectorized::VRuntimeFilterWrapper(node, bloom_pred));
+        container->push_back(wrapper);
+        break;
+    }
+    default:
+        DCHECK(false);
+        break;
+    }
+    return Status::OK();
+}
+
RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default;
RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default;
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index e2c6561388..5273788894 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -44,6 +44,11 @@ class PMinMaxFilter;
class HashJoinNode;
class RuntimeProfile;
+namespace vectorized {
+class VExpr;
+class VExprContext;
+} // namespace vectorized
+
enum class RuntimeFilterType {
UNKNOWN_FILTER = -1,
IN_FILTER = 0,
@@ -157,6 +162,10 @@ public:
const RowDescriptor& desc,
const std::shared_ptr<MemTracker>& tracker);
+ Status get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* push_vexprs,
+ const RowDescriptor& desc,
+ const std::shared_ptr<MemTracker>& tracker);
+
bool is_broadcast_join() const { return _is_broadcast_join; }
bool has_remote_target() const { return _has_remote_target; }
@@ -269,6 +278,7 @@ protected:
// it only used in consumer to generate runtime_filter expr_context
// we don't have to prepare it or close it
ExprContext* _probe_ctx;
+ doris::vectorized::VExprContext* _vprobe_ctx;
// Indicate whether runtime filter expr has been ignored
bool _is_ignored;
@@ -279,6 +289,7 @@ protected:
// these context is called prepared by this,
// consumer_close should be called before release
std::vector<ExprContext*> _push_down_ctxs;
+ std::vector<doris::vectorized::VExpr*> _push_down_vexprs;
struct rpc_context;
std::shared_ptr<rpc_context> _rpc_context;
diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 58261a216b..c53febfc40 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -201,4 +201,18 @@ std::ostream& operator<<(std::ostream& os, const TypeDescriptor& type) {
os << type.debug_string();
return os;
}
+
+// Returns a Thrift type descriptor holding a single type node whose scalar
+// type is `type`, with length, precision and scale all set to -1.
+TTypeDesc create_type_desc(PrimitiveType type) {
+    TScalarType scalar;
+    scalar.__set_type(to_thrift(type));
+    scalar.__set_len(-1);
+    scalar.__set_precision(-1);
+    scalar.__set_scale(-1);
+
+    // Exactly one default-constructed node carrying the scalar description.
+    std::vector<TTypeNode> nodes(1);
+    nodes.front().__set_scalar_type(scalar);
+
+    TTypeDesc result;
+    result.__set_types(nodes);
+    return result;
+}
} // namespace doris
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 2987064962..5852c2fb91 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -216,4 +216,6 @@ private:
std::ostream& operator<<(std::ostream& os, const TypeDescriptor& type);
+TTypeDesc create_type_desc(PrimitiveType type);
+
} // namespace doris
diff --git a/be/src/util/simd/bits.h b/be/src/util/simd/bits.h
index 3d5b5e1bf7..d15243128d 100644
--- a/be/src/util/simd/bits.h
+++ b/be/src/util/simd/bits.h
@@ -59,5 +59,25 @@ inline uint32_t bytes32_mask_to_bits32_mask(const bool* data) {
return bytes32_mask_to_bits32_mask(reinterpret_cast<const uint8_t*>(data));
}
+// Returns how many of the first `size` bytes of `data` are zero.
+// Written as a plain counting loop; the original note says the compiler is
+// expected to vectorize it automatically.
+inline size_t count_zero_num(const int8_t* __restrict data, size_t size) {
+    size_t zeros = 0;
+    for (size_t i = 0; i < size; ++i) {
+        zeros += (data[i] == 0);
+    }
+    return zeros;
+}
+
+// For each of the first `size` positions, adds `(data[i] == 0) | null_map[i]`
+// to the result and returns the sum.
+// NOTE(review): this equals "zero or null" only if null_map holds 0/1 flags —
+// confirm against callers.
+inline size_t count_zero_num(const int8_t* __restrict data, const uint8_t* __restrict null_map,
+                             size_t size) {
+    size_t total = 0;
+    for (size_t i = 0; i < size; ++i) {
+        total += ((data[i] == 0) | null_map[i]);
+    }
+    return total;
+}
+
} // namespace simd
} // namespace doris
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 6334a92a08..69aaf816ba 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -115,6 +115,8 @@ set(VEC_FILES
exprs/vliteral.cpp
exprs/varray_literal.cpp
exprs/vin_predicate.cpp
+ exprs/vbloom_predicate.cpp
+ exprs/vruntimefilter_wrapper.cpp
exprs/vtuple_is_null_predicate.cpp
exprs/vslot_ref.cpp
exprs/vcast_expr.cpp
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 16bed3f56d..ba1a30c22e 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -29,6 +29,7 @@
#include "util/to_string.h"
#include "vec/core/block.h"
#include "vec/exec/volap_scanner.h"
+#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vexpr.h"
namespace doris::vectorized {
@@ -77,6 +78,7 @@ Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
/// TODO: could one filter used in the different scan_node ?
int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.resize(filter_size);
+ _runtime_filter_ready_flag.resize(filter_size);
for (int i = 0; i < filter_size; ++i) {
IRuntimeFilter* runtime_filter = nullptr;
const auto& filter_desc = _runtime_filter_descs[i];
@@ -86,6 +88,8 @@ Status VOlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
&runtime_filter));
_runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+ _runtime_filter_ready_flag[i] = false;
+ _rf_locks.push_back(std::make_unique<std::mutex>());
}
return Status::OK();
@@ -389,29 +393,73 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
scanner->set_opened();
}
- std::vector<ExprContext*> contexts;
+ std::vector<VExpr*> vexprs;
auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
+ /// When runtime filters become ready while the scan is running, we should use them to
+ /// filter data in VOlapScanner.
+ /// A newly arrived runtime filter (rf) is processed as follows:
+ /// 1. Convert the runtime filter to vectorized expressions.
+ /// 2. If this is the first scanner thread to receive this rf, construct a new
+ /// VExprContext and update `_vconjunct_ctx_ptr` in the scan node. Note that we use
+ /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` is updated only
+ /// once for each runtime filter that becomes ready.
+ /// 3. Finally, copy this new VExprContext to the scanner and use it to filter data.
IRuntimeFilter* runtime_filter = nullptr;
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
&runtime_filter);
DCHECK(runtime_filter != nullptr);
bool ready = runtime_filter->is_ready();
if (ready) {
- runtime_filter->get_prepared_context(&contexts, row_desc(), _expr_mem_tracker);
+ runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker);
scanner_filter_apply_marks[i] = true;
+ if (!_runtime_filter_ready_flag[i]) {
+ std::unique_lock<std::mutex> l(*(_rf_locks[i]));
+ if (!_runtime_filter_ready_flag[i]) {
+ // Use all conjuncts and new arrival runtime filters to construct a new
+ // expression tree here.
+ auto last_expr =
+ _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
+ for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
+ TExprNode texpr_node;
+ texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+ texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
+ texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
+ VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
+ new_node->add_child(last_expr);
+ new_node->add_child(vexprs[j]);
+ last_expr = new_node;
+ }
+ auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
+ auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc(),
+ expr_mem_tracker());
+ // If error occurs in `prepare` or `open` phase, discard these runtime
+ // filters directly.
+ if (UNLIKELY(!expr_status.OK())) {
+ LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
+ vexprs.clear();
+ break;
+ }
+ expr_status = new_vconjunct_ctx_ptr->open(state);
+ if (UNLIKELY(!expr_status.OK())) {
+ LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
+ vexprs.clear();
+ break;
+ }
+ _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+ *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
+ _runtime_filter_ready_flag[i] = true;
+ }
+ }
}
}
}
- if (!contexts.empty()) {
- std::vector<ExprContext*> new_contexts;
- auto& scanner_conjunct_ctxs = *scanner->conjunct_ctxs();
- Expr::clone_if_not_exists(contexts, state, &new_contexts);
- scanner_conjunct_ctxs.insert(scanner_conjunct_ctxs.end(), new_contexts.begin(),
- new_contexts.end());
+ if (!vexprs.empty()) {
+ WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()),
+ "Something wrong for runtime filters: ");
scanner->set_use_pushdown_conjuncts(true);
}
diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h
index 196de61bb0..f6ce2c5f25 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -228,6 +228,8 @@ private:
};
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+ std::vector<bool> _runtime_filter_ready_flag;
+ std::vector<std::unique_ptr<std::mutex>> _rf_locks;
std::map<int, RuntimeFilterContext*> _conjunctid_to_runtime_filter_ctxs;
std::unique_ptr<RuntimeProfile> _scanner_profile;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp
new file mode 100644
index 0000000000..6f73b2c69d
--- /dev/null
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vbloom_predicate.h"
+
+#include <string_view>
+
+namespace doris::vectorized {
+
+// Creates a bloom-filter predicate from its thrift node. No filter is attached yet
+// (`_filter` stays null until set_filter() is called) and the predicate starts out
+// unprepared; `_prepared` has to be initialized here because prepare() reads it.
+VBloomPredicate::VBloomPredicate(const TExprNode& node)
+        : VExpr(node), _filter(nullptr), _expr_name("bloom_predicate"), _prepared(false) {}
+
+// Prepares the predicate: runs VExpr::prepare and then validates the shape of the
+// expression.
+// Returns the base-class error if VExpr::prepare fails, or InternalError unless the
+// predicate has exactly one child. Once a call has succeeded, later calls return OK
+// right after the base-class prepare.
+Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                VExprContext* context) {
+    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+
+    if (_prepared) {
+        return Status::OK();
+    }
+    if (_children.size() != 1) {
+        return Status::InternalError("Invalid argument for VBloomPredicate.");
+    }
+
+    // A ColumnsWithTypeAndName "argument template" used to be built here and then
+    // thrown away; nothing consumed it, so the dead code has been removed.
+    _prepared = true;
+    return Status::OK();
+}
+
+// Opens the predicate. There is no predicate-specific work: the status of VExpr::open
+// is handed back to the caller unchanged.
+Status VBloomPredicate::open(RuntimeState* state, VExprContext* context,
+                             FunctionContext::FunctionStateScope scope) {
+    return VExpr::open(state, context, scope);
+}
+
+void VBloomPredicate::close(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) {
+ VExpr::close(state, context, scope); // Nothing predicate-specific to release; close is forwarded to the VExpr base class.
+}
+
+// Probes the bloom filter with every row of the single child expression's result and
+// appends the outcome to `block` as a UInt8 column (wrapped in a ColumnNullable with an
+// all-zero null map when this expression's data type is nullable).
+// On success `*result_column_id` holds the position of the appended column.
+// Returns the child's error if evaluating it fails, or InternalError if no filter has
+// been attached through set_filter().
+Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result_column_id) {
+    // `_filter` is null until set_filter() runs; fail cleanly instead of dereferencing it.
+    if (_filter == nullptr) {
+        return Status::InternalError("Bloom filter is not set for VBloomPredicate.");
+    }
+
+    doris::vectorized::ColumnNumbers arguments(_children.size());
+    for (size_t i = 0; i < _children.size(); ++i) {
+        int column_id = -1;
+        // The child's status used to be dropped, which let a failed child leave
+        // column_id at -1 and have it used as a block position below.
+        RETURN_IF_ERROR(_children[i]->execute(context, block, &column_id));
+        arguments[i] = column_id;
+    }
+    // call function
+    size_t num_columns_without_result = block->columns();
+    auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+
+    ColumnPtr argument_column =
+            block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+    size_t sz = argument_column->size();
+    res_data_column->resize(sz);
+    auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+    for (size_t i = 0; i < sz; i++) {
+        // The filter is handed a pointer to the data of row i as returned by get_data_at().
+        ptr[i] = _filter->find(
+                reinterpret_cast<const void*>(argument_column->get_data_at(i).data));
+    }
+    if (_data_type->is_nullable()) {
+        auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+        block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                       _data_type, _expr_name});
+    } else {
+        block->insert({std::move(res_data_column), _data_type, _expr_name});
+    }
+    *result_column_id = num_columns_without_result;
+    return Status::OK();
+}
+
+const std::string& VBloomPredicate::expr_name() const {
+ return _expr_name; // Set to "bloom_predicate" by the constructor.
+}
+// Takes ownership of the bloom filter held by `filter`: the pointer is released from the
+// caller's unique_ptr (which is left empty) and stored in the shared `_filter` member.
+void VBloomPredicate::set_filter(std::unique_ptr<IBloomFilterFuncBase>& filter) {
+    _filter = std::shared_ptr<IBloomFilterFuncBase>(filter.release());
+}
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h
new file mode 100644
index 0000000000..f715b9e40e
--- /dev/null
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exprs/bloomfilter_predicate.h"
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+// Vectorized predicate that tests the rows produced by its single child expression
+// against a bloom filter installed through set_filter().
+class VBloomPredicate final : public VExpr {
+public:
+    VBloomPredicate(const TExprNode& node);
+    ~VBloomPredicate() = default;
+    // Evaluates the child, probes the filter per row and appends the result column to
+    // `block`; `*result_column_id` receives the position of that column.
+    doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+                          int* result_column_id) override;
+    // Fails with InternalError unless the predicate has exactly one child.
+    doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+                          VExprContext* context) override;
+    doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+    void close(doris::RuntimeState* state, VExprContext* context,
+               FunctionContext::FunctionStateScope scope) override;
+    // Copy-constructs a new predicate owned by `pool`; the copy shares `_filter`.
+    VExpr* clone(doris::ObjectPool* pool) const override {
+        return pool->add(new VBloomPredicate(*this));
+    }
+    const std::string& expr_name() const override;
+    // Moves the filter out of `filter` (leaving it empty) and into this predicate.
+    void set_filter(std::unique_ptr<IBloomFilterFuncBase>& filter);
+
+private:
+    std::shared_ptr<IBloomFilterFuncBase> _filter;
+    std::string _expr_name;
+
+    // Needs a defined initial value: prepare() reads it before it is ever written.
+    bool _prepared = false;
+};
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 29c8903950..058895a6fb 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -33,6 +33,7 @@
#include "vec/exprs/vin_predicate.h"
#include "vec/exprs/vinfo_func.h"
#include "vec/exprs/vliteral.h"
+#include "vec/exprs/vruntimefilter_wrapper.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/exprs/vtuple_is_null_predicate.h"
@@ -57,6 +58,15 @@ VExpr::VExpr(const doris::TExprNode& node)
_data_type = DataTypeFactory::instance().create_data_type(_type, is_nullable);
}
+VExpr::VExpr(const VExpr& vexpr) // Copy constructor: member-wise copy of the fields listed below.
+ : _node_type(vexpr._node_type),
+ _type(vexpr._type),
+ _data_type(vexpr._data_type),
+ _children(vexpr._children), // NOTE(review): the container is copied as-is; presumably the child expressions end up shared with the source - confirm.
+ _fn(vexpr._fn),
+ _fn_context_index(vexpr._fn_context_index),
+ _constant_col(vexpr._constant_col) {} // NOTE(review): any VExpr member not named in this list is left default-initialized - confirm none is missing.
+
VExpr::VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable)
: _type(type), _fn_context_index(-1) {
if (is_slotref) {
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 7ebe597a0f..7d4fd5c9f0 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -43,6 +43,7 @@ public:
}
VExpr(const TExprNode& node);
+ VExpr(const VExpr& vexpr);
VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable);
// only used for test
VExpr() = default;
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
new file mode 100644
index 0000000000..2bd353e934
--- /dev/null
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl) // Wraps `impl`; the pointer is stored as given.
+ : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {} // Starts with the always-true shortcut off and both row counters at zero.
+
+// Copy constructor, used by clone(). Copies the complete wrapper state; `_impl` is copied
+// as a pointer, so the copy refers to the same wrapped expression as the source.
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()),
+          // These two members used to be left out, so a copy came up with an empty
+          // expr_name() and with the "filter rate already evaluated" flag reset.
+          _has_calculate_filter(vexpr._has_calculate_filter),
+          _expr_name(vexpr._expr_name) {}
+
+// Prepares the wrapped expression and, if that succeeds, derives this wrapper's name from
+// the wrapped expression's name. A failure from the wrapped prepare is returned as-is.
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    RETURN_IF_ERROR(_impl->prepare(state, desc, context));
+    const std::string& wrapped_name = _impl->expr_name();
+    _expr_name = fmt::format("VRuntimeFilterWrapper({})", wrapped_name);
+    return Status::OK();
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) {
+ return _impl->open(state, context, scope); // Forwarded unchanged to the wrapped expression.
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) {
+ _impl->close(state, context, scope); // Forwarded unchanged to the wrapped expression.
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+ return _impl->is_constant(); // The wrapper is constant exactly when the wrapped expression reports itself constant.
+}
+
+// Evaluates the wrapped runtime filter on `block`, or skips it once the filter has been
+// judged ineffective.
+//
+// While `_always_true` is false, the wrapped expression is executed, the block's rows are
+// added to `_scan_rows`, and the zero entries of the result (as counted by
+// doris::simd::count_zero_num) are added to `_filtered_rows`. The first time `_scan_rows`
+// reaches THRESHOLD_TO_CALCULATE_RATE the ratio filtered/scanned is computed once; if it
+// is below EXPECTED_FILTER_RATE, `_always_true` is switched on.
+//
+// With `_always_true` set, the wrapped expression is not run: a UInt8 column of ones
+// (nullable-wrapped with an all-zero null map when the data type is nullable) is appended
+// instead.
+//
+// Returns the wrapped expression's error, or InternalError when its result column is
+// neither ColumnNullable nor ColumnVector<UInt8>.
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (!_always_true) {
+        _scan_rows += block->rows();
+        RETURN_IF_ERROR(_impl->execute(context, block, result_column_id));
+
+        const ColumnWithTypeAndName& filter_result = block->get_by_position(*result_column_id);
+        uint8_t* values = nullptr;
+        if (auto* nullable_col = check_and_get_column<ColumnNullable>(*filter_result.column)) {
+            // Nullable result: count zeros of the nested values together with the null map.
+            values = ((ColumnVector<UInt8>*)nullable_col->get_nested_column_ptr().get())
+                             ->get_data()
+                             .data();
+            _filtered_rows += doris::simd::count_zero_num(
+                    reinterpret_cast<const int8_t*>(values),
+                    nullable_col->get_null_map_data().data(), block->rows());
+        } else if (auto* plain_col =
+                           check_and_get_column<ColumnVector<UInt8>>(*filter_result.column)) {
+            values = const_cast<uint8_t*>(plain_col->get_data().data());
+            _filtered_rows += doris::simd::count_zero_num(
+                    reinterpret_cast<const int8_t*>(values), block->rows());
+        } else {
+            return Status::InternalError("Invalid type for runtime filters!");
+        }
+
+        // The rate is evaluated a single time, as soon as enough rows have been scanned.
+        const bool rate_is_due = (!_has_calculate_filter) &&
+                                 (_scan_rows.load() >= THRESHOLD_TO_CALCULATE_RATE);
+        if (rate_is_due) {
+            const double filter_rate = (double)_filtered_rows / _scan_rows;
+            if (filter_rate < EXPECTED_FILTER_RATE) {
+                _always_true = true;
+            }
+            _has_calculate_filter = true;
+        }
+        return Status::OK();
+    }
+
+    // Shortcut: the filter was judged ineffective, so every row passes.
+    auto all_ones = ColumnVector<UInt8>::create(block->rows(), 1);
+    size_t result_position = block->columns();
+    if (_data_type->is_nullable()) {
+        auto no_nulls = ColumnVector<UInt8>::create(block->rows(), 0);
+        block->insert({ColumnNullable::create(std::move(all_ones), std::move(no_nulls)),
+                       _data_type, expr_name()});
+    } else {
+        block->insert({std::move(all_ones), _data_type, expr_name()});
+    }
+    *result_column_id = result_position;
+    return Status::OK();
+}
+
+const std::string& VRuntimeFilterWrapper::expr_name() const {
+ return _expr_name; // Built in prepare() as "VRuntimeFilterWrapper(<wrapped expression name>)".
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h
new file mode 100644
index 0000000000..755d243fd8
--- /dev/null
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+class VRuntimeFilterWrapper final : public VExpr { // Wraps another VExpr (`_impl`), forwards the VExpr interface to it and tracks scanned/filtered row counts.
+public:
+ VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl);
+ VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr);
+ ~VRuntimeFilterWrapper() = default;
+ doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+ int* result_column_id) override;
+ doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+ VExprContext* context) override;
+ doris::Status open(doris::RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override;
+ std::string debug_string() const override { return _impl->debug_string(); }; // Reports the wrapped expression's debug string.
+ bool is_constant() const override;
+ void close(doris::RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override;
+ VExpr* clone(doris::ObjectPool* pool) const override { // Copy-constructs a new wrapper and registers it with `pool`.
+ return pool->add(new VRuntimeFilterWrapper(*this));
+ }
+ const std::string& expr_name() const override;
+
+ ColumnPtrWrapper* get_const_col(VExprContext* context) override { // Forwarded to the wrapped expression.
+ return _impl->get_const_col(context);
+ }
+
+private:
+ VExpr* _impl; // Wrapped expression; held as a raw pointer, NOTE(review): presumably owned elsewhere (e.g. an ObjectPool) - confirm.
+
+ bool _always_true; // Once set by execute(), the wrapped expression is no longer evaluated and every row passes.
+ /// TODO: report the filter-rate statistics in the profile
+ std::atomic<int64_t> _filtered_rows; // Running total of zero entries counted in the wrapped expression's results.
+ std::atomic<int64_t> _scan_rows; // Running total of rows passed through execute() while the filter was active.
+
+ bool _has_calculate_filter = false; // True once execute() has evaluated the filter rate; the evaluation happens only once.
+ // Scanned-row count at which execute() evaluates the filter rate.
+ constexpr static int64_t THRESHOLD_TO_CALCULATE_RATE = 8192; // NOTE(review): an earlier comment said "loop size must be power of 2"; execute() only compares with >= - confirm nothing relies on that.
+ // If the measured filter rate is below this value, execute() sets _always_true.
+ constexpr static double EXPECTED_FILTER_RATE = 0.2;
+
+ std::string _expr_name; // Assigned in prepare().
+};
+} // namespace doris::vectorized
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org