You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/10/26 16:02:14 UTC

[doris] branch master updated: [Improvement](runtime filter) Reduce merging time for bloom filter (#13668)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0134e9d2f4 [Improvement](runtime filter) Reduce merging time for bloom filter (#13668)
0134e9d2f4 is described below

commit 0134e9d2f4010b2f540d53b593f34733160bb19d
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Oct 27 00:02:05 2022 +0800

    [Improvement](runtime filter) Reduce merging time for bloom filter (#13668)
---
 be/src/exprs/block_bloom_filter.hpp         | 17 +++++++++++------
 be/src/exprs/block_bloom_filter_avx_impl.cc |  1 +
 be/src/exprs/block_bloom_filter_impl.cc     | 27 +++++++++++++++++++--------
 be/src/exprs/bloomfilter_predicate.h        | 16 ++++++++++------
 be/src/exprs/runtime_filter.cpp             |  2 +-
 be/src/exprs/runtime_filter.h               | 14 ++++++++++++--
 be/src/runtime/fragment_mgr.cpp             |  8 +++++---
 be/src/runtime/fragment_mgr.h               | 10 ++++++++--
 be/src/runtime/runtime_filter_mgr.cpp       | 15 +++++----------
 be/src/runtime/runtime_filter_mgr.h         | 10 ++++++++--
 be/src/service/internal_service.cpp         | 11 +++++++----
 11 files changed, 87 insertions(+), 44 deletions(-)

diff --git a/be/src/exprs/block_bloom_filter.hpp b/be/src/exprs/block_bloom_filter.hpp
index b75350b56e..2dd6e3cb4f 100644
--- a/be/src/exprs/block_bloom_filter.hpp
+++ b/be/src/exprs/block_bloom_filter.hpp
@@ -21,10 +21,14 @@
 #pragma once
 
 #include "common/status.h"
-#include "gutil/macros.h"
+#include "fmt/format.h"
 #include "util/hash_util.hpp"
 #include "util/slice.h"
 
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
 namespace doris {
 
 // https://github.com/apache/kudu/blob/master/src/kudu/util/block_bloom_filter.h
@@ -40,11 +44,14 @@ public:
     explicit BlockBloomFilter();
     ~BlockBloomFilter();
 
+    BlockBloomFilter(const BlockBloomFilter&) = delete;
+    BlockBloomFilter& operator=(const BlockBloomFilter&) = delete;
+
     Status init(int log_space_bytes, uint32_t hash_seed);
     // Initialize the BlockBloomFilter from a populated "directory" structure.
     // Useful for initializing the BlockBloomFilter by de-serializing a custom protobuf message.
-    Status init_from_directory(int log_space_bytes, const Slice& directory, bool always_false,
-                               uint32_t hash_seed);
+    Status init_from_directory(int log_space_bytes, butil::IOBufAsZeroCopyInputStream* data,
+                               const size_t data_size, bool always_false, uint32_t hash_seed);
 
     void close();
 
@@ -176,7 +183,7 @@ private:
 
 #endif
     // Size of the internal directory structure in bytes.
-    int64_t directory_size() const { return 1ULL << log_space_bytes(); }
+    size_t directory_size() const { return 1ULL << log_space_bytes(); }
 
     // Some constants used in hashing. #defined for efficiency reasons.
 #define BLOOM_HASH_CONSTANTS                                                                   \
@@ -200,8 +207,6 @@ private:
         // Rehash32to32(hash2) is minimal.
         return (static_cast<uint64_t>(hash) * m + a) >> 32U;
     }
-
-    DISALLOW_COPY_AND_ASSIGN(BlockBloomFilter);
 };
 
 } // namespace doris
diff --git a/be/src/exprs/block_bloom_filter_avx_impl.cc b/be/src/exprs/block_bloom_filter_avx_impl.cc
index e005b4e9c6..b6512f9848 100644
--- a/be/src/exprs/block_bloom_filter_avx_impl.cc
+++ b/be/src/exprs/block_bloom_filter_avx_impl.cc
@@ -23,6 +23,7 @@
 #include <immintrin.h>
 
 #include "exprs/block_bloom_filter.hpp"
+#include "gutil/macros.h"
 
 namespace doris {
 static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i make_mark(
diff --git a/be/src/exprs/block_bloom_filter_impl.cc b/be/src/exprs/block_bloom_filter_impl.cc
index e553694c5a..b619b239a4 100644
--- a/be/src/exprs/block_bloom_filter_impl.cc
+++ b/be/src/exprs/block_bloom_filter_impl.cc
@@ -26,6 +26,8 @@
 #include <mm_malloc.h>
 #endif
 
+#include <butil/iobuf.h>
+
 #include <algorithm>
 #include <climits>
 #include <cmath>
@@ -87,17 +89,26 @@ Status BlockBloomFilter::init(const int log_space_bytes, uint32_t hash_seed) {
     return Status::OK();
 }
 
-Status BlockBloomFilter::init_from_directory(int log_space_bytes, const Slice& directory,
-                                             bool always_false, uint32_t hash_seed) {
+Status BlockBloomFilter::init_from_directory(int log_space_bytes,
+                                             butil::IOBufAsZeroCopyInputStream* data,
+                                             const size_t data_size, bool always_false,
+                                             uint32_t hash_seed) {
     RETURN_IF_ERROR(init_internal(log_space_bytes, hash_seed));
     DCHECK(_directory);
 
-    if (directory_size() != directory.size) {
-        return Status::InvalidArgument(
+    if (directory_size() != data_size) {
+        return Status::InvalidArgument(fmt::format(
                 "Mismatch in BlockBloomFilter source directory size {} and expected size {}",
-                directory.size, directory_size());
+                data_size, directory_size()));
+    }
+    int size = 0;
+    char* tmp;
+    const void** ptr = (const void**)&tmp;
+    char* data_ptr = reinterpret_cast<char*>(_directory);
+    while (data->Next(ptr, &size)) {
+        memcpy(data_ptr, *ptr, size);
+        data_ptr += size;
     }
-    memcpy(_directory, directory.data, directory.size);
     _always_false = always_false;
     return Status::OK();
 }
@@ -240,8 +251,8 @@ Status BlockBloomFilter::merge(const BlockBloomFilter& other) {
         return Status::OK();
     }
 
-    or_equal_array_internal(directory_size(), reinterpret_cast<const uint8*>(other._directory),
-                            reinterpret_cast<uint8*>(_directory));
+    or_equal_array_internal(directory_size(), reinterpret_cast<const uint8_t*>(other._directory),
+                            reinterpret_cast<uint8_t*>(_directory));
 
     _always_false = false;
     return Status::OK();
diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index 5ce3ce47d5..545c40cc5b 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -33,6 +33,10 @@
 #include "olap/uint24.h"
 #include "util/hash_util.hpp"
 
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
 namespace doris {
 class BloomFilterAdaptor {
 public:
@@ -50,9 +54,9 @@ public:
         return _bloom_filter->init(log_space, /*hash_seed*/ 0);
     }
 
-    Status init(const char* data, int len) {
-        int log_space = log2(len);
-        return _bloom_filter->init_from_directory(log_space, Slice(data, len), false, 0);
+    Status init(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) {
+        int log_space = log2(data_size);
+        return _bloom_filter->init_from_directory(log_space, data, data_size, false, 0);
     }
 
     char* data() { return (char*)_bloom_filter->directory().data; }
@@ -161,13 +165,13 @@ public:
         }
     }
 
-    Status assign(const char* data, int len) {
+    Status assign(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) {
         if (_bloom_filter == nullptr) {
             _bloom_filter.reset(BloomFilterAdaptor::create());
         }
 
-        _bloom_filter_alloced = len;
-        return _bloom_filter->init(data, len);
+        _bloom_filter_alloced = data_size;
+        return _bloom_filter->init(data, data_size);
     }
 
     Status get_data(char** data, int* len) {
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1b5ca935e5..77a476d095 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -862,7 +862,7 @@ public:
 
     // used by shuffle runtime filter
     // assign this filter by protobuf
-    Status assign(const PBloomFilter* bloom_filter, const char* data) {
+    Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data) {
         _is_bloomfilter = true;
         // we won't use this class to insert or find any data
         // so any type is ok
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 677321ea4d..42584d64f9 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -27,6 +27,10 @@
 #include "util/time.h"
 #include "util/uid_util.h"
 
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
 namespace doris {
 class Predicate;
 class ObjectPool;
@@ -97,14 +101,20 @@ struct RuntimeFilterParams {
 };
 
 struct UpdateRuntimeFilterParams {
+    UpdateRuntimeFilterParams(const PPublishFilterRequest* req,
+                              butil::IOBufAsZeroCopyInputStream* data_stream, ObjectPool* obj_pool)
+            : request(req), data(data_stream), pool(obj_pool) {}
     const PPublishFilterRequest* request;
-    const char* data;
+    butil::IOBufAsZeroCopyInputStream* data;
     ObjectPool* pool;
 };
 
 struct MergeRuntimeFilterParams {
+    MergeRuntimeFilterParams(const PMergeFilterRequest* req,
+                             butil::IOBufAsZeroCopyInputStream* data_stream)
+            : request(req), data(data_stream) {}
     const PMergeFilterRequest* request;
-    const char* data;
+    butil::IOBufAsZeroCopyInputStream* data;
 };
 
 /// The runtimefilter is built in the join node.
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7b5c3af432..511979d2a8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -921,7 +921,8 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
     return exec_plan_fragment(exec_fragment_params);
 }
 
-Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const char* data) {
+Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
+                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
     UniqueId fragment_instance_id = request->fragment_id();
     TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
     std::shared_ptr<FragmentExecState> fragment_state;
@@ -946,11 +947,12 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const cha
     RuntimeFilterMgr* runtime_filter_mgr =
             fragment_state->executor()->runtime_state()->runtime_filter_mgr();
 
-    Status st = runtime_filter_mgr->update_filter(request, data);
+    Status st = runtime_filter_mgr->update_filter(request, attach_data);
     return st;
 }
 
-Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* attach_data) {
+Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
+                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
     UniqueId queryid = request->query_id();
     std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
     RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index fd54afacd2..08be2edc0c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -36,6 +36,10 @@
 #include "util/metrics.h"
 #include "util/thread.h"
 
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
 namespace doris {
 
 class QueryFragmentsCtx;
@@ -85,9 +89,11 @@ public:
                                        const TUniqueId& fragment_instance_id,
                                        std::vector<TScanColumnDesc>* selected_columns);
 
-    Status apply_filter(const PPublishFilterRequest* request, const char* attach_data);
+    Status apply_filter(const PPublishFilterRequest* request,
+                        butil::IOBufAsZeroCopyInputStream* attach_data);
 
-    Status merge_filter(const PMergeFilterRequest* request, const char* attach_data);
+    Status merge_filter(const PMergeFilterRequest* request,
+                        butil::IOBufAsZeroCopyInputStream* attach_data);
 
     void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe);
 
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 5175ed580c..0ed812737f 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -110,13 +110,10 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt
 
     return Status::OK();
 }
-
-Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, const char* data) {
+Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
+                                       butil::IOBufAsZeroCopyInputStream* data) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
-    UpdateRuntimeFilterParams params;
-    params.request = request;
-    params.data = data;
-    params.pool = &_pool;
+    UpdateRuntimeFilterParams params(request, data, &_pool);
     int filter_id = request->filter_id();
     IRuntimeFilter* real_filter = nullptr;
     RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter));
@@ -185,7 +182,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
 
 // merge data
 Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
-                                                 const char* data) {
+                                                 butil::IOBufAsZeroCopyInputStream* attach_data) {
     // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     std::shared_ptr<RuntimeFilterCntlVal> cntVal;
     int merged_size = 0;
@@ -201,9 +198,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
         if (auto bf = cntVal->filter->get_bloomfilter()) {
             RETURN_IF_ERROR(bf->init_with_fixed_length());
         }
-        MergeRuntimeFilterParams params;
-        params.data = data;
-        params.request = request;
+        MergeRuntimeFilterParams params(request, attach_data);
         ObjectPool* pool = iter->second->pool.get();
         RuntimeFilterWrapperHolder holder;
         RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, pool, holder.getHandle()));
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 0db00f2331..a3b3add74b 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -32,6 +32,10 @@
 #include "util/time.h"
 #include "util/uid_util.h"
 
+namespace butil {
+class IOBufAsZeroCopyInputStream;
+}
+
 namespace doris {
 class TUniqueId;
 class RuntimeFilter;
@@ -69,7 +73,8 @@ public:
                          const TQueryOptions& options, int node_id = -1);
 
     // update filter by remote
-    Status update_filter(const PPublishFilterRequest* request, const char* data);
+    Status update_filter(const PPublishFilterRequest* request,
+                         butil::IOBufAsZeroCopyInputStream* data);
 
     void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params);
 
@@ -113,7 +118,8 @@ public:
                 const TQueryOptions& query_options);
 
     // handle merge rpc
-    Status merge(const PMergeFilterRequest* request, const char* data);
+    Status merge(const PMergeFilterRequest* request,
+                 butil::IOBufAsZeroCopyInputStream* attach_data);
 
     UniqueId query_id() { return _query_id; }
 
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 0e7c1d1002..9d0c4d0383 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -17,6 +17,8 @@
 
 #include "service/internal_service.h"
 
+#include <butil/iobuf.h>
+
 #include <string>
 
 #include "common/config.h"
@@ -493,8 +495,9 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr
                                         ::doris::PMergeFilterResponse* response,
                                         ::google::protobuf::Closure* done) {
     brpc::ClosureGuard closure_guard(done);
-    auto buf = static_cast<brpc::Controller*>(controller)->request_attachment();
-    Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data());
+    auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+    Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
     if (!st.ok()) {
         LOG(WARNING) << "merge meet error" << st.to_string();
     }
@@ -507,10 +510,10 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr
                                         ::google::protobuf::Closure* done) {
     brpc::ClosureGuard closure_guard(done);
     auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
     UniqueId unique_id(request->query_id());
-    // TODO: avoid copy attachment copy
     VLOG_NOTICE << "rpc apply_filter recv";
-    Status st = _exec_env->fragment_mgr()->apply_filter(request, attachment.to_string().data());
+    Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
     if (!st.ok()) {
         LOG(WARNING) << "apply filter meet error: " << st.to_string();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org