You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/10/26 16:02:14 UTC
[doris] branch master updated: [Improvement](runtime filter) Reduce merging time for bloom filter (#13668)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0134e9d2f4 [Improvement](runtime filter) Reduce merging time for bloom filter (#13668)
0134e9d2f4 is described below
commit 0134e9d2f4010b2f540d53b593f34733160bb19d
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Oct 27 00:02:05 2022 +0800
[Improvement](runtime filter) Reduce merging time for bloom filter (#13668)
---
be/src/exprs/block_bloom_filter.hpp | 17 +++++++++++------
be/src/exprs/block_bloom_filter_avx_impl.cc | 1 +
be/src/exprs/block_bloom_filter_impl.cc | 27 +++++++++++++++++++--------
be/src/exprs/bloomfilter_predicate.h | 16 ++++++++++------
be/src/exprs/runtime_filter.cpp | 2 +-
be/src/exprs/runtime_filter.h | 14 ++++++++++++--
be/src/runtime/fragment_mgr.cpp | 8 +++++---
be/src/runtime/fragment_mgr.h | 10 ++++++++--
be/src/runtime/runtime_filter_mgr.cpp | 15 +++++----------
be/src/runtime/runtime_filter_mgr.h | 10 ++++++++--
be/src/service/internal_service.cpp | 11 +++++++----
11 files changed, 87 insertions(+), 44 deletions(-)
diff --git a/be/src/exprs/block_bloom_filter.hpp b/be/src/exprs/block_bloom_filter.hpp
index b75350b56e..2dd6e3cb4f 100644
--- a/be/src/exprs/block_bloom_filter.hpp
+++ b/be/src/exprs/block_bloom_filter.hpp
@@ -21,10 +21,14 @@
#pragma once
#include "common/status.h"
-#include "gutil/macros.h"
+#include "fmt/format.h"
#include "util/hash_util.hpp"
#include "util/slice.h"
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
namespace doris {
// https://github.com/apache/kudu/blob/master/src/kudu/util/block_bloom_filter.h
@@ -40,11 +44,14 @@ public:
explicit BlockBloomFilter();
~BlockBloomFilter();
+ BlockBloomFilter(const BlockBloomFilter&) = delete;
+ BlockBloomFilter& operator=(const BlockBloomFilter&) = delete;
+
Status init(int log_space_bytes, uint32_t hash_seed);
// Initialize the BlockBloomFilter from a populated "directory" structure.
// Useful for initializing the BlockBloomFilter by de-serializing a custom protobuf message.
- Status init_from_directory(int log_space_bytes, const Slice& directory, bool always_false,
- uint32_t hash_seed);
+ Status init_from_directory(int log_space_bytes, butil::IOBufAsZeroCopyInputStream* data,
+ const size_t data_size, bool always_false, uint32_t hash_seed);
void close();
@@ -176,7 +183,7 @@ private:
#endif
// Size of the internal directory structure in bytes.
- int64_t directory_size() const { return 1ULL << log_space_bytes(); }
+ size_t directory_size() const { return 1ULL << log_space_bytes(); }
// Some constants used in hashing. #defined for efficiency reasons.
#define BLOOM_HASH_CONSTANTS \
@@ -200,8 +207,6 @@ private:
// Rehash32to32(hash2) is minimal.
return (static_cast<uint64_t>(hash) * m + a) >> 32U;
}
-
- DISALLOW_COPY_AND_ASSIGN(BlockBloomFilter);
};
} // namespace doris
diff --git a/be/src/exprs/block_bloom_filter_avx_impl.cc b/be/src/exprs/block_bloom_filter_avx_impl.cc
index e005b4e9c6..b6512f9848 100644
--- a/be/src/exprs/block_bloom_filter_avx_impl.cc
+++ b/be/src/exprs/block_bloom_filter_avx_impl.cc
@@ -23,6 +23,7 @@
#include <immintrin.h>
#include "exprs/block_bloom_filter.hpp"
+#include "gutil/macros.h"
namespace doris {
static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i make_mark(
diff --git a/be/src/exprs/block_bloom_filter_impl.cc b/be/src/exprs/block_bloom_filter_impl.cc
index e553694c5a..b619b239a4 100644
--- a/be/src/exprs/block_bloom_filter_impl.cc
+++ b/be/src/exprs/block_bloom_filter_impl.cc
@@ -26,6 +26,8 @@
#include <mm_malloc.h>
#endif
+#include <butil/iobuf.h>
+
#include <algorithm>
#include <climits>
#include <cmath>
@@ -87,17 +89,26 @@ Status BlockBloomFilter::init(const int log_space_bytes, uint32_t hash_seed) {
return Status::OK();
}
-Status BlockBloomFilter::init_from_directory(int log_space_bytes, const Slice& directory,
- bool always_false, uint32_t hash_seed) {
+Status BlockBloomFilter::init_from_directory(int log_space_bytes,
+ butil::IOBufAsZeroCopyInputStream* data,
+ const size_t data_size, bool always_false,
+ uint32_t hash_seed) {
RETURN_IF_ERROR(init_internal(log_space_bytes, hash_seed));
DCHECK(_directory);
- if (directory_size() != directory.size) {
- return Status::InvalidArgument(
+ if (directory_size() != data_size) {
+ return Status::InvalidArgument(fmt::format(
"Mismatch in BlockBloomFilter source directory size {} and expected size {}",
- directory.size, directory_size());
+ data_size, directory_size()));
+ }
+ int size = 0;
+ char* tmp;
+ const void** ptr = (const void**)&tmp;
+ char* data_ptr = reinterpret_cast<char*>(_directory);
+ while (data->Next(ptr, &size)) {
+ memcpy(data_ptr, *ptr, size);
+ data_ptr += size;
}
- memcpy(_directory, directory.data, directory.size);
_always_false = always_false;
return Status::OK();
}
@@ -240,8 +251,8 @@ Status BlockBloomFilter::merge(const BlockBloomFilter& other) {
return Status::OK();
}
- or_equal_array_internal(directory_size(), reinterpret_cast<const uint8*>(other._directory),
- reinterpret_cast<uint8*>(_directory));
+ or_equal_array_internal(directory_size(), reinterpret_cast<const uint8_t*>(other._directory),
+ reinterpret_cast<uint8_t*>(_directory));
_always_false = false;
return Status::OK();
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index 5ce3ce47d5..545c40cc5b 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -33,6 +33,10 @@
#include "olap/uint24.h"
#include "util/hash_util.hpp"
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
namespace doris {
class BloomFilterAdaptor {
public:
@@ -50,9 +54,9 @@ public:
return _bloom_filter->init(log_space, /*hash_seed*/ 0);
}
- Status init(const char* data, int len) {
- int log_space = log2(len);
- return _bloom_filter->init_from_directory(log_space, Slice(data, len), false, 0);
+ Status init(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) {
+ int log_space = log2(data_size);
+ return _bloom_filter->init_from_directory(log_space, data, data_size, false, 0);
}
char* data() { return (char*)_bloom_filter->directory().data; }
@@ -161,13 +165,13 @@ public:
}
}
- Status assign(const char* data, int len) {
+ Status assign(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) {
if (_bloom_filter == nullptr) {
_bloom_filter.reset(BloomFilterAdaptor::create());
}
- _bloom_filter_alloced = len;
- return _bloom_filter->init(data, len);
+ _bloom_filter_alloced = data_size;
+ return _bloom_filter->init(data, data_size);
}
Status get_data(char** data, int* len) {
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1b5ca935e5..77a476d095 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -862,7 +862,7 @@ public:
// used by shuffle runtime filter
// assign this filter by protobuf
- Status assign(const PBloomFilter* bloom_filter, const char* data) {
+ Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data) {
_is_bloomfilter = true;
// we won't use this class to insert or find any data
// so any type is ok
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 677321ea4d..42584d64f9 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -27,6 +27,10 @@
#include "util/time.h"
#include "util/uid_util.h"
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
namespace doris {
class Predicate;
class ObjectPool;
@@ -97,14 +101,20 @@ struct RuntimeFilterParams {
};
struct UpdateRuntimeFilterParams {
+ UpdateRuntimeFilterParams(const PPublishFilterRequest* req,
+ butil::IOBufAsZeroCopyInputStream* data_stream, ObjectPool* obj_pool)
+ : request(req), data(data_stream), pool(obj_pool) {}
const PPublishFilterRequest* request;
- const char* data;
+ butil::IOBufAsZeroCopyInputStream* data;
ObjectPool* pool;
};
struct MergeRuntimeFilterParams {
+ MergeRuntimeFilterParams(const PMergeFilterRequest* req,
+ butil::IOBufAsZeroCopyInputStream* data_stream)
+ : request(req), data(data_stream) {}
const PMergeFilterRequest* request;
- const char* data;
+ butil::IOBufAsZeroCopyInputStream* data;
};
/// The runtimefilter is built in the join node.
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7b5c3af432..511979d2a8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -921,7 +921,8 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
return exec_plan_fragment(exec_fragment_params);
}
-Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const char* data) {
+Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
+ butil::IOBufAsZeroCopyInputStream* attach_data) {
UniqueId fragment_instance_id = request->fragment_id();
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
std::shared_ptr<FragmentExecState> fragment_state;
@@ -946,11 +947,12 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const cha
RuntimeFilterMgr* runtime_filter_mgr =
fragment_state->executor()->runtime_state()->runtime_filter_mgr();
- Status st = runtime_filter_mgr->update_filter(request, data);
+ Status st = runtime_filter_mgr->update_filter(request, attach_data);
return st;
}
-Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* attach_data) {
+Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
+ butil::IOBufAsZeroCopyInputStream* attach_data) {
UniqueId queryid = request->query_id();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index fd54afacd2..08be2edc0c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -36,6 +36,10 @@
#include "util/metrics.h"
#include "util/thread.h"
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
namespace doris {
class QueryFragmentsCtx;
@@ -85,9 +89,11 @@ public:
const TUniqueId& fragment_instance_id,
std::vector<TScanColumnDesc>* selected_columns);
- Status apply_filter(const PPublishFilterRequest* request, const char* attach_data);
+ Status apply_filter(const PPublishFilterRequest* request,
+ butil::IOBufAsZeroCopyInputStream* attach_data);
- Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
+ Status merge_filter(const PMergeFilterRequest* request,
+ butil::IOBufAsZeroCopyInputStream* attach_data);
void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 5175ed580c..0ed812737f 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -110,13 +110,10 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt
return Status::OK();
}
-
-Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, const char* data) {
+Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
+ butil::IOBufAsZeroCopyInputStream* data) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
- UpdateRuntimeFilterParams params;
- params.request = request;
- params.data = data;
- params.pool = &_pool;
+ UpdateRuntimeFilterParams params(request, data, &_pool);
int filter_id = request->filter_id();
IRuntimeFilter* real_filter = nullptr;
RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter));
@@ -185,7 +182,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
// merge data
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
- const char* data) {
+ butil::IOBufAsZeroCopyInputStream* attach_data) {
// SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
int merged_size = 0;
@@ -201,9 +198,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
if (auto bf = cntVal->filter->get_bloomfilter()) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
}
- MergeRuntimeFilterParams params;
- params.data = data;
- params.request = request;
+ MergeRuntimeFilterParams params(request, attach_data);
ObjectPool* pool = iter->second->pool.get();
RuntimeFilterWrapperHolder holder;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, pool, holder.getHandle()));
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 0db00f2331..a3b3add74b 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -32,6 +32,10 @@
#include "util/time.h"
#include "util/uid_util.h"
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
namespace doris {
class TUniqueId;
class RuntimeFilter;
@@ -69,7 +73,8 @@ public:
const TQueryOptions& options, int node_id = -1);
// update filter by remote
- Status update_filter(const PPublishFilterRequest* request, const char* data);
+ Status update_filter(const PPublishFilterRequest* request,
+ butil::IOBufAsZeroCopyInputStream* data);
void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params);
@@ -113,7 +118,8 @@ public:
const TQueryOptions& query_options);
// handle merge rpc
- Status merge(const PMergeFilterRequest* request, const char* data);
+ Status merge(const PMergeFilterRequest* request,
+ butil::IOBufAsZeroCopyInputStream* attach_data);
UniqueId query_id() { return _query_id; }
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 0e7c1d1002..9d0c4d0383 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -17,6 +17,8 @@
#include "service/internal_service.h"
+#include <butil/iobuf.h>
+
#include <string>
#include "common/config.h"
@@ -493,8 +495,9 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr
::doris::PMergeFilterResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
- auto buf = static_cast<brpc::Controller*>(controller)->request_attachment();
- Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data());
+ auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+ butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+ Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
if (!st.ok()) {
LOG(WARNING) << "merge meet error" << st.to_string();
}
@@ -507,10 +510,10 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr
::google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+ butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
UniqueId unique_id(request->query_id());
- // TODO: avoid copy attachment copy
VLOG_NOTICE << "rpc apply_filter recv";
- Status st = _exec_env->fragment_mgr()->apply_filter(request, attachment.to_string().data());
+ Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
if (!st.ok()) {
LOG(WARNING) << "apply filter meet error: " << st.to_string();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org