You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2022/10/09 06:11:10 UTC
[doris] branch master updated: [Enhancement](runtime filter) optimize for runtime filter (#12856)
This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 245490d6b7 [Enhancement](runtime filter) optimize for runtime filter (#12856)
245490d6b7 is described below
commit 245490d6b778db6a0eb2e14f62132464fbd96969
Author: Pxl <px...@qq.com>
AuthorDate: Sun Oct 9 14:11:03 2022 +0800
[Enhancement](runtime filter) optimize for runtime filter (#12856)
optimize for runtime filter
---
be/src/exec/olap_scan_node.h | 2 +-
be/src/exec/olap_scanner.cpp | 4 +-
be/src/exec/olap_scanner.h | 4 +-
be/src/exprs/bloomfilter_predicate.cpp | 4 +-
be/src/exprs/bloomfilter_predicate.h | 312 ++++++++++++---------
be/src/exprs/create_predicate_function.h | 8 +-
be/src/exprs/hybrid_set.h | 18 ++
be/src/exprs/minmax_predicate.h | 16 ++
be/src/exprs/runtime_filter.cpp | 55 +++-
be/src/exprs/runtime_filter.h | 11 +-
be/src/exprs/runtime_filter_slots.h | 31 +-
be/src/olap/bloom_filter_predicate.cpp | 4 +-
be/src/olap/bloom_filter_predicate.h | 18 +-
be/src/olap/reader.cpp | 2 +-
be/src/olap/reader.h | 4 +-
be/src/runtime/primitive_type.cpp | 10 +
be/src/runtime/primitive_type.h | 2 +
be/src/runtime/row_batch.cpp | 3 +-
be/src/util/hash_util.hpp | 8 +
be/src/vec/columns/column.h | 1 +
be/src/vec/data_types/data_type.h | 1 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 4 +-
be/src/vec/exec/scan/new_olap_scanner.h | 4 +-
be/src/vec/exec/scan/vscan_node.h | 2 +-
be/src/vec/exec/volap_scan_node.h | 2 +-
be/src/vec/exec/volap_scanner.cpp | 4 +-
be/src/vec/exec/volap_scanner.h | 4 +-
be/src/vec/exprs/vbloom_predicate.cpp | 18 +-
be/src/vec/exprs/vbloom_predicate.h | 6 +-
be/src/vec/exprs/vexpr.h | 2 +-
be/test/exprs/bloom_filter_predicate_test.cpp | 6 +-
.../olap/bloom_filter_column_predicate_test.cpp | 33 ++-
32 files changed, 401 insertions(+), 202 deletions(-)
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 4097c710fe..f5d6e38cfd 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -188,7 +188,7 @@ protected:
// push down bloom filters to storage engine.
// 1. std::pair.first :: column name
// 2. std::pair.second :: shared_ptr of BloomFilterFuncBase
- std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>
+ std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>
_bloom_filters_push_down;
// push down functions to storage engine
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 75a45b9a46..036025f075 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -59,7 +59,7 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
Status OlapScanner::prepare(
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
set_tablet_reader();
@@ -165,7 +165,7 @@ Status OlapScanner::open() {
// it will be called under tablet read lock because capture rs readers need
Status OlapScanner::_init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
bool single_version =
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 1bb890dfd3..134a515219 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -48,7 +48,7 @@ public:
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
- const std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>&
+ const std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>&
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
@@ -95,7 +95,7 @@ public:
protected:
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>&
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
Status _init_return_columns(bool need_seq_col);
diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp
index fcf8589ac6..a2ba47e7fc 100644
--- a/be/src/exprs/bloomfilter_predicate.cpp
+++ b/be/src/exprs/bloomfilter_predicate.cpp
@@ -44,13 +44,13 @@ BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other)
_scan_rows() {}
Status BloomFilterPredicate::prepare(RuntimeState* state,
- std::shared_ptr<IBloomFilterFuncBase> filter) {
+ std::shared_ptr<BloomFilterFuncBase> filter) {
// DCHECK(filter != nullptr);
if (_is_prepare) {
return Status::OK();
}
_filter = filter;
- if (nullptr == _filter.get()) {
+ if (nullptr == _filter) {
return Status::InternalError("Unknown column type.");
}
_is_prepare = true;
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index 0028e31c8c..2540dc657d 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -19,8 +19,10 @@
#include <algorithm>
#include <cmath>
+#include <cstdint>
#include <memory>
#include <string>
+#include <type_traits>
#include "common/object_pool.h"
#include "exprs/block_bloom_filter.hpp"
@@ -28,21 +30,19 @@
#include "olap/decimal12.h"
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/uint24.h"
+#include "util/hash_util.hpp"
namespace doris {
-namespace detail {
-class BlockBloomFilterAdaptor {
+class BloomFilterAdaptor {
public:
- BlockBloomFilterAdaptor() { _bloom_filter = std::make_shared<doris::BlockBloomFilter>(); }
+ BloomFilterAdaptor() { _bloom_filter = std::make_shared<doris::BlockBloomFilter>(); }
static int64_t optimal_bit_num(int64_t expect_num, double fpp) {
return doris::segment_v2::BloomFilter::optimal_bit_num(expect_num, fpp) / 8;
}
- static BlockBloomFilterAdaptor* create() { return new BlockBloomFilterAdaptor(); }
+ static BloomFilterAdaptor* create() { return new BloomFilterAdaptor(); }
- Status merge(BlockBloomFilterAdaptor* other) {
- return _bloom_filter->merge(*other->_bloom_filter);
- }
+ Status merge(BloomFilterAdaptor* other) { return _bloom_filter->merge(*other->_bloom_filter); }
Status init(int len) {
int log_space = log2(len);
@@ -65,44 +65,42 @@ public:
void add_bytes(const char* data, size_t len) { _bloom_filter->insert(Slice(data, len)); }
+ // test_element/add_element are only used by the vectorized engine
+ template <typename T>
+ bool test_element(T element) const {
+ if constexpr (std::is_same_v<T, Slice>) {
+ return _bloom_filter->find(element);
+ } else {
+ return _bloom_filter->find(HashUtil::fixed_len_to_uint32(element));
+ }
+ }
+
+ template <typename T>
+ void add_element(T element) {
+ if constexpr (std::is_same_v<T, Slice>) {
+ _bloom_filter->insert(element);
+ } else {
+ _bloom_filter->insert(HashUtil::fixed_len_to_uint32(element));
+ }
+ }
+
private:
std::shared_ptr<doris::BlockBloomFilter> _bloom_filter;
};
-} // namespace detail
-using CurrentBloomFilterAdaptor = detail::BlockBloomFilterAdaptor;
// Only Used In RuntimeFilter
-class IBloomFilterFuncBase {
-public:
- virtual ~IBloomFilterFuncBase() {}
- virtual Status init(int64_t expect_num, double fpp) = 0;
- virtual Status init_with_fixed_length(int64_t bloom_filter_length) = 0;
-
- virtual void insert(const void* data) = 0;
- virtual bool find(const void* data) const = 0;
- virtual bool find_olap_engine(const void* data) const = 0;
- virtual bool find_uint32_t(uint32_t data) const = 0;
-
- virtual Status merge(IBloomFilterFuncBase* bloomfilter_func) = 0;
- virtual Status assign(const char* data, int len) = 0;
-
- virtual Status get_data(char** data, int* len) = 0;
- virtual void light_copy(IBloomFilterFuncBase* other) = 0;
-};
-
-template <class BloomFilterAdaptor>
-class BloomFilterFuncBase : public IBloomFilterFuncBase {
+class BloomFilterFuncBase {
public:
BloomFilterFuncBase() : _inited(false) {}
- virtual ~BloomFilterFuncBase() {}
+ virtual ~BloomFilterFuncBase() = default;
- Status init(int64_t expect_num, double fpp) override {
+ Status init(int64_t expect_num, double fpp) {
size_t filter_size = BloomFilterAdaptor::optimal_bit_num(expect_num, fpp);
return init_with_fixed_length(filter_size);
}
- Status init_with_fixed_length(int64_t bloom_filter_length) override {
+ Status init_with_fixed_length(int64_t bloom_filter_length) {
DCHECK(!_inited);
DCHECK(bloom_filter_length >= 0);
DCHECK_EQ((bloom_filter_length & (bloom_filter_length - 1)), 0);
@@ -113,7 +111,7 @@ public:
return Status::OK();
}
- Status merge(IBloomFilterFuncBase* bloomfilter_func) override {
+ Status merge(BloomFilterFuncBase* bloomfilter_func) {
auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
if (bloomfilter_func == nullptr) {
_bloom_filter.reset(BloomFilterAdaptor::create());
@@ -125,7 +123,7 @@ public:
return _bloom_filter->merge(other_func->_bloom_filter.get());
}
- Status assign(const char* data, int len) override {
+ Status assign(const char* data, int len) {
if (_bloom_filter == nullptr) {
_bloom_filter.reset(BloomFilterAdaptor::create());
}
@@ -134,19 +132,35 @@ public:
return _bloom_filter->init(data, len);
}
- Status get_data(char** data, int* len) override {
+ Status get_data(char** data, int* len) {
*data = _bloom_filter->data();
*len = _bloom_filter->size();
return Status::OK();
}
- void light_copy(IBloomFilterFuncBase* bloomfilter_func) override {
+ void light_copy(BloomFilterFuncBase* bloomfilter_func) {
auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
_bloom_filter = other_func->_bloom_filter;
_inited = other_func->_inited;
}
+ virtual void insert(const void* data) = 0;
+
+ virtual bool find(const void* data) const = 0;
+
+ virtual bool find_olap_engine(const void* data) const = 0;
+
+ virtual bool find_uint32_t(uint32_t data) const = 0;
+
+ virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0;
+
+ virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap,
+ uint16_t* offsets, int number) = 0;
+
+ virtual void find_fixed_len(const char* data, const uint8* nullmap, int number,
+ uint8* results) = 0;
+
protected:
// bloom filter size
int32_t _bloom_filter_alloced;
@@ -154,63 +168,113 @@ protected:
bool _inited;
};
-template <class T, class BloomFilterAdaptor>
+template <class T>
struct CommonFindOp {
- ALWAYS_INLINE void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
+ // insert_batch/find_batch/find_batch_olap_engine are only used by the vectorized engine
+ void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, const int* offsets,
+ int number) const {
+ for (int i = 0; i < number; i++) {
+ bloom_filter.add_element(*((T*)data + offsets[i]));
+ }
+ }
+
+ uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data,
+ const uint8* nullmap, uint16_t* offsets, int number) const {
+ uint16_t new_size = 0;
+ for (int i = 0; i < number; i++) {
+ uint16_t idx = offsets[i];
+ if (nullmap != nullptr && nullmap[idx]) {
+ continue;
+ }
+ if (!bloom_filter.test_element(*((T*)data + idx))) {
+ continue;
+ }
+ offsets[new_size++] = idx;
+ }
+ return new_size;
+ }
+
+ void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, const uint8* nullmap,
+ int number, uint8* results) const {
+ for (int i = 0; i < number; i++) {
+ results[i] = false;
+ if (nullmap != nullptr && nullmap[i]) {
+ continue;
+ }
+ if (!bloom_filter.test_element(*((T*)data + i))) {
+ continue;
+ }
+ results[i] = true;
+ }
+ }
+
+ void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
bloom_filter.add_bytes((char*)data, sizeof(T));
}
- ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
+ bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
return bloom_filter.test(Slice((char*)data, sizeof(T)));
}
- ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
- const void* data) const {
- return this->find(bloom_filter, data);
+ bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
+ return find(bloom_filter, data);
}
- ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
+ bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
return bloom_filter.test(data);
}
};
-template <class BloomFilterAdaptor>
struct StringFindOp {
- ALWAYS_INLINE void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
+ void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, const int* offsets,
+ int number) const {
+ LOG(FATAL) << "StringFindOp does not support insert_batch";
+ }
+
+ uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data,
+ const uint8* nullmap, uint16_t* offsets, int number) const {
+ LOG(FATAL) << "StringFindOp does not support find_batch_olap_engine";
+ return 0;
+ }
+
+ void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, const uint8* nullmap,
+ int number, uint8* results) const {
+ LOG(FATAL) << "StringFindOp does not support find_batch";
+ }
+
+ void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
const auto* value = reinterpret_cast<const StringValue*>(data);
if (value) {
bloom_filter.add_bytes(value->ptr, value->len);
}
}
- ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
+ bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
const auto* value = reinterpret_cast<const StringValue*>(data);
if (value == nullptr) {
return false;
}
return bloom_filter.test(Slice(value->ptr, value->len));
}
- ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
- const void* data) const {
+ bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
return StringFindOp::find(bloom_filter, data);
}
- ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
+ bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
return bloom_filter.test(data);
}
};
// We do not need to judge whether data is empty, because null will not appear
// when filer used by the storage engine
-template <class BloomFilterAdaptor>
-struct FixedStringFindOp : public StringFindOp<BloomFilterAdaptor> {
- ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
- const void* input_data) const {
+struct FixedStringFindOp : public StringFindOp {
+ bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* input_data) const {
const auto* value = reinterpret_cast<const StringValue*>(input_data);
int64_t size = value->len;
char* data = value->ptr;
- while (size > 0 && data[size - 1] == '\0') size--;
+ while (size > 0 && data[size - 1] == '\0') {
+ size--;
+ }
return bloom_filter.test(Slice(value->ptr, size));
}
};
-template <class BloomFilterAdaptor>
-struct DateTimeFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> {
+struct DateTimeFindOp : public CommonFindOp<DateTimeValue> {
bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
DateTimeValue value;
value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data));
@@ -221,8 +285,7 @@ struct DateTimeFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> {
// avoid violating C/C++ aliasing rules.
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101684
-template <class BloomFilterAdaptor>
-struct DateFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> {
+struct DateFindOp : public CommonFindOp<DateTimeValue> {
bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
uint24_t date = *static_cast<const uint24_t*>(data);
uint64_t value = uint32_t(date);
@@ -237,31 +300,7 @@ struct DateFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> {
}
};
-template <class BloomFilterAdaptor>
-struct DateV2FindOp
- : public CommonFindOp<doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>,
- BloomFilterAdaptor> {
- bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
- return bloom_filter.test(
- Slice((char*)data,
- sizeof(doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>)));
- }
-};
-
-template <class BloomFilterAdaptor>
-struct DateTimeV2FindOp
- : public CommonFindOp<
- doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>,
- BloomFilterAdaptor> {
- bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
- return bloom_filter.test(Slice(
- (char*)data,
- sizeof(doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>)));
- }
-};
-
-template <class BloomFilterAdaptor>
-struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value, BloomFilterAdaptor> {
+struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value> {
bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
auto packed_decimal = *static_cast<const decimal12_t*>(data);
DecimalV2Value value;
@@ -276,103 +315,106 @@ struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value, BloomFilterAdaptor>
}
};
-template <PrimitiveType type, class BloomFilterAdaptor>
+template <PrimitiveType type>
struct BloomFilterTypeTraits {
using T = typename PrimitiveTypeTraits<type>::CppType;
- using FindOp = CommonFindOp<T, BloomFilterAdaptor>;
-};
-
-template <class BloomFilterAdaptor>
-struct BloomFilterTypeTraits<TYPE_DATE, BloomFilterAdaptor> {
- using FindOp = DateFindOp<BloomFilterAdaptor>;
-};
-
-template <class BloomFilterAdaptor>
-struct BloomFilterTypeTraits<TYPE_DATEV2, BloomFilterAdaptor> {
- using FindOp = DateV2FindOp<BloomFilterAdaptor>;
+ using FindOp = CommonFindOp<T>;
};
-template <class BloomFilterAdaptor>
-struct BloomFilterTypeTraits<TYPE_DATETIMEV2, BloomFilterAdaptor> {
- using FindOp = DateTimeV2FindOp<BloomFilterAdaptor>;
+template <>
+struct BloomFilterTypeTraits<TYPE_DATE> {
+ using FindOp = DateFindOp;
};
-template <class BloomFilterAdaptor>
-struct BloomFilterTypeTraits<TYPE_DATETIME, BloomFilterAdaptor> {
- using FindOp = DateTimeFindOp<BloomFilterAdaptor>;
+template <>
+struct BloomFilterTypeTraits<TYPE_DATETIME> {
+ using FindOp = DateTimeFindOp;
};
-template <class BloomFilterAdaptor>
-struct BloomFilterTypeTraits<TYPE_DECIMALV2, BloomFilterAdaptor> {
- using FindOp = DecimalV2FindOp<BloomFilterAdaptor>;
+template <>
+struct BloomFilterTypeTraits<TYPE_DECIMALV2> {
+ using FindOp = DecimalV2FindOp;
};
-template <class BloomFilterAdaptor>
-struct BloomFilterTypeTraits<TYPE_CHAR, BloomFilterAdaptor> {
- using FindOp = FixedStringFindOp<BloomFilterAdaptor>;
+template <>
+struct BloomFilterTypeTraits<TYPE_CHAR> {
+ using FindOp = FixedStringFindOp;
};
-template <class BloomFilterAdaptor>
-struct BloomFilterTypeTraits<TYPE_VARCHAR, BloomFilterAdaptor> {
- using FindOp = StringFindOp<BloomFilterAdaptor>;
+template <>
+struct BloomFilterTypeTraits<TYPE_VARCHAR> {
+ using FindOp = StringFindOp;
};
-template <class BloomFilterAdaptor>
-struct BloomFilterTypeTraits<TYPE_STRING, BloomFilterAdaptor> {
- using FindOp = StringFindOp<BloomFilterAdaptor>;
+template <>
+struct BloomFilterTypeTraits<TYPE_STRING> {
+ using FindOp = StringFindOp;
};
-template <PrimitiveType type, class BloomFilterAdaptor>
-class BloomFilterFunc final : public BloomFilterFuncBase<BloomFilterAdaptor> {
+template <PrimitiveType type>
+class BloomFilterFunc final : public BloomFilterFuncBase {
public:
- BloomFilterFunc() : BloomFilterFuncBase<BloomFilterAdaptor>() {}
+ BloomFilterFunc() : BloomFilterFuncBase() {}
- ~BloomFilterFunc() = default;
+ ~BloomFilterFunc() override = default;
void insert(const void* data) override {
- DCHECK(this->_bloom_filter != nullptr);
- dummy.insert(*this->_bloom_filter, data);
+ DCHECK(_bloom_filter != nullptr);
+ dummy.insert(*_bloom_filter, data);
+ }
+
+ void insert_fixed_len(const char* data, const int* offsets, int number) override {
+ DCHECK(_bloom_filter != nullptr);
+ dummy.insert_batch(*_bloom_filter, data, offsets, number);
+ }
+
+ uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets,
+ int number) override {
+ return dummy.find_batch_olap_engine(*_bloom_filter, data, nullmap, offsets, number);
+ }
+
+ void find_fixed_len(const char* data, const uint8* nullmap, int number,
+ uint8* results) override {
+ dummy.find_batch(*_bloom_filter, data, nullmap, number, results);
}
bool find(const void* data) const override {
- DCHECK(this->_bloom_filter != nullptr);
- return dummy.find(*this->_bloom_filter, data);
+ DCHECK(_bloom_filter != nullptr);
+ return dummy.find(*_bloom_filter, data);
}
bool find_olap_engine(const void* data) const override {
- return dummy.find_olap_engine(*this->_bloom_filter, data);
+ return dummy.find_olap_engine(*_bloom_filter, data);
}
- bool find_uint32_t(uint32_t data) const override {
- return dummy.find(*this->_bloom_filter, data);
- }
+ bool find_uint32_t(uint32_t data) const override { return dummy.find(*_bloom_filter, data); }
private:
- typename BloomFilterTypeTraits<type, BloomFilterAdaptor>::FindOp dummy;
+ typename BloomFilterTypeTraits<type>::FindOp dummy;
};
// BloomFilterPredicate only used in runtime filter
class BloomFilterPredicate : public Predicate {
public:
- virtual ~BloomFilterPredicate();
+ ~BloomFilterPredicate() override;
BloomFilterPredicate(const TExprNode& node);
BloomFilterPredicate(const BloomFilterPredicate& other);
- virtual Expr* clone(ObjectPool* pool) const override {
+ Expr* clone(ObjectPool* pool) const override {
return pool->add(new BloomFilterPredicate(*this));
}
using Predicate::prepare;
- Status prepare(RuntimeState* state, std::shared_ptr<IBloomFilterFuncBase> bloomfilterfunc);
+ Status prepare(RuntimeState* state, std::shared_ptr<BloomFilterFuncBase> bloomfilterfunc);
- std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() { return _filter; }
+ std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() { return _filter; }
- virtual BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override;
+ BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override;
- virtual Status open(RuntimeState* state, ExprContext* context,
- FunctionContext::FunctionStateScope scope) override;
+ Status open(RuntimeState* state, ExprContext* context,
+ FunctionContext::FunctionStateScope scope) override;
protected:
friend class Expr;
- virtual std::string debug_string() const override;
+ std::string debug_string() const override;
private:
bool _is_prepare;
@@ -382,7 +424,7 @@ private:
std::atomic<int64_t> _filtered_rows;
std::atomic<int64_t> _scan_rows;
- std::shared_ptr<IBloomFilterFuncBase> _filter;
+ std::shared_ptr<BloomFilterFuncBase> _filter;
bool _has_calculate_filter = false;
// loop size must be power of 2
constexpr static int64_t _loop_size = 8192;
diff --git a/be/src/exprs/create_predicate_function.h b/be/src/exprs/create_predicate_function.h
index 5aa0e2347f..bc1ca659ea 100644
--- a/be/src/exprs/create_predicate_function.h
+++ b/be/src/exprs/create_predicate_function.h
@@ -28,7 +28,7 @@ public:
using BasePtr = MinMaxFuncBase*;
template <PrimitiveType type>
static BasePtr get_function() {
- return new (std::nothrow) MinMaxNumFunc<typename PrimitiveTypeTraits<type>::CppType>();
+ return new MinMaxNumFunc<typename PrimitiveTypeTraits<type>::CppType>();
};
};
@@ -41,16 +41,16 @@ public:
using CppType = typename PrimitiveTypeTraits<type>::CppType;
using Set = std::conditional_t<std::is_same_v<CppType, StringValue>, StringSet,
HybridSet<type, is_vec>>;
- return new (std::nothrow) Set();
+ return new Set();
};
};
class BloomFilterTraits {
public:
- using BasePtr = IBloomFilterFuncBase*;
+ using BasePtr = BloomFilterFuncBase*;
template <PrimitiveType type>
static BasePtr get_function() {
- return new BloomFilterFunc<type, CurrentBloomFilterAdaptor>();
+ return new BloomFilterFunc<type>();
};
};
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 20c9721675..571aa82b87 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -41,6 +41,8 @@ public:
// use in vectorize execute engine
virtual void insert(void* data, size_t) = 0;
+ virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0;
+
virtual void insert(HybridSetBase* set) = 0;
virtual int size() = 0;
@@ -106,6 +108,12 @@ public:
}
void insert(void* data, size_t) override { insert(data); }
+ void insert_fixed_len(const char* data, const int* offsets, int number) override {
+ for (int i = 0; i < number; i++) {
+ _set.insert(*((CppType*)data + offsets[i]));
+ }
+ }
+
void insert(HybridSetBase* set) override {
HybridSet<T, is_vec>* hybrid_set = reinterpret_cast<HybridSet<T, is_vec>*>(set);
_set.insert(hybrid_set->_set.begin(), hybrid_set->_set.end());
@@ -173,11 +181,16 @@ public:
std::string str_value(value->ptr, value->len);
_set.insert(str_value);
}
+
void insert(void* data, size_t size) override {
std::string str_value(reinterpret_cast<char*>(data), size);
_set.insert(str_value);
}
+ void insert_fixed_len(const char* data, const int* offsets, int number) override {
+ LOG(FATAL) << "string set not support insert_fixed_len";
+ }
+
void insert(HybridSetBase* set) override {
StringSet* string_set = reinterpret_cast<StringSet*>(set);
_set.insert(string_set->_set.begin(), string_set->_set.end());
@@ -259,11 +272,16 @@ public:
StringValue sv(value->ptr, value->len);
_set.insert(sv);
}
+
void insert(void* data, size_t size) override {
StringValue sv(reinterpret_cast<char*>(data), size);
_set.insert(sv);
}
+ void insert_fixed_len(const char* data, const int* offsets, int number) override {
+ LOG(FATAL) << "string set not support insert_fixed_len";
+ }
+
void insert(HybridSetBase* set) override {
StringValueSet* string_set = reinterpret_cast<StringValueSet*>(set);
_set.insert(string_set->_set.begin(), string_set->_set.end());
diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h
index ef811c5c9e..a237f7f8b2 100644
--- a/be/src/exprs/minmax_predicate.h
+++ b/be/src/exprs/minmax_predicate.h
@@ -25,6 +25,7 @@ namespace doris {
class MinMaxFuncBase {
public:
virtual void insert(const void* data) = 0;
+ virtual void insert_fixed_len(const char* data, const int* offsets, int number) = 0;
virtual bool find(void* data) = 0;
virtual bool is_empty() = 0;
virtual void* get_max() = 0;
@@ -68,6 +69,21 @@ public:
}
}
+ void insert_fixed_len(const char* data, const int* offsets, int number) override {
+ if (!number) {
+ return;
+ }
+ if (_empty) {
+ _min = *((T*)data + offsets[0]);
+ _max = *((T*)data + offsets[0]);
+ }
+ for (int i = _empty; i < number; i++) {
+ _min = std::min(_min, *((T*)data + offsets[i]));
+ _max = std::max(_max, *((T*)data + offsets[i]));
+ }
+ _empty = false;
+ }
+
bool find(void* data) override {
if (data == nullptr) {
return false;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 89e832f3d0..1311f2b928 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -31,12 +31,14 @@
#include "exprs/literal.h"
#include "exprs/minmax_predicate.h"
#include "gen_cpp/internal_service.pb.h"
+#include "runtime/define_primitive_type.h"
#include "runtime/large_int_value.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/string_parser.hpp"
+#include "vec/columns/column.h"
#include "vec/exprs/vbloom_predicate.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
@@ -489,8 +491,41 @@ public:
break;
}
}
+
+ void insert_fixed_len(const char* data, const int* offsets, int number) {
+ switch (_filter_type) {
+ case RuntimeFilterType::IN_FILTER: {
+ if (_is_ignored_in_filter) {
+ break;
+ }
+ _hybrid_set->insert_fixed_len(data, offsets, number);
+ break;
+ }
+ case RuntimeFilterType::MINMAX_FILTER: {
+ _minmax_func->insert_fixed_len(data, offsets, number);
+ break;
+ }
+ case RuntimeFilterType::BLOOM_FILTER: {
+ _bloomfilter_func->insert_fixed_len(data, offsets, number);
+ break;
+ }
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+ if (_is_bloomfilter) {
+ _bloomfilter_func->insert_fixed_len(data, offsets, number);
+ } else {
+ _hybrid_set->insert_fixed_len(data, offsets, number);
+ }
+ break;
+ }
+ default:
+ DCHECK(false);
+ break;
+ }
+ }
+
void insert(const StringRef& value) {
switch (_column_return_type) {
+ // todo: rethink logic of hll/bitmap/date
case TYPE_DATE:
case TYPE_DATETIME: {
// DateTime->DateTimeValue
@@ -521,6 +556,16 @@ public:
}
}
+ void insert_batch(const vectorized::ColumnPtr column, const std::vector<int>& rows) {
+ if (IRuntimeFilter::enable_use_batch(_column_return_type)) {
+ insert_fixed_len(column->get_raw_data().data, rows.data(), rows.size());
+ } else {
+ for (int index : rows) {
+ insert(column->get_data_at(index));
+ }
+ }
+ }
+
RuntimeFilterType get_real_type() {
auto real_filter_type = _filter_type;
if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
@@ -1000,7 +1045,7 @@ private:
int32_t _max_in_num = -1;
std::unique_ptr<MinMaxFuncBase> _minmax_func;
std::unique_ptr<HybridSetBase> _hybrid_set;
- std::shared_ptr<IBloomFilterFuncBase> _bloomfilter_func;
+ std::shared_ptr<BloomFilterFuncBase> _bloomfilter_func;
bool _is_bloomfilter = false;
bool _is_ignored_in_filter = false;
std::string* _ignored_in_filter_msg = nullptr;
@@ -1029,6 +1074,12 @@ void IRuntimeFilter::insert(const StringRef& value) {
_wrapper->insert(value);
}
+void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column,
+ const std::vector<int>& rows) {
+ DCHECK(is_producer());
+ _wrapper->insert_batch(column, rows);
+}
+
Status IRuntimeFilter::publish() {
DCHECK(is_producer());
if (_has_local_target) {
@@ -1255,7 +1306,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
}
void IRuntimeFilter::update_runtime_filter_type_to_profile() {
- if (_profile.get() != nullptr) {
+ if (_profile != nullptr) {
_profile->add_info_string("RealRuntimeFilterType",
::doris::to_string(_wrapper->get_real_type()));
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 587c0aa87a..60162b98f0 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -139,6 +139,7 @@ public:
// only used for producer
void insert(const void* data);
void insert(const StringRef& data);
+ void insert_batch(vectorized::ColumnPtr column, const std::vector<int>& rows);
// publish filter
// push filter to remote node or push down it to scan_node
@@ -175,7 +176,7 @@ public:
bool is_producer() const { return _role == RuntimeFilterRole::PRODUCER; }
bool is_consumer() const { return _role == RuntimeFilterRole::CONSUMER; }
void set_role(const RuntimeFilterRole role) { _role = role; }
- int expr_order() { return _expr_order; }
+ int expr_order() const { return _expr_order; }
// only used for consumer
// if filter is not ready for filter data scan_node
@@ -209,7 +210,7 @@ public:
void set_ignored() { _is_ignored = true; }
// for ut
- bool is_ignored() { return _is_ignored; }
+ bool is_ignored() const { return _is_ignored; } // reports the flag that set_ignored() sets to true
void set_ignored_msg(std::string& msg) { _ignored_msg = msg; }
@@ -231,6 +232,16 @@ public:
void ready_for_publish();
+    // Returns true when `type` is one of the fixed-width numeric types accepted by
+    // is_int_or_bool() or is_float_or_double(). BloomFilterColumnPredicate uses this
+    // to choose the fixed-length batch lookup (find_fixed_len_olap_engine).
+    static bool enable_use_batch(PrimitiveType type) {
+        if (is_int_or_bool(type)) {
+            return true;
+        }
+        return is_float_or_double(type);
+    }
+
protected:
// serialize _wrapper to protobuf
void to_protobuf(PInFilter* filter);
@@ -282,7 +287,7 @@ protected:
// Indicate whether runtime filter expr has been ignored
bool _is_ignored;
- std::string _ignored_msg = "";
+ std::string _ignored_msg;
// some runtime filter will generate
// multiple contexts such as minmax filter
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 144a31224c..43fca0a71f 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -20,6 +20,9 @@
#include "exprs/runtime_filter.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/columns_number.h"
+#include "vec/common/assert_cast.h"
namespace doris {
// this class used in a hash join node
@@ -164,10 +167,13 @@ public:
}
}
}
+
void insert(std::unordered_map<const vectorized::Block*, std::vector<int>>& datas) {
for (int i = 0; i < _build_expr_context.size(); ++i) {
auto iter = _runtime_filters.find(i);
- if (iter == _runtime_filters.end()) continue;
+ if (iter == _runtime_filters.end()) {
+ continue;
+ }
int result_column_id = _build_expr_context[i]->get_last_result_column_id();
for (auto it : datas) {
@@ -175,24 +181,23 @@ public:
if (auto* nullable =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
- auto& column_nested = nullable->get_nested_column();
- auto& column_nullmap = nullable->get_null_map_column();
+ auto& column_nested = nullable->get_nested_column_ptr();
+ auto& column_nullmap = nullable->get_null_map_column_ptr();
+ std::vector<int> indexs;
for (int row_num : it.second) {
- if (column_nullmap.get_bool(row_num)) {
+ if (assert_cast<const vectorized::ColumnUInt8*>(column_nullmap.get())
+ ->get_bool(row_num)) {
continue;
}
- const auto& ref_data = column_nested.get_data_at(row_num);
- for (auto filter : iter->second) {
- filter->insert(ref_data);
- }
+ indexs.push_back(row_num);
+ }
+ for (auto filter : iter->second) {
+ filter->insert_batch(column_nested, indexs);
}
} else {
- for (int row_num : it.second) {
- const auto& ref_data = column->get_data_at(row_num);
- for (auto filter : iter->second) {
- filter->insert(ref_data);
- }
+ for (auto filter : iter->second) {
+ filter->insert_batch(column, it.second);
}
}
}
diff --git a/be/src/olap/bloom_filter_predicate.cpp b/be/src/olap/bloom_filter_predicate.cpp
index 299e4e1e1c..8d281cb06a 100644
--- a/be/src/olap/bloom_filter_predicate.cpp
+++ b/be/src/olap/bloom_filter_predicate.cpp
@@ -40,9 +40,9 @@
namespace doris {
ColumnPredicate* BloomFilterColumnPredicateFactory::create_column_predicate(
- uint32_t column_id, const std::shared_ptr<IBloomFilterFuncBase>& bloom_filter,
+ uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& bloom_filter,
FieldType type) {
- std::shared_ptr<IBloomFilterFuncBase> filter;
+ std::shared_ptr<BloomFilterFuncBase> filter;
switch (type) {
#define M(NAME) \
case OLAP_FIELD_##NAME: { \
diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h
index fd121302de..392fa5a438 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -19,9 +19,8 @@
#include <stdint.h>
-#include <roaring/roaring.hh>
-
#include "exprs/bloomfilter_predicate.h"
+#include "exprs/runtime_filter.h"
#include "olap/column_predicate.h"
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_nullable.h"
@@ -34,10 +33,10 @@ namespace doris {
template <PrimitiveType T>
class BloomFilterColumnPredicate : public ColumnPredicate {
public:
- using SpecificFilter = BloomFilterFunc<T, CurrentBloomFilterAdaptor>;
+ using SpecificFilter = BloomFilterFunc<T>;
BloomFilterColumnPredicate(uint32_t column_id,
- const std::shared_ptr<IBloomFilterFuncBase>& filter)
+ const std::shared_ptr<BloomFilterFuncBase>& filter)
: ColumnPredicate(column_id),
_filter(filter),
_specific_filter(static_cast<SpecificFilter*>(_filter.get())) {}
@@ -81,6 +80,12 @@ private:
new_size += _specific_filter->find_uint32_t(dict_col->get_hash_value(idx));
}
}
+ } else if (IRuntimeFilter::enable_use_batch(T)) {
+ new_size = _specific_filter->find_fixed_len_olap_engine(
+ (char*)reinterpret_cast<const vectorized::PredicateColumnType<T>*>(&column)
+ ->get_data()
+ .data(),
+ null_map, sel, size);
} else {
uint24_t tmp_uint24_value;
auto get_cell_value = [&tmp_uint24_value](auto& data) {
@@ -113,7 +118,7 @@ private:
return new_size;
}
- std::shared_ptr<IBloomFilterFuncBase> _filter;
+ std::shared_ptr<BloomFilterFuncBase> _filter;
SpecificFilter* _specific_filter; // owned by _filter
mutable uint64_t _evaluated_rows = 1;
mutable uint64_t _passed_rows = 0;
@@ -174,8 +179,7 @@ uint16_t BloomFilterColumnPredicate<T>::evaluate(const vectorized::IColumn& colu
class BloomFilterColumnPredicateFactory {
public:
static ColumnPredicate* create_column_predicate(
- uint32_t column_id, const std::shared_ptr<IBloomFilterFuncBase>& filter,
- FieldType type);
+ uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& filter, FieldType type); // builds a predicate on column_id backed by `filter`; the .cpp switches on `type`
};
} //namespace doris
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index aaec382cb0..c757906d8d 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -464,7 +464,7 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) {
}
ColumnPredicate* TabletReader::_parse_to_predicate(
- const std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter) {
+ const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter) {
int32_t index = _tablet_schema->field_index(bloom_filter.first);
if (index < 0) {
return nullptr;
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index ae476e4fa2..7c5dc32fcf 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -76,7 +76,7 @@ public:
bool end_key_include = false;
std::vector<TCondition> conditions;
- std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>> bloom_filters;
+ std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters;
std::vector<FunctionFilter> function_filters;
std::vector<RowsetMetaSharedPtr> delete_predicates;
@@ -166,7 +166,7 @@ protected:
void _init_conditions_param(const ReaderParams& read_params);
ColumnPredicate* _parse_to_predicate(
- const std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter);
+ const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter);
virtual ColumnPredicate* _parse_to_predicate(const FunctionFilter& function_filter);
diff --git a/be/src/runtime/primitive_type.cpp b/be/src/runtime/primitive_type.cpp
index 61ee8bb0ca..9c7a771e59 100644
--- a/be/src/runtime/primitive_type.cpp
+++ b/be/src/runtime/primitive_type.cpp
@@ -21,6 +21,7 @@
#include "gen_cpp/Types_types.h"
#include "runtime/collection_value.h"
+#include "runtime/define_primitive_type.h"
#include "runtime/jsonb_value.h"
#include "runtime/string_value.h"
@@ -132,6 +133,33 @@ bool is_string_type(PrimitiveType type) {
return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING;
}
+// Returns true iff `type` is TYPE_FLOAT or TYPE_DOUBLE.
+bool is_float_or_double(PrimitiveType type) {
+    switch (type) {
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+        return true;
+    default:
+        return false;
+    }
+}
+
+// Returns true iff `type` is TYPE_BOOLEAN or one of TYPE_TINYINT, TYPE_SMALLINT,
+// TYPE_INT, TYPE_BIGINT, TYPE_LARGEINT.
+bool is_int_or_bool(PrimitiveType type) {
+    switch (type) {
+    case TYPE_BOOLEAN:
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+    case TYPE_LARGEINT:
+        return true;
+    default:
+        return false;
+    }
+}
+
bool has_variable_type(PrimitiveType type) {
return type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_OBJECT ||
type == TYPE_QUANTILE_STATE || type == TYPE_STRING;
diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h
index 18d7aa6605..c00ef24805 100644
--- a/be/src/runtime/primitive_type.h
+++ b/be/src/runtime/primitive_type.h
@@ -40,6 +40,8 @@ PrimitiveType convert_type_to_primitive(FunctionContext::Type type);
bool is_enumeration_type(PrimitiveType type);
bool is_date_type(PrimitiveType type);
+bool is_float_or_double(PrimitiveType type);
+bool is_int_or_bool(PrimitiveType type);
bool is_string_type(PrimitiveType type);
bool has_variable_type(PrimitiveType type);
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 79cfe1a946..b910d2c9b8 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -78,8 +78,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
_num_tuples_per_row(input_batch.row_tuples_size()),
_row_desc(row_desc),
_auxiliary_mem_usage(0),
- _need_to_return(false),
- _tuple_data_pool() {
+ _need_to_return(false) {
_tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
_tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp
index 845ee754a7..330a35d028 100644
--- a/be/src/util/hash_util.hpp
+++ b/be/src/util/hash_util.hpp
@@ -45,6 +45,26 @@ namespace doris {
// Utility class to compute hash values.
class HashUtil {
public:
+    // Maps a fixed-width value to a 32-bit hash value.
+    //
+    // Non-floating-point types no wider than 4 bytes are converted directly, so the
+    // value itself is the hash. Floating-point types, and every type wider than
+    // 4 bytes, are hashed with std::hash<T> and the result is narrowed to 32 bits.
+    //
+    // Floats deliberately avoid the direct-conversion path: converting a float that
+    // is NaN, infinite, <= -1.0 or >= 2^32 to uint32_t is undefined behavior, and
+    // truncation would collapse many distinct values (every float in [0, 1) would
+    // map to 0). std::hash<float> is defined for all inputs and, because
+    // +0.0f == -0.0f, is required to hash both zeros to the same value.
+    template <typename T>
+    static uint32_t fixed_len_to_uint32(T value) {
+        if constexpr (sizeof(T) <= sizeof(uint32_t) && !std::is_floating_point_v<T>) {
+            return static_cast<uint32_t>(value);
+        } else {
+            return static_cast<uint32_t>(std::hash<T>()(value));
+        }
+    }
+
static uint32_t zlib_crc_hash(const void* data, int32_t bytes, uint32_t hash) {
return crc32(hash, (const unsigned char*)data, bytes);
}
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 0459ef4298..ab6c103e86 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -299,6 +299,7 @@ public:
/// This is for calculating the memory size for vectorized serialization of aggregation keys.
virtual size_t get_max_row_byte_size() const {
LOG(FATAL) << "get_max_row_byte_size not supported";
+ return 0; // presumably unreachable if LOG(FATAL) aborts; avoids flowing off the end of a non-void function
}
virtual void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h
index 1e7f8a7af0..23aa6d7d4a 100644
--- a/be/src/vec/data_types/data_type.h
+++ b/be/src/vec/data_types/data_type.h
@@ -279,6 +279,7 @@ struct WhichDataType {
bool is_int() const {
return is_int8() || is_int16() || is_int32() || is_int64() || is_int128();
}
+ bool is_int_or_uint() const { return is_int() || is_uint(); } // true if either is_int() or is_uint() holds; VBloomPredicate uses it to pick find_fixed_len
bool is_native_int() const { return is_int8() || is_int16() || is_int32() || is_int64(); }
bool is_decimal32() const { return idx == TypeIndex::Decimal32; }
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index d6fe854e63..ea07536a76 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -36,7 +36,7 @@ NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int
Status NewOlapScanner::prepare(
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
@@ -130,7 +130,7 @@ Status NewOlapScanner::open(RuntimeState* state) {
// it will be called under tablet read lock because capture rs readers need
Status NewOlapScanner::_init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
bool single_version =
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 7e784d69fc..41b2888879 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -43,7 +43,7 @@ public:
public:
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>&
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
@@ -58,7 +58,7 @@ private:
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>&
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 4f08192cd0..fa51dd393f 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -183,7 +183,7 @@ protected:
// Save all bloom filter predicates which may be pushed down to data source.
// column name -> bloom filter function
- std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>
+ std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>
_bloom_filters_push_down;
// Save all function predicates which may be pushed down to data source.
diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h
index c8ab58fae7..44b3829402 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -168,7 +168,7 @@ private:
// push down bloom filters to storage engine.
// 1. std::pair.first :: column name
// 2. std::pair.second :: shared_ptr of BloomFilterFuncBase
- std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>
+ std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>
_bloom_filters_push_down;
// push down functions to storage engine
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp
index fc082a6313..5761705a84 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -45,7 +45,7 @@ VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, b
Status VOlapScanner::prepare(
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
set_tablet_reader();
@@ -154,7 +154,7 @@ TabletStorageType VOlapScanner::get_storage_type() {
// it will be called under tablet read lock because capture rs readers need
Status VOlapScanner::_init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
bool single_version =
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index 2e21dc5777..d3cd791b63 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -41,7 +41,7 @@ public:
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
- const std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>&
+ const std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>&
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
@@ -94,7 +94,7 @@ public:
private:
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
+ const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>&
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
Status _init_return_columns(bool need_seq_col);
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp
index dc3be41027..6ba96eb4f0 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -17,6 +17,7 @@
#include "vec/exprs/vbloom_predicate.h"
+#include <cstdint>
#include <string_view>
#include "common/status.h"
@@ -71,13 +72,24 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result
size_t sz = argument_column->size();
res_data_column->resize(sz);
auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
- if (WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type))
- .is_string_or_fixed_string()) {
+ auto type = WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type));
+ if (type.is_string_or_fixed_string()) {
for (size_t i = 0; i < sz; i++) {
auto ele = argument_column->get_data_at(i);
const StringValue v(ele.data, ele.size);
ptr[i] = _filter->find(reinterpret_cast<const void*>(&v));
}
+ } else if (type.is_int_or_uint() || type.is_float()) {
+ if (argument_column->is_nullable()) {
+ auto column_nested = reinterpret_cast<const ColumnNullable*>(argument_column.get())
+ ->get_nested_column_ptr();
+ auto column_nullmap = reinterpret_cast<const ColumnNullable*>(argument_column.get())
+ ->get_null_map_column_ptr();
+ _filter->find_fixed_len(column_nested->get_raw_data().data,
+ (uint8*)column_nullmap->get_raw_data().data, sz, ptr);
+ } else {
+ _filter->find_fixed_len(argument_column->get_raw_data().data, nullptr, sz, ptr);
+ }
} else {
for (size_t i = 0; i < sz; i++) {
ptr[i] = _filter->find(
@@ -98,7 +110,7 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result
const std::string& VBloomPredicate::expr_name() const {
return _expr_name;
}
-void VBloomPredicate::set_filter(std::shared_ptr<IBloomFilterFuncBase>& filter) {
+void VBloomPredicate::set_filter(std::shared_ptr<BloomFilterFuncBase>& filter) {
_filter = filter;
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h
index b4e9f54a31..ad0f15820e 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -37,12 +37,12 @@ public:
return pool->add(new VBloomPredicate(*this));
}
const std::string& expr_name() const override;
- void set_filter(std::shared_ptr<IBloomFilterFuncBase>& filter);
+ void set_filter(std::shared_ptr<BloomFilterFuncBase>& filter);
- std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() const override { return _filter; }
+ std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() const override { return _filter; }
private:
- std::shared_ptr<IBloomFilterFuncBase> _filter;
+ std::shared_ptr<BloomFilterFuncBase> _filter;
std::string _expr_name;
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index e0e0defac4..0a82877b93 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -159,7 +159,7 @@ public:
virtual const VExpr* get_impl() const { return nullptr; }
// If this expr is a BloomPredicate, this method will return a BloomFilterFunc
- virtual std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() const {
+ virtual std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() const {
LOG(FATAL) << "Method 'get_bloom_filter_func()' is not supported in expression: "
<< this->debug_string();
return nullptr;
diff --git a/be/test/exprs/bloom_filter_predicate_test.cpp b/be/test/exprs/bloom_filter_predicate_test.cpp
index abedc9a59f..3f2fec11bc 100644
--- a/be/test/exprs/bloom_filter_predicate_test.cpp
+++ b/be/test/exprs/bloom_filter_predicate_test.cpp
@@ -31,7 +31,7 @@ public:
};
TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) {
- std::unique_ptr<IBloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_INT));
+ std::unique_ptr<BloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_INT));
EXPECT_TRUE(func->init(1024, 0.05).ok());
const int data_size = 1024;
int data[data_size];
@@ -51,7 +51,7 @@ TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) {
}
TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) {
- std::unique_ptr<IBloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_VARCHAR));
+ std::unique_ptr<BloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_VARCHAR));
EXPECT_TRUE(func->init(1024, 0.05).ok());
ObjectPool obj_pool;
const int data_size = 1024;
@@ -100,7 +100,7 @@ TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) {
}
TEST_F(BloomFilterPredicateTest, bloom_filter_size_test) {
- std::unique_ptr<IBloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_VARCHAR));
+ std::unique_ptr<BloomFilterFuncBase> func(create_bloom_filter(PrimitiveType::TYPE_VARCHAR));
int length = 4096;
func->init_with_fixed_length(4096);
char* data = nullptr;
diff --git a/be/test/olap/bloom_filter_column_predicate_test.cpp b/be/test/olap/bloom_filter_column_predicate_test.cpp
index 79ba5af940..cc98c90698 100644
--- a/be/test/olap/bloom_filter_column_predicate_test.cpp
+++ b/be/test/olap/bloom_filter_column_predicate_test.cpp
@@ -27,6 +27,7 @@
#include "runtime/mem_pool.h"
#include "runtime/string_value.hpp"
#include "vec/columns/column_nullable.h"
+#include "vec/columns/columns_number.h"
#include "vec/columns/predicate_column.h"
#include "vec/core/block.h"
@@ -78,7 +79,7 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) {
return_columns.push_back(i);
}
- std::shared_ptr<IBloomFilterFuncBase> bloom_filter(
+ std::shared_ptr<BloomFilterFuncBase> bloom_filter(
create_bloom_filter(PrimitiveType::TYPE_FLOAT));
bloom_filter->init(4096, 0.05);
@@ -90,7 +91,6 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) {
bloom_filter->insert(reinterpret_cast<void*>(&value));
ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate(
0, bloom_filter, OLAP_FIELD_TYPE_FLOAT);
- auto* col_data = reinterpret_cast<float*>(_mem_pool->allocate(size * sizeof(float)));
// for ColumnBlock no null
init_row_block(tablet_schema, size);
@@ -123,6 +123,31 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) {
EXPECT_EQ(select_size, 1);
EXPECT_FLOAT_EQ(*(float*)col_block.cell(_row_block->selection_vector()[0]).cell_ptr(), 5.1);
+ delete pred;
+}
+
+TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN_VEC) {
+ TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+ SetTabletSchema(std::string("FLOAT_COLUMN"), "FLOAT", "REPLACE", 1, true, true, tablet_schema);
+ const int size = 10;
+ std::vector<uint32_t> return_columns;
+ for (int i = 0; i < tablet_schema->num_columns(); ++i) {
+ return_columns.push_back(i);
+ }
+
+ std::shared_ptr<BloomFilterFuncBase> bloom_filter(
+ create_bloom_filter(PrimitiveType::TYPE_FLOAT));
+
+ bloom_filter->init(4096, 0.05);
+ auto column_data = ColumnFloat32::create();
+ float values[3] = {4.1, 5.1, 6.1};
+ int offsets[3] = {0, 1, 2};
+
+ bloom_filter->insert_fixed_len((char*)values, offsets, 3);
+ ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate(
+ 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT);
+ auto* col_data = reinterpret_cast<float*>(_mem_pool->allocate(size * sizeof(float)));
+
// for vectorized::Block no null
auto pred_col = PredicateColumnType<TYPE_FLOAT>::create();
pred_col->reserve(size);
@@ -130,8 +155,9 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) {
*(col_data + i) = i + 0.1f;
pred_col->insert_data(reinterpret_cast<const char*>(col_data + i), 0);
}
+ init_row_block(tablet_schema, size);
_row_block->clear();
- select_size = _row_block->selected_size();
+ auto select_size = _row_block->selected_size();
select_size = pred->evaluate(*pred_col, _row_block->selection_vector(), select_size);
EXPECT_EQ(select_size, 3);
EXPECT_FLOAT_EQ((float)pred_col->get_data()[_row_block->selection_vector()[0]], 4.1);
@@ -156,5 +182,4 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) {
delete pred;
}
-
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org