You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/10 02:12:22 UTC
[incubator-doris] branch master updated: [optimize] Optimize
bloomfilter performance (#6180)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 290a844 [optimize] Optimize bloomfilter performance (#6180)
290a844 is described below
commit 290a844e0410c3b68315cae0571d204f7333e808
Author: stdpain <34...@users.noreply.github.com>
AuthorDate: Sat Jul 10 10:12:12 2021 +0800
[optimize] Optimize bloomfilter performance (#6180)
Refactor the runtime filter bloom filter and eliminate some virtual function calls, which yields a performance improvement of about 5%.
Import a block bloom filter; its AVX version yields a performance improvement of about 40%.
Before: bloom filter size: default; about 20 million (2000W) items cost about 1.4 s (1s400ms).
After: bloom filter size: 524288; about 20 million (2000W) items cost about 400 ms.
---
be/CMakeLists.txt | 3 +
be/src/exec/olap_scan_node.h | 2 +-
be/src/exec/olap_scanner.cpp | 6 +-
be/src/exec/olap_scanner.h | 4 +-
be/src/exprs/CMakeLists.txt | 2 +
be/src/exprs/block_bloom_filter.hpp | 198 +++++++++++++
be/src/exprs/block_bloom_filter_avx_impl.cc | 82 ++++++
be/src/exprs/block_bloom_filter_impl.cc | 233 +++++++++++++++
be/src/exprs/bloomfilter_predicate.cpp | 51 ++--
be/src/exprs/bloomfilter_predicate.h | 323 +++++++++++++--------
be/src/exprs/runtime_filter.cpp | 6 +-
be/src/olap/bloom_filter_predicate.cpp | 80 +++--
be/src/olap/bloom_filter_predicate.h | 62 +++-
be/src/olap/reader.cpp | 24 +-
be/src/olap/reader.h | 4 +-
.../rowset/segment_v2/block_split_bloom_filter.cpp | 38 +--
.../rowset/segment_v2/block_split_bloom_filter.h | 38 ++-
be/src/olap/rowset/segment_v2/bloom_filter.h | 4 +-
be/src/runtime/primitive_type.h | 56 ++++
be/test/exprs/bloom_filter_predicate_test.cpp | 30 +-
.../olap/bloom_filter_column_predicate_test.cpp | 10 +-
build.sh | 5 +
22 files changed, 971 insertions(+), 290 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 261a167..30148ca 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -341,6 +341,9 @@ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBRPC_ENABLE_CPU_PROFILER")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBOOST_UUID_RANDOM_PROVIDER_FORCE_POSIX=1")
if ("${CMAKE_BUILD_TARGET_ARCH}" STREQUAL "x86" OR "${CMAKE_BUILD_TARGET_ARCH}" STREQUAL "x86_64")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse4.2")
+ if (USE_AVX2)
+ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -mavx2")
+ endif()
endif()
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-attributes -DS2_USE_GFLAGS -DS2_USE_GLOG")
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 2acb84f..4d288f5 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -223,7 +223,7 @@ private:
// push down bloom filters to storage engine.
// 1. std::pair.first :: column name
// 2. std::pair.second :: shared_ptr of BloomFilterFuncBase
- std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>
+ std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>
_bloom_filters_push_down;
// Pool for storing allocated scanner objects. We don't want to use the
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 4461170..84a4368 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -63,7 +63,8 @@ OlapScanner::~OlapScanner() {}
Status OlapScanner::prepare(
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters) {
+ const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
+ bloom_filters) {
// Get olap table
TTabletId tablet_id = scan_range.tablet_id;
SchemaHash schema_hash = strtoul(scan_range.schema_hash.c_str(), nullptr, 10);
@@ -137,7 +138,8 @@ Status OlapScanner::open() {
// it will be called under tablet read lock because capture rs readers need
Status OlapScanner::_init_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters) {
+ const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
+ bloom_filters) {
RETURN_IF_ERROR(_init_return_columns());
_params.tablet = _tablet;
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 2c09578..ccfca3e 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -57,7 +57,7 @@ public:
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
- const std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>>&
+ const std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters);
Status open();
@@ -97,7 +97,7 @@ public:
private:
Status _init_params(const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
- const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>&
+ const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters);
Status _init_return_columns();
void _convert_row_to_tuple(Tuple* tuple);
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 0268bce..dcb397f 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -45,6 +45,8 @@ add_library(Exprs
in_predicate.cpp
new_in_predicate.cpp
bloomfilter_predicate.cpp
+ block_bloom_filter_avx_impl.cc
+ block_bloom_filter_impl.cc
runtime_filter.cpp
runtime_filter_rpc.cpp
is_null_predicate.cpp
diff --git a/be/src/exprs/block_bloom_filter.hpp b/be/src/exprs/block_bloom_filter.hpp
new file mode 100644
index 0000000..e8876ad
--- /dev/null
+++ b/be/src/exprs/block_bloom_filter.hpp
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "common/status.h"
+#include "fmt/format.h"
+#include "gutil/macros.h"
+#include "util/hash_util.hpp"
+#include "util/slice.h"
+
+namespace doris {
+
+// Adapted from Kudu's BlockBloomFilter
+// (https://github.com/apache/kudu/blob/master/src/kudu/util/block_bloom_filter.h),
+// which is itself derived from Impala's BlockBloomFilter.
+// Compared with a classic Bloom filter, every key touches a single 32-byte bucket, which is
+// friendlier to the CPU cache and maps naturally onto SIMD instructions.
+//
+// Call init() (empty filter) or init_from_directory() (deserialize) before using
+// insert()/find()/merge(); until then no directory is allocated.
+
+class BlockBloomFilter {
+public:
+ explicit BlockBloomFilter();
+ ~BlockBloomFilter();
+
+ // Allocates and zeroes a directory of 2^log_space_bytes bytes (never fewer than two
+ // 32-byte buckets) and records the seed used when hashing Slice keys. Returns
+ // InvalidArgument if the request would need more than 2^32 buckets.
+ Status init(int log_space_bytes, uint32_t hash_seed);
+ // Initialize the BlockBloomFilter from a populated "directory" structure.
+ // Useful for initializing the BlockBloomFilter by de-serializing a custom protobuf message.
+ // 'directory.size' must equal the size implied by 'log_space_bytes', otherwise
+ // InvalidArgument is returned. 'always_false' restores the "nothing inserted" flag.
+ Status init_from_directory(int log_space_bytes, const Slice& directory, bool always_false,
+ uint32_t hash_seed);
+
+ // Detaches the directory; the filter must be initialized again before further use.
+ void close();
+
+ // Adds an element to the BloomFilter. The function used to generate 'hash' need not
+ // have good uniformity, but it should have low collision probability. For instance, if
+ // the set of values is 32-bit ints, the identity function is a valid hash function for
+ // this Bloom filter, since the collision probability (the probability that two
+ // non-equal values will have the same hash value) is 0.
+ void insert(uint32_t hash) noexcept;
+ // Same as above, but first hashes 'key' with murmur_hash3_32 and the filter's seed.
+ void insert(const Slice& key) noexcept {
+ insert(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed));
+ }
+
+ // Finds an element in the BloomFilter, returning true if it is found and false (with
+ // high probability) if it is not. Always false while nothing has been inserted.
+ bool find(uint32_t hash) const noexcept;
+ // Same as above, but first hashes 'key' with murmur_hash3_32 and the filter's seed.
+ bool find(const Slice& key) const noexcept {
+ return find(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed));
+ }
+
+ // Computes the logical OR of this filter with 'other' and stores the result in this
+ // filter.
+ // Notes:
+ // - The directory sizes of the Bloom filters must match; otherwise InvalidArgument.
+ // - Merging with itself or with an always-false 'other' is a no-op.
+ // - Or'ing with kAlwaysTrueFilter is disallowed (CHECK failure).
+ Status merge(const BlockBloomFilter& other);
+
+ // Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n' bytes where 'n'
+ // is multiple of 32-bytes. Returns InvalidArgument if 'n' is not such a multiple.
+ static Status or_equal_array(size_t n, const uint8_t* __restrict__ in,
+ uint8_t* __restrict__ out);
+
+ // Returns whether the Bloom filter is empty and hence would return false for all lookups.
+ bool always_false() const { return _always_false; }
+
+ // Returns amount of space used in log2 bytes.
+ int log_space_bytes() const { return _log_num_buckets + kLogBucketByteSize; }
+
+ // Returns the directory structure (a non-owning view). Useful for serializing the
+ // BlockBloomFilter to a custom protobuf message; pair it with init_from_directory().
+ Slice directory() const {
+ return Slice(reinterpret_cast<const uint8_t*>(_directory), directory_size());
+ }
+
+ // Representation of a filter which allows all elements to pass.
+ static constexpr BlockBloomFilter* const kAlwaysTrueFilter = nullptr;
+
+private:
+ // _always_false is true when the bloom filter hasn't had any elements inserted.
+ bool _always_false;
+
+ // The BloomFilter is divided up into Buckets and each Bucket comprises of 8 BucketWords of
+ // 4 bytes each.
+ static constexpr uint64_t kBucketWords = 8;
+ typedef uint32_t BucketWord;
+
+ // log2(number of bits in a BucketWord)
+ static constexpr int kLogBucketWordBits = 5;
+ static constexpr BucketWord kBucketWordMask = (1 << kLogBucketWordBits) - 1;
+
+ // log2(number of bytes in a bucket)
+ static constexpr int kLogBucketByteSize = 5;
+ // Bucket size in bytes.
+ static constexpr size_t kBucketByteSize = 1UL << kLogBucketByteSize;
+
+ static_assert(
+ (1 << kLogBucketWordBits) == std::numeric_limits<BucketWord>::digits,
+ "BucketWord must have a bit-width that is be a power of 2, like 64 for uint64_t.");
+
+ typedef BucketWord Bucket[kBucketWords];
+
+ // _log_num_buckets is the log (base 2) of the number of buckets in the directory.
+ int _log_num_buckets;
+
+ // _directory_mask is (1 << _log_num_buckets) - 1. It is precomputed for
+ // efficiency reasons and maps a rehashed value to a bucket index.
+ uint32_t _directory_mask;
+
+ // Array of 2^_log_num_buckets buckets, or nullptr before init / after close().
+ // Its storage is owned by _mem_holder. The AVX2 routines treat each Bucket as one __m256i.
+ Bucket* _directory;
+
+ // Seed used with hash algorithm.
+ uint32_t _hash_seed;
+
+ // Shared by init() and init_from_directory(): validates the size and allocates the
+ // directory without clearing it.
+ Status init_internal(int log_space_bytes, uint32_t hash_seed);
+
+ // Same as insert(), but always uses the portable (non-AVX2) bucket_insert() path.
+ void insert_no_avx2(uint32_t hash) noexcept;
+
+ // Does the actual work of insert() on the portable (SSE2) path. bucket_idx is the index
+ // of the bucket to insert into and 'hash' is the value passed to insert().
+ void bucket_insert(uint32_t bucket_idx, uint32_t hash) noexcept;
+
+ // Returns true iff every bit bucket_insert() would set for 'hash' is set in bucket_idx.
+ bool bucket_find(uint32_t bucket_idx, uint32_t hash) const noexcept;
+
+ // Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n' without using AVX2
+ // operations.
+ static void or_equal_array_no_avx2(size_t n, const uint8_t* __restrict__ in,
+ uint8_t* __restrict__ out);
+ // Helper for or_equal_array() and merge() that picks the AVX2 or non-AVX2 kernel at
+ // compile time (based on __AVX2__).
+ static void or_equal_array_internal(size_t n, const uint8_t* __restrict__ in,
+ uint8_t* __restrict__ out);
+
+#ifdef __AVX2__
+ // Same as insert(), but always uses the AVX2 bucket routine.
+ void insert_avx2(uint32_t hash) noexcept __attribute__((__target__("avx2")));
+
+ // A faster SIMD version of bucket_insert().
+ void bucket_insert_avx2(uint32_t bucket_idx, uint32_t hash) noexcept
+ __attribute__((__target__("avx2")));
+
+ // A faster SIMD version of bucket_find().
+ bool bucket_find_avx2(uint32_t bucket_idx, uint32_t hash) const noexcept
+ __attribute__((__target__("avx2")));
+
+ // Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n' using AVX2
+ // instructions. 'n' must be a multiple of 32.
+ static void or_equal_array_avx2(size_t n, const uint8_t* __restrict__ in,
+ uint8_t* __restrict__ out) __attribute__((target("avx2")));
+
+#endif
+ // Size of the internal directory structure in bytes.
+ int64_t directory_size() const { return 1ULL << log_space_bytes(); }
+
+ // The eight odd 32-bit multipliers that choose one bit per bucket word. Kept as a macro
+ // so the same list initializes both kRehash and the AVX2 register built in
+ // block_bloom_filter_avx_impl.cc via _mm256_setr_epi32.
+#define BLOOM_HASH_CONSTANTS \
+ 0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU, 0x705495c7U, 0x2df1424bU, 0x9efc4947U, \
+ 0x5c6bfb31U
+
+ // kRehash is used as 8 odd 32-bit unsigned ints. See Dietzfelbinger et al.'s "A
+ // reliable randomized algorithm for the closest-pair problem".
+ static constexpr uint32_t kRehash[8] __attribute__((aligned(32))) = {BLOOM_HASH_CONSTANTS};
+
+ // Get 32 more bits of randomness from a 32-bit hash; used to choose the bucket index.
+ static inline uint32_t rehash32to32(const uint32_t hash) {
+ // Constants generated by uuidgen(1) with the -r flag
+ static constexpr uint64_t m = 0x7850f11ec6d14889ULL;
+ static constexpr uint64_t a = 0x6773610597ca4c63ULL;
+ // This is strongly universal hashing following Dietzfelbinger's "Universal hashing
+ // and k-wise independent random variables via integer arithmetic without primes". As
+ // such, for any two distinct uint32_t's hash1 and hash2, the probability (over the
+ // randomness of the constants) that any subset of bit positions of
+ // rehash32to32(hash1) is equal to the same subset of bit positions
+ // rehash32to32(hash2) is minimal.
+ return (static_cast<uint64_t>(hash) * m + a) >> 32U;
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(BlockBloomFilter);
+
+ // Owns the heap buffer that _directory points into.
+ std::unique_ptr<char[]> _mem_holder;
+};
+
+} // namespace doris
diff --git a/be/src/exprs/block_bloom_filter_avx_impl.cc b/be/src/exprs/block_bloom_filter_avx_impl.cc
new file mode 100644
index 0000000..ca8ab95
--- /dev/null
+++ b/be/src/exprs/block_bloom_filter_avx_impl.cc
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifdef __AVX2__
+
+#include <immintrin.h>
+
+#include "exprs/block_bloom_filter.hpp"
+
+namespace doris {
+// Builds the 256-bit probe mask for 'hash': each of the eight 32-bit lanes gets exactly one
+// bit set, at the position given by the top 5 bits of (hash * BLOOM_HASH_CONSTANTS[lane]).
+static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i make_mark(
+        const uint32_t hash) {
+    // Multiply-shift hashing a la Dietzfelbinger et al.: the broadcast hash times eight
+    // different odd constants, one per lane (low 32 bits of each product kept).
+    const __m256i products =
+            _mm256_mullo_epi32(_mm256_setr_epi32(BLOOM_HASH_CONSTANTS), _mm256_set1_epi32(hash));
+    // The 5 most significant bits of each product form a bit index in [0, 31] ...
+    const __m256i bit_index = _mm256_srli_epi32(products, 27);
+    // ... which is turned into a single set bit inside its own lane.
+    return _mm256_sllv_epi32(_mm256_set1_epi32(1), bit_index);
+}
+
+// AVX2 bucket insertion: ORs the eight one-bit-per-word probe mask for 'hash' into bucket
+// 'bucket_idx' in a single 256-bit operation.
+void BlockBloomFilter::bucket_insert_avx2(const uint32_t bucket_idx, const uint32_t hash) noexcept {
+    const __m256i mask = make_mark(hash);
+    __m256i* const bucket = &(reinterpret_cast<__m256i*>(_directory)[bucket_idx]);
+    // Use the unaligned load/store forms so this routine does not depend on _directory being
+    // 32-byte aligned: _mm256_store_si256 (and a plain __m256i dereference) require 32-byte
+    // alignment and fault otherwise. On aligned addresses the unaligned forms are typically
+    // just as fast on AVX2-capable cores.
+    _mm256_storeu_si256(bucket, _mm256_or_si256(_mm256_loadu_si256(bucket), mask));
+    // For SSE compatibility, unset the high bits of each YMM register so SSE instructions
+    // don't have to save them off before using XMM registers.
+    _mm256_zeroupper();
+}
+
+// AVX2 bucket lookup: returns true iff bucket 'bucket_idx' has every bit of the probe mask
+// for 'hash' set.
+bool BlockBloomFilter::bucket_find_avx2(const uint32_t bucket_idx, const uint32_t hash) const
+        noexcept {
+    const __m256i mask = make_mark(hash);
+    // Unaligned load: dereferencing a __m256i* that is not 32-byte aligned is undefined
+    // behavior and may compile to a faulting aligned move.
+    const __m256i bucket =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(_directory) + bucket_idx);
+    // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256
+    // takes the negation of its first argument and ands that with its second argument. In
+    // our case, the result is zero everywhere iff there is a one in 'bucket' wherever
+    // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise.
+    const bool result = _mm256_testc_si256(bucket, mask);
+    _mm256_zeroupper();
+    return result;
+}
+
+// AVX2 flavour of insert(): picks the bucket from an independent rehash of 'hash', marks the
+// filter as non-empty, and sets one bit per 32-bit word of that bucket.
+void BlockBloomFilter::insert_avx2(const uint32_t hash) noexcept {
+    const uint32_t target_bucket = rehash32to32(hash) & _directory_mask;
+    _always_false = false;
+    bucket_insert_avx2(target_bucket, hash);
+}
+
+// AVX2 kernel for out[i] |= in[i] over 'n' bytes, one 32-byte chunk per iteration. Unaligned
+// loads and stores are used, so neither buffer has to be 32-byte aligned.
+void BlockBloomFilter::or_equal_array_avx2(size_t n, const uint8_t* __restrict__ in,
+                                           uint8_t* __restrict__ out) {
+    static constexpr size_t kAVXRegisterBytes = sizeof(__m256d);
+    static_assert(kAVXRegisterBytes == kBucketByteSize, "Unexpected AVX register bytes");
+    DCHECK_EQ(n % kAVXRegisterBytes, 0) << "Invalid Bloom filter directory size";
+
+    for (size_t offset = 0; offset != n; offset += kAVXRegisterBytes) {
+        const __m256d src = _mm256_loadu_pd(reinterpret_cast<const double*>(in + offset));
+        double* const dst = reinterpret_cast<double*>(out + offset);
+        // Bitwise OR only; the double lanes are never interpreted as floating-point values.
+        _mm256_storeu_pd(dst, _mm256_or_pd(_mm256_loadu_pd(dst), src));
+    }
+}
+} // namespace doris
+#endif
\ No newline at end of file
diff --git a/be/src/exprs/block_bloom_filter_impl.cc b/be/src/exprs/block_bloom_filter_impl.cc
new file mode 100644
index 0000000..f38f68b
--- /dev/null
+++ b/be/src/exprs/block_bloom_filter_impl.cc
@@ -0,0 +1,233 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <emmintrin.h>
+#include <mm_malloc.h>
+
+#include <algorithm>
+#include <climits>
+#include <cmath>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include "exprs/block_bloom_filter.hpp"
+
+namespace doris {
+
+// Out-of-class definitions of static constexpr data members. Before C++17 they are required
+// whenever the members are odr-used (kRehash is indexed in bucket_insert()/bucket_find());
+// since C++17 such members are implicitly inline and these redeclarations are redundant.
+constexpr uint32_t BlockBloomFilter::kRehash[8] __attribute__((aligned(32)));
+// constexpr data member requires initialization in the class declaration.
+// Hence no duplicate initialization in the definition here.
+constexpr BlockBloomFilter* const BlockBloomFilter::kAlwaysTrueFilter;
+
+BlockBloomFilter::BlockBloomFilter()
+ : _always_false(true),
+ _log_num_buckets(0),
+ _directory_mask(0),
+ _directory(nullptr),
+ _hash_seed(0) {}
+
+// Tears the filter down through close(); _mem_holder's destructor frees the buffer.
+BlockBloomFilter::~BlockBloomFilter() {
+    this->close();
+}
+
+// Shared setup for init() and init_from_directory(): validates the requested size, records the
+// bucket layout and hash seed, and allocates a directory whose start is aligned to
+// kBucketByteSize (32) bytes. The directory contents are left uninitialized; callers either
+// zero it or copy a serialized directory into it.
+Status BlockBloomFilter::init_internal(const int log_space_bytes, uint32_t hash_seed) {
+    // log_space_bytes is log2 of the size in bytes; convert it to log2 of the number of
+    // 32-byte buckets, keeping at least two buckets.
+    _log_num_buckets = std::max(1, log_space_bytes - kLogBucketByteSize);
+    // Since we use 32 bits in the arguments of insert() and find(), _log_num_buckets
+    // must be limited.
+    if (_log_num_buckets > 32) {
+        return Status::InvalidArgument(
+                fmt::format("Bloom filter too large. log_space_bytes: {}", log_space_bytes));
+    }
+    // Shift in 64 bits so that _log_num_buckets == 32 does not overflow.
+    _directory_mask = (1ULL << _log_num_buckets) - 1;
+
+    const size_t alloc_size = directory_size();
+    close(); // Drop any directory from a previous initialization.
+    // The SIMD bucket routines may access a whole bucket as one 32-byte vector with aligned
+    // instructions, but operator new[] only guarantees the default new alignment (typically
+    // 16 bytes). Over-allocate by one bucket and start the directory on a 32-byte boundary.
+    size_t space = alloc_size + kBucketByteSize;
+    _mem_holder.reset(new char[space]);
+    void* start = _mem_holder.get();
+    void* const aligned = std::align(kBucketByteSize, alloc_size, start, space);
+    DCHECK(aligned != nullptr);
+    _directory = static_cast<Bucket*>(aligned);
+    _hash_seed = hash_seed;
+    return Status::OK();
+}
+
+// Allocates a directory for 2^log_space_bytes bytes (never fewer than two buckets) and clears
+// it, so the freshly initialized filter contains nothing.
+Status BlockBloomFilter::init(const int log_space_bytes, uint32_t hash_seed) {
+    RETURN_IF_ERROR(init_internal(log_space_bytes, hash_seed));
+    DCHECK(_directory);
+    _always_false = true;
+    std::memset(_directory, 0, directory_size());
+    return Status::OK();
+}
+
+// Rebuilds a filter from a serialized directory (for example one obtained via directory()).
+// 'directory.size' must equal the size implied by 'log_space_bytes'; otherwise
+// InvalidArgument is returned and the bytes are not copied.
+Status BlockBloomFilter::init_from_directory(int log_space_bytes, const Slice& directory,
+                                             bool always_false, uint32_t hash_seed) {
+    RETURN_IF_ERROR(init_internal(log_space_bytes, hash_seed));
+    DCHECK(_directory);
+
+    const size_t expected_size = directory_size();
+    if (directory.size != expected_size) {
+        return Status::InvalidArgument(fmt::format(
+                "Mismatch in BlockBloomFilter source directory size {} and expected size {}",
+                directory.size, directory_size()));
+    }
+    std::memcpy(_directory, directory.data, directory.size);
+    _always_false = always_false;
+    return Status::OK();
+}
+
+// Releases the directory. Both the view (_directory) and the owning buffer (_mem_holder) are
+// dropped, so a closed filter no longer keeps its possibly large directory allocated until
+// destruction. The filter must be initialized again before further use.
+void BlockBloomFilter::close() {
+    _directory = nullptr;
+    _mem_holder.reset();
+}
+
+// Portable (SSE2) bucket insertion: sets one bit in each of the eight 32-bit words of bucket
+// 'bucket_idx'. For word i the bit position is the top kLogBucketWordBits bits of
+// kRehash[i] * hash (multiply-shift hashing, following Dietzfelbinger).
+void BlockBloomFilter::bucket_insert(const uint32_t bucket_idx, const uint32_t hash) noexcept {
+    // Right shift that keeps only the top kLogBucketWordBits bits of a 32-bit product.
+    constexpr int kShift = (1 << kLogBucketWordBits) - kLogBucketWordBits;
+    // 16-byte alignment allows the two halves to be read with aligned SSE loads below.
+    uint32_t mask_words[kBucketWords] __attribute__((aligned(16)));
+    for (size_t word = 0; word < kBucketWords; ++word) {
+        const uint32_t bit = (kRehash[word] * hash) >> kShift;
+        mask_words[word] = 1U << bit;
+    }
+    BucketWord* const target = DCHECK_NOTNULL(_directory)[bucket_idx];
+    for (int half = 0; half < 2; ++half) {
+        const __m128i bits =
+                _mm_load_si128(reinterpret_cast<const __m128i*>(mask_words + 4 * half));
+        __m128i* const dst = reinterpret_cast<__m128i*>(target + 4 * half);
+        *dst = _mm_or_si128(*dst, bits);
+    }
+}
+
+// Portable bucket lookup: true iff, for every word i of bucket 'bucket_idx', the bit chosen by
+// the top kLogBucketWordBits bits of kRehash[i] * hash is set (the bits bucket_insert() sets).
+bool BlockBloomFilter::bucket_find(const uint32_t bucket_idx, const uint32_t hash) const noexcept {
+    constexpr int kShift = (1 << kLogBucketWordBits) - kLogBucketWordBits;
+    const BucketWord* const words = DCHECK_NOTNULL(_directory)[bucket_idx];
+    for (size_t word = 0; word < kBucketWords; ++word) {
+        const BucketWord probe = BucketWord{1} << ((kRehash[word] * hash) >> kShift);
+        if ((words[word] & probe) == 0) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Portable flavour of insert(): always goes through the SSE2 bucket_insert(), whatever the
+// build flags are.
+void BlockBloomFilter::insert_no_avx2(const uint32_t hash) noexcept {
+    const uint32_t target_bucket = rehash32to32(hash) & _directory_mask;
+    _always_false = false;
+    bucket_insert(target_bucket, hash);
+}
+
+// Each insertion sets 8 bits inside one 32-byte bucket, one bit per 32-bit word. This is a
+// "split Bloom filter" with approximately the same false positive probability as a standard
+// Bloom filter (see Mitzenmacher's "Bloom Filters and Such"), but it needs fewer random bits:
+// log2(32) * 8 = 5 * 8 = 40 for the split filter versus log2(256) * 8 = 64 for a standard one.
+// The AVX2 or the portable variant is selected at compile time from __AVX2__.
+void BlockBloomFilter::insert(const uint32_t hash) noexcept {
+#ifdef __AVX2__
+    insert_avx2(hash);
+#else
+    insert_no_avx2(hash);
+#endif
+}
+
+// Returns false immediately for a filter with no insertions; otherwise probes the bucket
+// chosen by rehashing 'hash', using the AVX2 kernel when compiled with __AVX2__.
+bool BlockBloomFilter::find(const uint32_t hash) const noexcept {
+    if (!_always_false) {
+        const uint32_t target_bucket = rehash32to32(hash) & _directory_mask;
+#ifdef __AVX2__
+        return bucket_find_avx2(target_bucket, hash);
+#else
+        return bucket_find(target_bucket, hash);
+#endif
+    }
+    return false;
+}
+
+// Dispatches out[i] |= in[i] over 'n' bytes to the AVX2 kernel when the build targets AVX2 and
+// to the SSE2 kernel otherwise. 'n' must already be a multiple of kBucketByteSize; the callers
+// (or_equal_array() validates it, merge() passes a directory size) guarantee that.
+void BlockBloomFilter::or_equal_array_internal(size_t n, const uint8_t* __restrict__ in,
+                                               uint8_t* __restrict__ out) {
+#ifdef __AVX2__
+    BlockBloomFilter::or_equal_array_avx2(n, in, out);
+#else
+    // Call the SSE2 kernel directly: or_equal_array() calls back into this function, so
+    // delegating to it here would recurse without bound and overflow the stack.
+    BlockBloomFilter::or_equal_array_no_avx2(n, in, out);
+#endif
+}
+
+// Public entry point for out[i] |= in[i] over 'n' bytes. Sizes that are not a whole number of
+// 32-byte buckets are rejected before any SIMD kernel runs.
+Status BlockBloomFilter::or_equal_array(size_t n, const uint8_t* __restrict__ in,
+                                        uint8_t* __restrict__ out) {
+    const bool whole_buckets = (n % kBucketByteSize) == 0;
+    if (!whole_buckets) {
+        return Status::InvalidArgument(fmt::format("Input size {} not a multiple of 32-bytes", n));
+    }
+    or_equal_array_internal(n, in, out);
+    return Status::OK();
+}
+
+// SSE2 kernel for out[i] |= in[i]. 'n' must be a multiple of 32 (one bucket), so every
+// iteration ORs one bucket as two 16-byte halves with unaligned loads and stores. The
+// vectorization is written by hand instead of relying on the compiler to vectorize a byte loop.
+void BlockBloomFilter::or_equal_array_no_avx2(size_t n, const uint8_t* __restrict__ in,
+                                              uint8_t* __restrict__ out) {
+    constexpr size_t kLane = sizeof(__m128i);
+    for (size_t offset = 0; offset != n; offset += 2 * kLane) {
+        for (size_t half = 0; half < 2; ++half) {
+            const size_t pos = offset + half * kLane;
+            __m128i* const dst = reinterpret_cast<__m128i*>(out + pos);
+            const __m128i src = _mm_loadu_si128(reinterpret_cast<const __m128i*>(in + pos));
+            _mm_storeu_si128(dst, _mm_or_si128(_mm_loadu_si128(dst), src));
+        }
+    }
+}
+
+// ORs 'other' into this filter. Both directories must have the same size (InvalidArgument
+// otherwise); merging a filter with itself or with an always-false filter changes nothing.
+Status BlockBloomFilter::merge(const BlockBloomFilter& other) {
+    // kAlwaysTrueFilter is represented by nullptr, so a reference to it could only come from
+    // dereferencing a null pointer (undefined behavior), and OR-ing with an always-true filter
+    // would mean discarding this filter entirely. The CHECK is a defensive guard against that
+    // misuse (the comparison can trigger -Wnonnull-compare, since a reference's address is
+    // assumed non-null).
+    CHECK_NE(kAlwaysTrueFilter, &other);
+
+    if (this == &other) {
+        return Status::OK(); // Merging a filter into itself is a no-op.
+    }
+    const int64_t my_size = directory_size();
+    const int64_t other_size = other.directory_size();
+    if (my_size != other_size) {
+        return Status::InvalidArgument(fmt::format(
+                "Directory size don't match. this: {}, other: {}", my_size, other_size));
+    }
+    if (!other.always_false()) {
+        or_equal_array_internal(my_size, reinterpret_cast<const uint8*>(other._directory),
+                                reinterpret_cast<uint8*>(_directory));
+        _always_false = false;
+    }
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp
index 9f98c1d..cd20046 100644
--- a/be/src/exprs/bloomfilter_predicate.cpp
+++ b/be/src/exprs/bloomfilter_predicate.cpp
@@ -20,53 +20,42 @@
#include <sstream>
#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.hpp"
namespace doris {
-BloomFilterFuncBase* BloomFilterFuncBase::create_bloom_filter(MemTracker* tracker,
- PrimitiveType type) {
+IBloomFilterFuncBase* IBloomFilterFuncBase::create_bloom_filter(MemTracker* tracker,
+ PrimitiveType type) {
switch (type) {
case TYPE_BOOLEAN:
- return new (std::nothrow) BloomFilterFunc<bool>(tracker);
-
+ return new BloomFilterFunc<TYPE_BOOLEAN, CurrentBloomFilterAdaptor>(tracker);
case TYPE_TINYINT:
- return new (std::nothrow) BloomFilterFunc<int8_t>(tracker);
-
+ return new BloomFilterFunc<TYPE_TINYINT, CurrentBloomFilterAdaptor>(tracker);
case TYPE_SMALLINT:
- return new (std::nothrow) BloomFilterFunc<int16_t>(tracker);
-
+ return new BloomFilterFunc<TYPE_SMALLINT, CurrentBloomFilterAdaptor>(tracker);
case TYPE_INT:
- return new (std::nothrow) BloomFilterFunc<int32_t>(tracker);
-
+ return new BloomFilterFunc<TYPE_INT, CurrentBloomFilterAdaptor>(tracker);
case TYPE_BIGINT:
- return new (std::nothrow) BloomFilterFunc<int64_t>(tracker);
-
+ return new BloomFilterFunc<TYPE_BIGINT, CurrentBloomFilterAdaptor>(tracker);
case TYPE_FLOAT:
- return new (std::nothrow) BloomFilterFunc<float>(tracker);
-
+ return new BloomFilterFunc<TYPE_FLOAT, CurrentBloomFilterAdaptor>(tracker);
case TYPE_DOUBLE:
- return new (std::nothrow) BloomFilterFunc<double>(tracker);
-
+ return new BloomFilterFunc<TYPE_DOUBLE, CurrentBloomFilterAdaptor>(tracker);
case TYPE_DATE:
- return new (std::nothrow) DateBloomFilterFunc(tracker);
-
+ return new BloomFilterFunc<TYPE_DATE, CurrentBloomFilterAdaptor>(tracker);
case TYPE_DATETIME:
- return new (std::nothrow) DateTimeBloomFilterFunc(tracker);
-
+ return new BloomFilterFunc<TYPE_DATETIME, CurrentBloomFilterAdaptor>(tracker);
case TYPE_DECIMALV2:
- return new (std::nothrow) DecimalV2FilterFunc(tracker);
-
+ return new BloomFilterFunc<TYPE_DECIMALV2, CurrentBloomFilterAdaptor>(tracker);
case TYPE_LARGEINT:
- return new (std::nothrow) BloomFilterFunc<__int128>(tracker);
-
+ return new BloomFilterFunc<TYPE_LARGEINT, CurrentBloomFilterAdaptor>(tracker);
case TYPE_CHAR:
- return new (std::nothrow) FixedCharBloomFilterFunc(tracker);
+ return new BloomFilterFunc<TYPE_CHAR, CurrentBloomFilterAdaptor>(tracker);
case TYPE_VARCHAR:
- return new (std::nothrow) BloomFilterFunc<StringValue>(tracker);
-
+ return new BloomFilterFunc<TYPE_VARCHAR, CurrentBloomFilterAdaptor>(tracker);
default:
return nullptr;
}
@@ -74,12 +63,6 @@ BloomFilterFuncBase* BloomFilterFuncBase::create_bloom_filter(MemTracker* tracke
return nullptr;
}
-Status BloomFilterFuncBase::get_data(char** data, int* len) {
- *data = _bloom_filter->data();
- *len = _bloom_filter->size();
- return Status::OK();
-}
-
BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node)
: Predicate(node),
_is_prepare(false),
@@ -99,7 +82,7 @@ BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other)
_filtered_rows(),
_scan_rows() {}
-Status BloomFilterPredicate::prepare(RuntimeState* state, BloomFilterFuncBase* filter) {
+Status BloomFilterPredicate::prepare(RuntimeState* state, IBloomFilterFuncBase* filter) {
// DCHECK(filter != nullptr);
if (_is_prepare) {
return Status::OK();
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index 21c7176..31f315b 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -18,21 +18,84 @@
#ifndef DORIS_BE_SRC_QUERY_EXPRS_BLOOM_PREDICATE_H
#define DORIS_BE_SRC_QUERY_EXPRS_BLOOM_PREDICATE_H
#include <algorithm>
+#include <cmath>
#include <memory>
#include <string>
#include "common/object_pool.h"
-#include "exprs/expr_context.h"
+#include "exprs/block_bloom_filter.hpp"
#include "exprs/predicate.h"
+#include "olap/bloom_filter.hpp"
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
namespace doris {
-/// only used in Runtime Filter
-class BloomFilterFuncBase {
+namespace detail {
+class BlockBloomFilterAdaptor { // thin wrapper exposing doris::BlockBloomFilter through the adaptor interface
public:
- BloomFilterFuncBase(MemTracker* tracker) : _tracker(tracker), _inited(false) {};
+ BlockBloomFilterAdaptor() { _bloom_filter = std::make_shared<doris::BlockBloomFilter>(); }
+ static int64_t optimal_bit_num(int64_t expect_num, double fpp) { // NOTE: despite the name, returns a size in BYTES (bits / 8)
+ return doris::segment_v2::BloomFilter::optimal_bit_num(expect_num, fpp) / 8;
+ }
+
+ static BlockBloomFilterAdaptor* create() { return new BlockBloomFilterAdaptor(); } // caller takes ownership of the raw pointer
+
+ Status merge(BlockBloomFilterAdaptor* other) { // presumably both directories must have the same size — TODO confirm
+ return _bloom_filter->merge(*other->_bloom_filter);
+ }
+
+ Status init(int len) { // len: filter size in bytes; assumed a power of two, since log2() is truncated to int
+ int log_space = log2(len);
+ return _bloom_filter->init(log_space, /*hash_seed*/ 0);
+ }
+
+ Status init(const char* data, int len) { // rebuild from an existing directory of len bytes (same power-of-two assumption)
+ int log_space = log2(len);
+ return _bloom_filter->init_from_directory(log_space, Slice(data, len), false, 0);
+ }
+
+ char* data() { return (char*)_bloom_filter->directory().data; } // raw directory bytes, not a copy
+
+ size_t size() { return _bloom_filter->directory().size; } // directory size in bytes
+
+ bool test_bytes(const char* data, size_t len) const {
+ return _bloom_filter->find(Slice(data, len));
+ }
+
+ void add_bytes(const char* data, size_t len) { _bloom_filter->insert(Slice(data, len)); }
+
+private:
+ std::shared_ptr<doris::BlockBloomFilter> _bloom_filter; // shared so light_copy()-ed funcs can reuse one filter
+};
+
+} // namespace detail
+using CurrentBloomFilterAdaptor = detail::BlockBloomFilterAdaptor;
+// Type-erased bloom filter function interface; only used by RuntimeFilter.
+class IBloomFilterFuncBase {
+public:
+ virtual ~IBloomFilterFuncBase() {}
+ virtual Status init(int64_t expect_num, double fpp) = 0; // size derived from expected element count and false-positive rate
+ virtual Status init_with_fixed_length(int64_t bloom_filter_length) = 0; // BloomFilterFuncBase treats the length as bytes
+
+ virtual void insert(const void* data) = 0;
+ virtual bool find(const void* data) const = 0; // probe with an execution-layer value
+ virtual bool find_olap_engine(const void* data) const = 0; // probe with a storage-layer (olap) cell value
+
+ virtual Status merge(IBloomFilterFuncBase* bloomfilter_func) = 0;
+ virtual Status assign(const char* data, int len) = 0; // rebuild from serialized filter bytes
+
+ virtual Status get_data(char** data, int* len) = 0;
+ virtual MemTracker* tracker() = 0;
+ virtual void light_copy(IBloomFilterFuncBase* other) = 0; // share other's filter storage instead of copying it
+
+ static IBloomFilterFuncBase* create_bloom_filter(MemTracker* tracker, PrimitiveType type);
+};
+
+template <class BloomFilterAdaptor>
+class BloomFilterFuncBase : public IBloomFilterFuncBase {
+public:
+ BloomFilterFuncBase(MemTracker* tracker) : _tracker(tracker), _bloom_filter_alloced(0), _inited(false) {} // size stays 0 until init_with_fixed_length()/assign()
virtual ~BloomFilterFuncBase() {
if (_tracker != nullptr) {
@@ -40,174 +103,122 @@ public:
}
}
- // init a bloom filter with expect element num
- virtual Status init(int64_t expect_num = 4096, double fpp = 0.05) {
- DCHECK(!_inited);
- DCHECK(expect_num >= 0); // we need alloc 'optimal_bit_num(expect_num,fpp) / 8' bytes
- _bloom_filter_alloced =
- doris::segment_v2::BloomFilter::optimal_bit_num(expect_num, fpp) / 8;
-
- std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
- Status st = doris::segment_v2::BloomFilter::create(
- doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter);
- // status is always true if we use valid BloomFilterAlgorithmPB
- DCHECK(st.ok());
- RETURN_IF_ERROR(st);
- st = bloom_filter->init(_bloom_filter_alloced,
- doris::segment_v2::HashStrategyPB::HASH_MURMUR3_X64_64);
- // status is always true if we use HASH_MURMUR3_X64_64
- DCHECK(st.ok());
- _bloom_filter.reset(bloom_filter.release());
- _tracker->Consume(_bloom_filter_alloced);
- _inited = true;
- return st;
+ Status init(int64_t expect_num, double fpp) override {
+ size_t filter_size = BloomFilterAdaptor::optimal_bit_num(expect_num, fpp);
+ return init_with_fixed_length(filter_size);
}
- virtual Status init_with_fixed_length(int64_t bloom_filter_length) {
+ Status init_with_fixed_length(int64_t bloom_filter_length) override {
DCHECK(!_inited);
DCHECK(bloom_filter_length >= 0);
-
- std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
+ DCHECK_EQ((bloom_filter_length & (bloom_filter_length - 1)), 0);
_bloom_filter_alloced = bloom_filter_length;
- Status st = doris::segment_v2::BloomFilter::create(
- doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter);
- DCHECK(st.ok());
- st = bloom_filter->init(_bloom_filter_alloced,
- doris::segment_v2::HashStrategyPB::HASH_MURMUR3_X64_64);
- DCHECK(st.ok());
+ _bloom_filter.reset(BloomFilterAdaptor::create());
+ RETURN_IF_ERROR(_bloom_filter->init(bloom_filter_length));
_tracker->Consume(_bloom_filter_alloced);
- _bloom_filter.reset(bloom_filter.release());
_inited = true;
- return st;
+ return Status::OK();
}
- virtual void insert(const void* data) = 0;
-
- virtual bool find(const void* data) = 0;
-
- // Because the data structures of the execution layer and the storage layer are inconsistent,
- // we need to provide additional interfaces for the storage layer to call
- virtual bool find_olap_engine(const void* data) { return this->find(data); }
-
- Status merge(BloomFilterFuncBase* bloomfilter_func) {
- DCHECK(_inited);
- if (_bloom_filter == nullptr) {
- std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
- RETURN_IF_ERROR(doris::segment_v2::BloomFilter::create(
- doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter));
- _bloom_filter.reset(bloom_filter.release());
+ Status merge(IBloomFilterFuncBase* bloomfilter_func) override {
+ auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); // create_bloom_filter always uses CurrentBloomFilterAdaptor
+ if (_bloom_filter == nullptr) { // this side was never init()/assign()ed: lazily create its adaptor (not the argument's)
+ _bloom_filter.reset(BloomFilterAdaptor::create());
}
- if (_bloom_filter_alloced != bloomfilter_func->_bloom_filter_alloced) {
+ if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
LOG(WARNING) << "bloom filter size not the same";
return Status::InvalidArgument("bloom filter size invalid");
}
- return _bloom_filter->merge(bloomfilter_func->_bloom_filter.get());
+ return _bloom_filter->merge(other_func->_bloom_filter.get());
}
- Status assign(const char* data, int len) {
+ Status assign(const char* data, int len) override {
if (_bloom_filter == nullptr) {
- std::unique_ptr<doris::segment_v2::BloomFilter> bloom_filter;
- RETURN_IF_ERROR(doris::segment_v2::BloomFilter::create(
- doris::segment_v2::BloomFilterAlgorithmPB::BLOCK_BLOOM_FILTER, &bloom_filter));
- _bloom_filter.reset(bloom_filter.release());
+ _bloom_filter.reset(BloomFilterAdaptor::create());
}
- _bloom_filter_alloced = len - 1;
+
+ _bloom_filter_alloced = len;
_tracker->Consume(_bloom_filter_alloced);
- return _bloom_filter->init(data, len,
- doris::segment_v2::HashStrategyPB::HASH_MURMUR3_X64_64);
+ return _bloom_filter->init(data, len);
}
- /// create a bloom filter function
- /// tracker shouldn't be nullptr
- static BloomFilterFuncBase* create_bloom_filter(MemTracker* tracker, PrimitiveType type);
- Status get_data(char** data, int* len);
+ Status get_data(char** data, int* len) override {
+ *data = _bloom_filter->data();
+ *len = _bloom_filter->size();
+ return Status::OK();
+ }
- MemTracker* tracker() { return _tracker; }
+ MemTracker* tracker() override { return _tracker; }
- void light_copy(BloomFilterFuncBase* other) {
+ void light_copy(IBloomFilterFuncBase* bloomfilter_func) override {
+ auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
_tracker = nullptr;
- _bloom_filter_alloced = other->_bloom_filter_alloced;
- _bloom_filter = other->_bloom_filter;
- _inited = other->_inited;
+ _bloom_filter_alloced = other_func->_bloom_filter_alloced;
+ _bloom_filter = other_func->_bloom_filter;
+ _inited = other_func->_inited;
}
protected:
MemTracker* _tracker;
// bloom filter size
int32_t _bloom_filter_alloced;
- std::shared_ptr<doris::segment_v2::BloomFilter> _bloom_filter;
+ std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
bool _inited;
};
-template <class T>
-class BloomFilterFunc : public BloomFilterFuncBase {
-public:
- BloomFilterFunc(MemTracker* tracker) : BloomFilterFuncBase(tracker) {}
-
- ~BloomFilterFunc() = default;
-
- virtual void insert(const void* data) {
- DCHECK(_bloom_filter != nullptr);
- _bloom_filter->add_bytes((char*)data, sizeof(T));
+template <class T, class BloomFilterAdaptor>
+struct CommonFindOp {
+ ALWAYS_INLINE void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
+ bloom_filter.add_bytes((char*)data, sizeof(T));
}
-
- virtual bool find(const void* data) {
- DCHECK(_bloom_filter != nullptr);
- return _bloom_filter->test_bytes((char*)data, sizeof(T));
+ ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
+ return bloom_filter.test_bytes((char*)data, sizeof(T));
+ }
+ ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
+ const void* data) const {
+ return this->find(bloom_filter, data);
}
};
-template <>
-class BloomFilterFunc<StringValue> : public BloomFilterFuncBase {
-public:
- BloomFilterFunc(MemTracker* tracker) : BloomFilterFuncBase(tracker) {}
-
- ~BloomFilterFunc() = default;
-
- virtual void insert(const void* data) {
- DCHECK(_bloom_filter != nullptr);
+template <class BloomFilterAdaptor>
+struct StringFindOp {
+ ALWAYS_INLINE void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
const auto* value = reinterpret_cast<const StringValue*>(data);
- _bloom_filter->add_bytes(value->ptr, value->len);
+ bloom_filter.add_bytes(value->ptr, value->len);
}
-
- virtual bool find(const void* data) {
- DCHECK(_bloom_filter != nullptr);
+ ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
const auto* value = reinterpret_cast<const StringValue*>(data);
- return _bloom_filter->test_bytes(value->ptr, value->len);
+ return bloom_filter.test_bytes(value->ptr, value->len);
+ }
+ ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
+ const void* data) const {
+ return StringFindOp::find(bloom_filter, data);
}
};
-class FixedCharBloomFilterFunc : public BloomFilterFunc<StringValue> {
-public:
- FixedCharBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<StringValue>(tracker) {}
-
- ~FixedCharBloomFilterFunc() = default;
-
- virtual bool find(const void* data) {
- DCHECK(_bloom_filter != nullptr);
+template <class BloomFilterAdaptor>
+struct FixedStringFindOp : public StringFindOp<BloomFilterAdaptor> {
+ ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
+ const void* data) const {
- const auto* value = reinterpret_cast<const StringValue*>(data);
- auto end_ptr = value->ptr + value->len - 1;
- while (end_ptr > value->ptr && *end_ptr == '\0') --end_ptr;
- return _bloom_filter->test_bytes(value->ptr, end_ptr - value->ptr + 1);
+ const auto* value = reinterpret_cast<const StringValue*>(data); // storage-layer CHAR cell
+ auto size = value->len; // strip trailing '\0' padding; an all-'\0' (or empty) cell trims to ""
+ while (size > 0 && value->ptr[size - 1] == '\0') --size;
+ return bloom_filter.test_bytes(value->ptr, size);
}
};
-class DateTimeBloomFilterFunc : public BloomFilterFunc<DateTimeValue> {
-public:
- DateTimeBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<DateTimeValue>(tracker) {}
-
- virtual bool find_olap_engine(const void* data) {
+template <class BloomFilterAdaptor>
+struct DateTimeFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> {
+ bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
DateTimeValue value;
value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data));
- return _bloom_filter->test_bytes((char*)&value, sizeof(DateTimeValue));
+ return bloom_filter.test_bytes((char*)&value, sizeof(DateTimeValue));
}
};
-class DateBloomFilterFunc : public BloomFilterFunc<DateTimeValue> {
-public:
- DateBloomFilterFunc(MemTracker* tracker) : BloomFilterFunc<DateTimeValue>(tracker) {}
-
- virtual bool find_olap_engine(const void* data) {
+template <class BloomFilterAdaptor>
+struct DateFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> {
+ bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
uint64_t value = 0;
value = *(unsigned char*)((char*)data + 2);
value <<= 8;
@@ -217,23 +228,77 @@ public:
DateTimeValue date_value;
date_value.from_olap_date(value);
date_value.to_datetime();
- return _bloom_filter->test_bytes((char*)&date_value, sizeof(DateTimeValue));
+ return bloom_filter.test_bytes((char*)&date_value, sizeof(DateTimeValue));
}
};
-class DecimalV2FilterFunc : public BloomFilterFunc<DecimalV2Value> {
-public:
- DecimalV2FilterFunc(MemTracker* tracker) : BloomFilterFunc<DecimalV2Value>(tracker) {}
-
- virtual bool find_olap_engine(const void* data) {
+template <class BloomFilterAdaptor>
+struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value, BloomFilterAdaptor> { // insert/find hash sizeof(DecimalV2Value) bytes
+ bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const { // decode olap (int, frac) layout first
DecimalV2Value value;
int64_t int_value = *(int64_t*)(data);
int32_t frac_value = *(int32_t*)((char*)data + sizeof(int64_t));
value.from_olap_decimal(int_value, frac_value);
- return _bloom_filter->test_bytes((char*)&value, sizeof(DecimalV2Value));
+ return bloom_filter.test_bytes((char*)&value, sizeof(DecimalV2Value));
}
};
+template <PrimitiveType type, class BloomFilterAdaptor>
+struct BloomFilterTypeTraits {
+ using T = typename PrimitiveTypeTraits<type>::CppType;
+ using FindOp = CommonFindOp<T, BloomFilterAdaptor>;
+};
+
+template <class BloomFilterAdaptor>
+struct BloomFilterTypeTraits<TYPE_DATE, BloomFilterAdaptor> {
+ using FindOp = DateFindOp<BloomFilterAdaptor>;
+};
+
+template <class BloomFilterAdaptor>
+struct BloomFilterTypeTraits<TYPE_DATETIME, BloomFilterAdaptor> {
+ using FindOp = DateTimeFindOp<BloomFilterAdaptor>;
+};
+
+template <class BloomFilterAdaptor>
+struct BloomFilterTypeTraits<TYPE_DECIMALV2, BloomFilterAdaptor> {
+ using FindOp = DecimalV2FindOp<BloomFilterAdaptor>;
+};
+
+template <class BloomFilterAdaptor>
+struct BloomFilterTypeTraits<TYPE_CHAR, BloomFilterAdaptor> {
+ using FindOp = FixedStringFindOp<BloomFilterAdaptor>;
+};
+
+template <class BloomFilterAdaptor>
+struct BloomFilterTypeTraits<TYPE_VARCHAR, BloomFilterAdaptor> {
+ using FindOp = StringFindOp<BloomFilterAdaptor>;
+};
+
+template <PrimitiveType type, class BloomFilterAdaptor>
+class BloomFilterFunc final : public BloomFilterFuncBase<BloomFilterAdaptor> {
+public:
+ explicit BloomFilterFunc(MemTracker* tracker) : BloomFilterFuncBase<BloomFilterAdaptor>(tracker) {}
+ ~BloomFilterFunc() override = default;
+
+ // Each call forwards to the FindOp selected at compile time by BloomFilterTypeTraits<type>.
+ void insert(const void* data) override {
+ DCHECK(this->_bloom_filter != nullptr);
+ _find_op.insert(*this->_bloom_filter, data);
+ }
+ bool find(const void* data) const override {
+ DCHECK(this->_bloom_filter != nullptr);
+ return _find_op.find(*this->_bloom_filter, data);
+ }
+ // Probes with a value in the storage-engine (olap) cell layout.
+ bool find_olap_engine(const void* data) const override {
+ DCHECK(this->_bloom_filter != nullptr);
+ return _find_op.find_olap_engine(*this->_bloom_filter, data);
+ }
+
+private:
+ typename BloomFilterTypeTraits<type, BloomFilterAdaptor>::FindOp _find_op;
+};
+
// BloomFilterPredicate only used in runtime filter
class BloomFilterPredicate : public Predicate {
public:
@@ -243,9 +308,9 @@ public:
virtual Expr* clone(ObjectPool* pool) const override {
return pool->add(new BloomFilterPredicate(*this));
}
- Status prepare(RuntimeState* state, BloomFilterFuncBase* bloomfilterfunc);
+ Status prepare(RuntimeState* state, IBloomFilterFuncBase* bloomfilterfunc);
- std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() { return _filter; }
+ std::shared_ptr<IBloomFilterFuncBase> get_bloom_filter_func() { return _filter; }
virtual BooleanVal get_boolean_val(ExprContext* context, TupleRow* row) override;
@@ -264,7 +329,7 @@ private:
std::atomic<int64_t> _filtered_rows;
std::atomic<int64_t> _scan_rows;
- std::shared_ptr<BloomFilterFuncBase> _filter;
+ std::shared_ptr<IBloomFilterFuncBase> _filter;
bool _has_calculate_filter = false;
// loop size must be power of 2
constexpr static int64_t _loop_size = 8192;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c067798..852601f 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -461,7 +461,7 @@ public:
}
case RuntimeFilterType::BLOOM_FILTER: {
_bloomfilter_func.reset(
- BloomFilterFuncBase::create_bloom_filter(_tracker, _column_return_type));
+ IBloomFilterFuncBase::create_bloom_filter(_tracker, _column_return_type));
return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size);
}
default:
@@ -588,7 +588,7 @@ public:
// we won't use this class to insert or find any data
// so any type is ok
_bloomfilter_func.reset(
- BloomFilterFuncBase::create_bloom_filter(_tracker, PrimitiveType::TYPE_INT));
+ IBloomFilterFuncBase::create_bloom_filter(_tracker, PrimitiveType::TYPE_INT));
return _bloomfilter_func->assign(data, bloom_filter->filter_length());
}
@@ -727,7 +727,7 @@ private:
RuntimeFilterType _filter_type;
std::unique_ptr<MinMaxFuncBase> _minmax_func;
std::unique_ptr<HybridSetBase> _hybrid_set;
- std::unique_ptr<BloomFilterFuncBase> _bloomfilter_func;
+ std::unique_ptr<IBloomFilterFuncBase> _bloomfilter_func;
};
Status IRuntimeFilter::create(RuntimeState* state, MemTracker* tracker, ObjectPool* pool,
diff --git a/be/src/olap/bloom_filter_predicate.cpp b/be/src/olap/bloom_filter_predicate.cpp
index 06e4947..1a86b5d 100644
--- a/be/src/olap/bloom_filter_predicate.cpp
+++ b/be/src/olap/bloom_filter_predicate.cpp
@@ -17,51 +17,47 @@
#include "olap/bloom_filter_predicate.h"
-#include "olap/field.h"
-#include "runtime/string_value.hpp"
-#include "runtime/vectorized_row_batch.h"
+#define APPLY_FOR_PRIMTYPE(M) \
+ M(TYPE_TINYINT) \
+ M(TYPE_SMALLINT) \
+ M(TYPE_INT) \
+ M(TYPE_BIGINT) \
+ M(TYPE_LARGEINT) \
+ M(TYPE_FLOAT) \
+ M(TYPE_DOUBLE) \
+ M(TYPE_CHAR) \
+ M(TYPE_DATE) \
+ M(TYPE_DATETIME) \
+ M(TYPE_VARCHAR)
namespace doris {
-
-BloomFilterColumnPredicate::BloomFilterColumnPredicate(
- uint32_t column_id, const std::shared_ptr<BloomFilterFuncBase>& filter)
- : ColumnPredicate(column_id), _filter(filter) {}
-
-// blomm filter column predicate do not support in segment v1
-void BloomFilterColumnPredicate::evaluate(VectorizedRowBatch* batch) const {
- uint16_t n = batch->size();
- uint16_t* sel = batch->selected();
- if (!batch->selected_in_use()) {
- for (uint16_t i = 0; i != n; ++i) {
- sel[i] = i;
- }
+ColumnPredicate* BloomFilterColumnPredicateFactory::create_column_predicate(
+ uint32_t column_id, const std::shared_ptr<IBloomFilterFuncBase>& bloom_filter,
+ FieldType type) {
+ std::shared_ptr<IBloomFilterFuncBase> filter;
+ switch (type) {
+#define M(NAME) \
+ case OLAP_FIELD_##NAME: { \
+ filter.reset(IBloomFilterFuncBase::create_bloom_filter(bloom_filter->tracker(), NAME)); \
+ filter->light_copy(bloom_filter.get()); \
+ return new BloomFilterColumnPredicate<NAME>(column_id, filter); \
}
-}
-
-void BloomFilterColumnPredicate::evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const {
- uint16_t new_size = 0;
- if (block->is_nullable()) {
- for (uint16_t i = 0; i < *size; ++i) {
- uint16_t idx = sel[i];
- sel[new_size] = idx;
- const auto* cell_value = reinterpret_cast<const void*>(block->cell(idx).cell_ptr());
- new_size += (!block->cell(idx).is_null() && _filter->find_olap_engine(cell_value));
- }
- } else {
- for (uint16_t i = 0; i < *size; ++i) {
- uint16_t idx = sel[i];
- sel[new_size] = idx;
- const auto* cell_value = reinterpret_cast<const void*>(block->cell(idx).cell_ptr());
- new_size += _filter->find_olap_engine(cell_value);
- }
+ APPLY_FOR_PRIMTYPE(M)
+#undef M
+ case OLAP_FIELD_TYPE_DECIMAL: {
+ filter.reset(
+ IBloomFilterFuncBase::create_bloom_filter(bloom_filter->tracker(), TYPE_DECIMALV2));
+ filter->light_copy(bloom_filter.get());
+ return new BloomFilterColumnPredicate<TYPE_DECIMALV2>(column_id, filter);
+ }
+ case OLAP_FIELD_TYPE_BOOL: {
+ filter.reset(
+ IBloomFilterFuncBase::create_bloom_filter(bloom_filter->tracker(), TYPE_BOOLEAN));
+ filter->light_copy(bloom_filter.get());
+ return new BloomFilterColumnPredicate<TYPE_BOOLEAN>(column_id, filter);
+ }
+ default:
+ return nullptr;
}
- *size = new_size;
-}
-
-Status BloomFilterColumnPredicate::evaluate(const Schema& schema,
- const std::vector<BitmapIndexIterator*>& iterators,
- uint32_t num_rows, Roaring* result) const {
- return Status::OK();
}
-
} //namespace doris
diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h
index 191bfae..d14956a 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -24,34 +24,86 @@
#include "exprs/bloomfilter_predicate.h"
#include "olap/column_predicate.h"
+#include "olap/field.h"
+#include "runtime/string_value.hpp"
+#include "runtime/vectorized_row_batch.h"
namespace doris {
class VectorizedRowBatch;
// only use in runtime filter and segment v2
+template <PrimitiveType type>
class BloomFilterColumnPredicate : public ColumnPredicate {
public:
+ using SpecificFilter = BloomFilterFunc<type, CurrentBloomFilterAdaptor>;
+
BloomFilterColumnPredicate(uint32_t column_id,
- const std::shared_ptr<BloomFilterFuncBase>& filter);
+ const std::shared_ptr<IBloomFilterFuncBase>& filter)
+ : ColumnPredicate(column_id),
+ _filter(filter),
+ _specific_filter(static_cast<SpecificFilter*>(_filter.get())) {}
~BloomFilterColumnPredicate() override = default;
void evaluate(VectorizedRowBatch* batch) const override;
void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const override;
- // Now BloomFilter not be a sub column predicate, so we not support OR and AND.
- // It should be supported in the future.
void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size,
bool* flags) const override {};
void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size,
bool* flags) const override {};
Status evaluate(const Schema& schema, const vector<BitmapIndexIterator*>& iterators,
- uint32_t num_rows, Roaring* roaring) const override;
+ uint32_t num_rows, Roaring* roaring) const override {
+ return Status::OK();
+ }
private:
- std::shared_ptr<BloomFilterFuncBase> _filter;
+ std::shared_ptr<IBloomFilterFuncBase> _filter;
+ SpecificFilter* _specific_filter; // owned by _filter
+};
+
+// Bloom filter column predicate is not supported in segment v1: this overload filters nothing.
+template <PrimitiveType type>
+void BloomFilterColumnPredicate<type>::evaluate(VectorizedRowBatch* batch) const {
+ uint16_t n = batch->size();
+ uint16_t* sel = batch->selected();
+ if (!batch->selected_in_use()) { // only materializes an identity selection; batch size is left unchanged
+ for (uint16_t i = 0; i != n; ++i) {
+ sel[i] = i;
+ }
+ }
+}
+
+template <PrimitiveType type>
+void BloomFilterColumnPredicate<type>::evaluate(ColumnBlock* block, uint16_t* sel,
+ uint16_t* size) const {
+ uint16_t new_size = 0;
+ if (block->is_nullable()) {
+ for (uint16_t i = 0; i < *size; ++i) {
+ uint16_t idx = sel[i];
+ sel[new_size] = idx;
+ const auto* cell_value = reinterpret_cast<const void*>(block->cell(idx).cell_ptr());
+ new_size +=
+ (!block->cell(idx).is_null() && _specific_filter->find_olap_engine(cell_value));
+ }
+ } else {
+ for (uint16_t i = 0; i < *size; ++i) {
+ uint16_t idx = sel[i];
+ sel[new_size] = idx;
+ const auto* cell_value = reinterpret_cast<const void*>(block->cell(idx).cell_ptr());
+ new_size += _specific_filter->find_olap_engine(cell_value);
+ }
+ }
+ *size = new_size;
+}
+
+class BloomFilterColumnPredicateFactory {
+public:
+ static ColumnPredicate* create_column_predicate(
+ uint32_t column_id, const std::shared_ptr<IBloomFilterFuncBase>& filter,
+ FieldType type);
};
} //namespace doris
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 23ce803..daf5c2a 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -717,32 +717,14 @@ COMPARISON_PREDICATE_CONDITION_VALUE(gt, GreaterPredicate)
COMPARISON_PREDICATE_CONDITION_VALUE(ge, GreaterEqualPredicate)
ColumnPredicate* Reader::_parse_to_predicate(
- const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter) {
+ const std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter) {
int32_t index = _tablet->field_index(bloom_filter.first);
if (index < 0) {
return nullptr;
}
const TabletColumn& column = _tablet->tablet_schema().column(index);
- // Because FE regards CHAR as VARCHAR and Date as Datetime during query planning,
- // but direct use of filter will result in incorrect results due to inconsistent data structures.
- // We need to convert to the data structure corresponding to the storage engine.
- std::shared_ptr<BloomFilterFuncBase> filter;
- switch (column.type()) {
- case OLAP_FIELD_TYPE_CHAR: {
- filter.reset(BloomFilterFuncBase::create_bloom_filter(bloom_filter.second->tracker(),
- TYPE_CHAR));
- filter->light_copy(bloom_filter.second.get());
- return new BloomFilterColumnPredicate(index, filter);
- }
- case OLAP_FIELD_TYPE_DATE: {
- filter.reset(BloomFilterFuncBase::create_bloom_filter(bloom_filter.second->tracker(),
- TYPE_DATE));
- filter->light_copy(bloom_filter.second.get());
- return new BloomFilterColumnPredicate(index, filter);
- }
- default:
- return new BloomFilterColumnPredicate(index, bloom_filter.second);
- }
+ return BloomFilterColumnPredicateFactory::create_column_predicate(index, bloom_filter.second,
+ column.type());
}
ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool opposite) const {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 72d5588..2d62aab 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -70,7 +70,7 @@ struct ReaderParams {
std::vector<OlapTuple> end_key;
std::vector<TCondition> conditions;
- std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters;
+ std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>> bloom_filters;
// The ColumnData will be set when using Merger, eg Cumulative, BE.
std::vector<RowsetReaderSharedPtr> rs_readers;
@@ -154,7 +154,7 @@ private:
ColumnPredicate* _parse_to_predicate(const TCondition& condition, bool opposite = false) const;
ColumnPredicate* _parse_to_predicate(
- const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter);
+ const std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>& bloom_filter);
OLAPStatus _init_delete_condition(const ReaderParams& read_params);
diff --git a/be/src/olap/rowset/segment_v2/block_split_bloom_filter.cpp b/be/src/olap/rowset/segment_v2/block_split_bloom_filter.cpp
index 1fe4d64..da842d4 100644
--- a/be/src/olap/rowset/segment_v2/block_split_bloom_filter.cpp
+++ b/be/src/olap/rowset/segment_v2/block_split_bloom_filter.cpp
@@ -22,38 +22,38 @@
namespace doris {
namespace segment_v2 {
-const uint32_t BlockSplitBloomFilter::SALT[8] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
- 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
-
void BlockSplitBloomFilter::add_hash(uint64_t hash) {
// most significant 32 bit mod block size as block index(BTW:block size is
// power of 2)
DCHECK(_num_bytes >= BYTES_PER_BLOCK);
- uint32_t block_size = _num_bytes / BYTES_PER_BLOCK;
- uint32_t block_index = (uint32_t)(hash >> 32) & (block_size - 1);
- uint32_t key = (uint32_t)hash;
+ const uint32_t bucket_index =
+ static_cast<uint32_t>(hash >> 32) & (_num_bytes / BYTES_PER_BLOCK - 1);
+ uint32_t key = static_cast<uint32_t>(hash);
+ uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);
- // Calculate masks for bucket.
- uint32_t masks[8];
- _set_masks(key, masks);
- uint32_t* block_offset = (uint32_t*)(_data + BYTES_PER_BLOCK * block_index);
- for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
- *(block_offset + i) |= masks[i];
+ // Calculate mask for bucket.
+ BlockMask block_mask;
+ _set_masks(key, block_mask);
+
+ for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+ bitset32[bucket_index * BITS_SET_PER_BLOCK + i] |= block_mask.item[i];
}
}
bool BlockSplitBloomFilter::test_hash(uint64_t hash) const {
// most significant 32 bit mod block size as block index(BTW:block size is
// power of 2)
- uint32_t block_size = _num_bytes / BYTES_PER_BLOCK;
- uint32_t block_index = (uint32_t)(hash >> 32) & (block_size - 1);
- uint32_t key = (uint32_t)hash;
+ const uint32_t bucket_index =
+ static_cast<uint32_t>((hash >> 32) & (_num_bytes / BYTES_PER_BLOCK - 1));
+ uint32_t key = static_cast<uint32_t>(hash);
+ uint32_t* bitset32 = reinterpret_cast<uint32_t*>(_data);
+
// Calculate masks for bucket.
- uint32_t masks[BITS_SET_PER_BLOCK];
- _set_masks(key, masks);
- uint32_t* block_offset = (uint32_t*)(_data + BYTES_PER_BLOCK * block_index);
+ BlockMask block_mask;
+ _set_masks(key, block_mask);
+
for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
- if ((*(block_offset + i) & masks[i]) == 0) {
+ if (0 == (bitset32[BITS_SET_PER_BLOCK * bucket_index + i] & block_mask.item[i])) {
return false;
}
}
diff --git a/be/src/olap/rowset/segment_v2/block_split_bloom_filter.h b/be/src/olap/rowset/segment_v2/block_split_bloom_filter.h
index 8cc1db3..9d082f7 100644
--- a/be/src/olap/rowset/segment_v2/block_split_bloom_filter.h
+++ b/be/src/olap/rowset/segment_v2/block_split_bloom_filter.h
@@ -34,25 +34,33 @@ public:
bool test_hash(uint64_t hash) const override;
private:
- void _set_masks(uint32_t key, uint32_t* masks) const {
- for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
- // add some salt to key
- masks[i] = key * SALT[i];
- // masks[i] mod 32
- masks[i] = masks[i] >> 27;
- // set the masks[i]-th bit
- masks[i] = 0x1 << masks[i];
- }
- }
+ // Bytes in a tiny Bloom filter block.
+ static constexpr int BYTES_PER_BLOCK = 32;
+ // The number of bits to set in a tiny Bloom filter block
+ static constexpr int BITS_SET_PER_BLOCK = 8;
+
+ static constexpr uint32_t SALT[BITS_SET_PER_BLOCK] = {0x47b6137bU, 0x44974d91U, 0x8824ad5bU,
+ 0xa2b7289dU, 0x705495c7U, 0x2df1424bU,
+ 0x9efc4947U, 0x5c6bfb31U};
+
+ struct BlockMask {
+ uint32_t item[BITS_SET_PER_BLOCK];
+ };
private:
- // Bytes in a tiny Bloom filter block.
- static const uint32_t BYTES_PER_BLOCK = 32;
+ void _set_masks(uint32_t key, BlockMask& block_mask) const {
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ block_mask.item[i] = key * SALT[i];
+ }
- // The number of bits to set in a tiny Bloom filter block
- static const int BITS_SET_PER_BLOCK = 8;
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ block_mask.item[i] = block_mask.item[i] >> 27;
+ }
- static const uint32_t SALT[BITS_SET_PER_BLOCK];
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ block_mask.item[i] = uint32_t(0x1) << block_mask.item[i];
+ }
+ }
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter.h b/be/src/olap/rowset/segment_v2/bloom_filter.h
index 33726aa..4c015f0 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter.h
+++ b/be/src/olap/rowset/segment_v2/bloom_filter.h
@@ -102,13 +102,13 @@ public:
void reset() { memset(_data, 0, _size); }
- uint64_t hash(char* buf, uint32_t size) const {
+ uint64_t hash(const char* buf, uint32_t size) const {
uint64_t hash_code;
_hash_func(buf, size, DEFAULT_SEED, &hash_code);
return hash_code;
}
- void add_bytes(char* buf, uint32_t size) {
+ void add_bytes(const char* buf, uint32_t size) {
if (buf == nullptr) {
*_has_null = true;
return;
diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h
index a7c11fa..19b8eaf 100644
--- a/be/src/runtime/primitive_type.h
+++ b/be/src/runtime/primitive_type.h
@@ -243,6 +243,62 @@ std::string type_to_odbc_string(PrimitiveType t);
TTypeDesc gen_type_desc(const TPrimitiveType::type val);
TTypeDesc gen_type_desc(const TPrimitiveType::type val, const std::string& name);
+template <PrimitiveType type>
+struct PrimitiveTypeTraits {};
+
+template <>
+struct PrimitiveTypeTraits<TYPE_BOOLEAN> {
+ using CppType = bool;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_TINYINT> {
+ using CppType = int8_t;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_SMALLINT> {
+ using CppType = int16_t;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_INT> {
+ using CppType = int32_t;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_BIGINT> {
+ using CppType = int64_t;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_FLOAT> {
+ using CppType = float;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_DOUBLE> {
+ using CppType = double;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_DATE> {
+ using CppType = DateTimeValue;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_DATETIME> {
+ using CppType = DateTimeValue;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_DECIMALV2> {
+ using CppType = DecimalV2Value;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_LARGEINT> {
+ using CppType = __int128_t;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_CHAR> {
+ using CppType = StringValue;
+};
+template <>
+struct PrimitiveTypeTraits<TYPE_VARCHAR> {
+ using CppType = StringValue;
+};
+
} // namespace doris
#endif
diff --git a/be/test/exprs/bloom_filter_predicate_test.cpp b/be/test/exprs/bloom_filter_predicate_test.cpp
index ea54a90..8b0f1ab 100644
--- a/be/test/exprs/bloom_filter_predicate_test.cpp
+++ b/be/test/exprs/bloom_filter_predicate_test.cpp
@@ -31,9 +31,9 @@ public:
TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) {
auto tracker = MemTracker::CreateTracker();
- std::unique_ptr<BloomFilterFuncBase> func(
- BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_INT));
- ASSERT_TRUE(func->init().ok());
+ std::unique_ptr<IBloomFilterFuncBase> func(
+ IBloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_INT));
+ ASSERT_TRUE(func->init(1024, 0.05).ok());
const int data_size = 1024;
int data[data_size];
for (int i = 0; i < data_size; i++) {
@@ -50,9 +50,9 @@ TEST_F(BloomFilterPredicateTest, bloom_filter_func_int_test) {
TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) {
auto tracker = MemTracker::CreateTracker();
- std::unique_ptr<BloomFilterFuncBase> func(
- BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_VARCHAR));
- ASSERT_TRUE(func->init().ok());
+ std::unique_ptr<IBloomFilterFuncBase> func(
+ IBloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_VARCHAR));
+ ASSERT_TRUE(func->init(1024, 0.05).ok());
ObjectPool obj_pool;
const int data_size = 1024;
StringValue data[data_size];
@@ -70,8 +70,8 @@ TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) {
ASSERT_FALSE(func->find((const void*)&not_exist_val));
// test fixed char
- func.reset(BloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_CHAR));
- ASSERT_TRUE(func->init().ok());
+ func.reset(IBloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_CHAR));
+ ASSERT_TRUE(func->init(1024, 0.05).ok());
auto varchar_true_str = obj_pool.add(new std::string("true"));
StringValue varchar_true(*varchar_true_str);
@@ -97,9 +97,21 @@ TEST_F(BloomFilterPredicateTest, bloom_filter_func_stringval_test) {
ASSERT_TRUE(func->find_olap_engine((const void*)&fixed_char_false));
}
+TEST_F(BloomFilterPredicateTest, bloom_filter_size_test) {
+ auto tracker = MemTracker::CreateTracker();
+ std::unique_ptr<IBloomFilterFuncBase> func(
+ IBloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_VARCHAR));
+ int length = 4096;
+ func->init_with_fixed_length(4096);
+ char* data = nullptr;
+ int len;
+ func->get_data(&data, &len);
+ ASSERT_EQ(length, len);
+}
+
} // namespace doris
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
-}
\ No newline at end of file
+}
diff --git a/be/test/olap/bloom_filter_column_predicate_test.cpp b/be/test/olap/bloom_filter_column_predicate_test.cpp
index feafafe..a62db54 100644
--- a/be/test/olap/bloom_filter_column_predicate_test.cpp
+++ b/be/test/olap/bloom_filter_column_predicate_test.cpp
@@ -90,16 +90,18 @@ TEST_F(TestBloomFilterColumnPredicate, FLOAT_COLUMN) {
}
auto tracker = MemTracker::CreateTracker(-1, "OlapScanner");
- std::shared_ptr<BloomFilterFuncBase> bloom_filter =
- std::make_shared<BloomFilterFunc<float>>(_mem_tracker.get());
- bloom_filter->init();
+ std::shared_ptr<IBloomFilterFuncBase> bloom_filter(
+ IBloomFilterFuncBase::create_bloom_filter(tracker.get(), PrimitiveType::TYPE_FLOAT));
+
+ bloom_filter->init(4096, 0.05);
float value = 4.1;
bloom_filter->insert(reinterpret_cast<void*>(&value));
value = 5.1;
bloom_filter->insert(reinterpret_cast<void*>(&value));
value = 6.1;
bloom_filter->insert(reinterpret_cast<void*>(&value));
- ColumnPredicate* pred = new BloomFilterColumnPredicate(0, bloom_filter);
+ ColumnPredicate* pred = BloomFilterColumnPredicateFactory::create_column_predicate(
+ 0, bloom_filter, OLAP_FIELD_TYPE_FLOAT);
// for VectorizedBatch no null
InitVectorizedBatch(&tablet_schema, return_columns, size);
diff --git a/build.sh b/build.sh
index b534887..16c248c 100755
--- a/build.sh
+++ b/build.sh
@@ -135,6 +135,9 @@ fi
if [[ -z ${GLIBC_COMPATIBILITY} ]]; then
GLIBC_COMPATIBILITY=ON
fi
+if [[ -z ${USE_AVX2} ]]; then
+ USE_AVX2=ON
+fi
if [[ -z ${WITH_LZO} ]]; then
WITH_LZO=OFF
fi
@@ -149,6 +152,7 @@ echo "Get params:
WITH_MYSQL -- $WITH_MYSQL
WITH_LZO -- $WITH_LZO
GLIBC_COMPATIBILITY -- $GLIBC_COMPATIBILITY
+ USE_AVX2 -- $USE_AVX2
"
# Clean and build generated code
@@ -180,6 +184,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
${CMAKE_USE_CCACHE} \
-DWITH_MYSQL=${WITH_MYSQL} \
-DWITH_LZO=${WITH_LZO} \
+ -DUSE_AVX2=${USE_AVX2} \
-DGLIBC_COMPATIBILITY=${GLIBC_COMPATIBILITY} ../
${BUILD_SYSTEM} -j ${PARALLEL}
${BUILD_SYSTEM} install
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org