You are viewing a plain-text version of this content. (The link to its canonical version that accompanied this notice is not included in this text.)
Posted to commits@doris.apache.org by li...@apache.org on 2023/06/13 03:20:56 UTC

[doris] branch master updated: [Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e010fa8d4f [Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593)
e010fa8d4f is described below

commit e010fa8d4ff17e0800c9b71fd0b1701b7d205afb
Author: Pxl <px...@qq.com>
AuthorDate: Tue Jun 13 11:20:49 2023 +0800

    [Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593)
---
 be/src/common/config.cpp                        |  4 ---
 be/src/common/config.h                          |  4 ---
 be/src/exprs/runtime_filter.cpp                 | 29 -------------------
 be/src/exprs/runtime_filter.h                   | 15 +++++-----
 be/src/exprs/runtime_filter_rpc.cpp             | 35 +++++++++++++----------
 be/src/exprs/runtime_filter_slots.h             | 37 ++++++++++++++-----------
 be/src/exprs/runtime_filter_slots_cross.h       | 10 +++----
 be/src/pipeline/exec/hashjoin_build_sink.h      |  1 +
 be/src/runtime/runtime_filter_mgr.cpp           |  8 +++---
 be/src/vec/exec/join/vhash_join_node.cpp        | 17 +++++++-----
 be/src/vec/exec/join/vhash_join_node.h          |  9 +++++-
 be/src/vec/exec/join/vnested_loop_join_node.cpp |  2 +-
 12 files changed, 77 insertions(+), 94 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f23bbffe99..f3ea211a8e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -732,10 +732,6 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576");
 // In most cases, it does not need to be modified.
 DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
 
-// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method
-// else we will call sync method
-DEFINE_mBool(runtime_filter_use_async_rpc, "true");
-
 // max send batch parallelism for OlapTableSink
 // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
 // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7b61e13c92..4f6a608929 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -768,10 +768,6 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes);
 // In most cases, it does not need to be modified.
 DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
 
-// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method
-// else we will call sync method
-DECLARE_mBool(runtime_filter_use_async_rpc);
-
 // max send batch parallelism for OlapTableSink
 // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
 // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index ef8543e136..1a4b183e5d 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1019,26 +1019,6 @@ public:
 
     PrimitiveType column_type() { return _column_return_type; }
 
-    void ready_for_publish() {
-        if (_filter_type == RuntimeFilterType::MINMAX_FILTER) {
-            switch (_column_return_type) {
-            case TYPE_VARCHAR:
-            case TYPE_CHAR:
-            case TYPE_STRING: {
-                StringRef* min_value = static_cast<StringRef*>(_context.minmax_func->get_min());
-                StringRef* max_value = static_cast<StringRef*>(_context.minmax_func->get_max());
-                auto min_val_ptr = _pool->add(new std::string(min_value->data));
-                auto max_val_ptr = _pool->add(new std::string(max_value->data));
-                StringRef min_val(min_val_ptr->c_str(), min_val_ptr->length());
-                StringRef max_val(max_val_ptr->c_str(), max_val_ptr->length());
-                _context.minmax_func->assign(&min_val, &max_val);
-            }
-            default:
-                break;
-            }
-        }
-    }
-
     bool is_bloomfilter() const { return _is_bloomfilter; }
 
     bool is_ignored_in_filter() const { return _is_ignored_in_filter; }
@@ -1164,11 +1144,6 @@ Status IRuntimeFilter::publish() {
     }
 }
 
-void IRuntimeFilter::publish_finally() {
-    DCHECK(is_producer());
-    join_rpc();
-}
-
 Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs) {
     DCHECK(is_consumer());
     if (!_is_ignored) {
@@ -1531,10 +1506,6 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
     }
 }
 
-void IRuntimeFilter::ready_for_publish() {
-    _wrapper->ready_for_publish();
-}
-
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
     if (!_is_ignored && wrapper->is_ignored_in_filter()) {
         set_ignored();
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 8389e8416b..6ff64a9af3 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -217,8 +217,6 @@ public:
     // push filter to remote node or push down it to scan_node
     Status publish();
 
-    void publish_finally();
-
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
     Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs);
@@ -293,9 +291,12 @@ public:
     // consumer should call before released
     Status consumer_close();
 
+    bool is_finish_rpc();
+
+    Status join_rpc();
+
     // async push runtimefilter to remote node
     Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, bool opt_remote_rf);
-    Status join_rpc();
 
     void init_profile(RuntimeProfile* parent_profile);
 
@@ -303,8 +304,6 @@ public:
 
     void update_runtime_filter_type_to_profile();
 
-    void ready_for_publish();
-
     static bool enable_use_batch(bool use_batch, PrimitiveType type) {
         return use_batch && (is_int_or_bool(type) || is_float_or_double(type));
     }
@@ -389,9 +388,9 @@ protected:
 
     std::vector<doris::vectorized::VExprSPtr> _push_down_vexprs;
 
-    struct rpc_context;
+    struct RPCContext;
 
-    std::shared_ptr<rpc_context> _rpc_context;
+    std::shared_ptr<RPCContext> _rpc_context;
 
     // parent profile
     // only effect on consumer
@@ -403,7 +402,7 @@ protected:
     const bool _enable_pipeline_exec;
 
     bool _profile_init = false;
-    doris::Mutex _profile_mutex;
+    std::mutex _profile_mutex;
     std::string _name;
     bool _opt_remote_rf;
 };
diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp
index 829224f3f2..4d613e3a57 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -38,11 +38,14 @@
 
 namespace doris {
 
-struct IRuntimeFilter::rpc_context {
+struct IRuntimeFilter::RPCContext {
     PMergeFilterRequest request;
     PMergeFilterResponse response;
     brpc::Controller cntl;
     brpc::CallId cid;
+    bool is_finished = false;
+
+    static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished = true; }
 };
 
 Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
@@ -54,10 +57,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
     if (!stub) {
         std::string msg =
                 fmt::format("Get rpc stub failed, host={},  port=", addr->hostname, addr->port);
-        LOG(WARNING) << msg;
         return Status::InternalError(msg);
     }
-    _rpc_context = std::make_shared<IRuntimeFilter::rpc_context>();
+    _rpc_context = std::make_shared<IRuntimeFilter::RPCContext>();
     void* data = nullptr;
     int len = 0;
 
@@ -72,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
     _rpc_context->request.set_filter_id(_filter_id);
     _rpc_context->request.set_opt_remote_rf(opt_remote_rf);
     _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
-    _rpc_context->cntl.set_timeout_ms(1000);
+    _rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms());
     _rpc_context->cid = _rpc_context->cntl.call_id();
 
     Status serialize_status = serialize(&_rpc_context->request, &data, &len);
@@ -83,14 +85,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
             DCHECK(data != nullptr);
             _rpc_context->cntl.request_attachment().append(data, len);
         }
-        if (config::runtime_filter_use_async_rpc) {
-            stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
-                               brpc::DoNothing());
-        } else {
-            stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
-                               nullptr);
-            _rpc_context.reset();
-        }
+
+        stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response,
+                           brpc::NewCallback(RPCContext::finish, _rpc_context));
 
     } else {
         // we should reset context
@@ -99,15 +96,25 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress
     return serialize_status;
 }
 
+bool IRuntimeFilter::is_finish_rpc() {
+    if (_rpc_context == nullptr) {
+        return true;
+    }
+    return _rpc_context->is_finished;
+}
+
 Status IRuntimeFilter::join_rpc() {
-    DCHECK(is_producer());
+    if (!is_producer()) {
+        return Status::InternalError("RuntimeFilter::join_rpc only called when rf is producer.");
+    }
     if (_rpc_context != nullptr) {
         brpc::Join(_rpc_context->cid);
         if (_rpc_context->cntl.Failed()) {
-            LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText();
             // reset stub cache
             ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
                     _rpc_context->cntl.remote_side());
+            return Status::InternalError("RuntimeFilter::join_rpc meet rpc error, msg={}.",
+                                         _rpc_context->cntl.ErrorText());
         }
     }
     return Status::OK();
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index b1f4b125b4..d6a5866cae 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "common/status.h"
 #include "exprs/runtime_filter.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
@@ -60,8 +61,8 @@ public:
         auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) {
             runtime_filter->set_ignored();
             runtime_filter->set_ignored_msg(msg);
-            runtime_filter->publish();
-            runtime_filter->publish_finally();
+            RETURN_IF_ERROR(runtime_filter->publish());
+            return Status::OK();
         };
 
         // ordered vector: IN, IN_OR_BLOOM, others.
@@ -142,9 +143,9 @@ public:
                         "in_num({}) >= max_in_num({})",
                         print_id(state->fragment_instance_id()), filter_desc.filter_id,
                         hash_table_size, max_in_num);
-                ignore_remote_filter(runtime_filter, msg);
+                RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
 #else
-                ignore_remote_filter(runtime_filter, "ignored");
+                RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, "ignored"));
 #endif
                 continue;
             }
@@ -196,29 +197,33 @@ public:
         }
     }
 
-    // should call this method after insert
-    void ready_for_publish() {
+    bool ready_finish_publish() {
         for (auto& pair : _runtime_filters) {
             for (auto filter : pair.second) {
-                filter->ready_for_publish();
+                if (!filter->is_finish_rpc()) {
+                    return false;
+                }
             }
         }
+        return true;
     }
-    // publish runtime filter
-    void publish() {
-        for (int i = 0; i < _probe_expr_context.size(); ++i) {
-            auto iter = _runtime_filters.find(i);
-            if (iter != _runtime_filters.end()) {
-                for (auto filter : iter->second) {
-                    filter->publish();
-                }
+
+    void finish_publish() {
+        for (auto& pair : _runtime_filters) {
+            for (auto filter : pair.second) {
+                filter->join_rpc();
             }
         }
+    }
+
+    // publish runtime filter
+    Status publish() {
         for (auto& pair : _runtime_filters) {
             for (auto filter : pair.second) {
-                filter->publish_finally();
+                RETURN_IF_ERROR(filter->publish());
             }
         }
+        return Status::OK();
     }
 
     void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) {
diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h
index 1e8c15e713..184ba46591 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -19,6 +19,7 @@
 
 #include <vector>
 
+#include "common/status.h"
 #include "exprs/runtime_filter.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
@@ -88,14 +89,11 @@ public:
         return Status::OK();
     }
 
-    void publish() {
+    Status publish() {
         for (auto& filter : _runtime_filters) {
-            filter->publish();
-        }
-        for (auto& filter : _runtime_filters) {
-            // todo: cross join may not need publish_finally()
-            filter->publish_finally();
+            RETURN_IF_ERROR(filter->publish());
         }
+        return Status::OK();
     }
 
     bool empty() { return !_runtime_filters.size(); }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index dfa077b83f..c1e67512dd 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -39,6 +39,7 @@ class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilde
 public:
     HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
     bool can_write() override { return _node->can_sink_write(); }
+    bool is_pending_finish() const override { return !_node->ready_for_finish(); }
 };
 
 } // namespace pipeline
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index b6dfa0a625..a566018c11 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -43,7 +43,7 @@
 namespace doris {
 
 template <class RPCRequest, class RPCResponse>
-struct async_rpc_context {
+struct AsyncRPCContext {
     RPCRequest request;
     RPCResponse response;
     brpc::Controller cntl;
@@ -55,7 +55,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state
 RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx)
         : _query_ctx(query_ctx) {}
 
-RuntimeFilterMgr::~RuntimeFilterMgr() {}
+RuntimeFilterMgr::~RuntimeFilterMgr() = default;
 
 Status RuntimeFilterMgr::init() {
     _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
@@ -315,7 +315,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             // 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0)
             // 3. This filter is bloom filter (only bloom filter should be used for merging)
             using PPublishFilterRpcContext =
-                    async_rpc_context<PPublishFilterRequestV2, PPublishFilterResponse>;
+                    AsyncRPCContext<PPublishFilterRequestV2, PPublishFilterResponse>;
             std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
             rpc_contexts.reserve(cntVal->targetv2_info.size());
 
@@ -380,7 +380,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
         } else {
             // prepare rpc context
             using PPublishFilterRpcContext =
-                    async_rpc_context<PPublishFilterRequest, PPublishFilterResponse>;
+                    AsyncRPCContext<PPublishFilterRequest, PPublishFilterResponse>;
             std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
             rpc_contexts.reserve(cntVal->target_info.size());
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 822b99db58..9b410f56ae 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -285,9 +285,9 @@ struct ProcessRuntimeFilterBuild {
         if (_join_node->_runtime_filter_descs.empty()) {
             return Status::OK();
         }
-        _join_node->_runtime_filter_slots = _join_node->_pool->add(
-                new VRuntimeFilterSlots(_join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs,
-                                        _join_node->_runtime_filter_descs));
+        _join_node->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
+                _join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs,
+                _join_node->_runtime_filter_descs);
 
         RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init(
                 state, hash_table_ctx.hash_table.get_size(), _join_node->_build_bf_cardinality));
@@ -300,7 +300,7 @@ struct ProcessRuntimeFilterBuild {
         }
         {
             SCOPED_TIMER(_join_node->_push_down_timer);
-            _join_node->_runtime_filter_slots->publish();
+            RETURN_IF_ERROR(_join_node->_runtime_filter_slots->publish());
         }
 
         return Status::OK();
@@ -917,15 +917,15 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
                                   if (_runtime_filter_descs.empty()) {
                                       return Status::OK();
                                   }
-                                  _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots(
+                                  _runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
                                           _probe_expr_ctxs, _build_expr_ctxs,
-                                          _runtime_filter_descs));
+                                          _runtime_filter_descs);
 
                                   RETURN_IF_ERROR(_runtime_filter_slots->init(
                                           state, arg.hash_table.get_size(), 0));
                                   RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
                                           _shared_hash_table_context));
-                                  _runtime_filter_slots->publish();
+                                  RETURN_IF_ERROR(_runtime_filter_slots->publish());
                                   return Status::OK();
                               }},
                     *_hash_table_variants);
@@ -1261,6 +1261,9 @@ HashJoinNode::~HashJoinNode() {
         // signal at here is abnormal
         _shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor"));
     }
+    if (_runtime_filter_slots != nullptr) {
+        _runtime_filter_slots->finish_publish();
+    }
 }
 
 void HashJoinNode::_release_mem() {
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 36398ba4ad..4e9cd40e9a 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -252,6 +252,13 @@ public:
 
     bool should_build_hash_table() const { return _should_build_hash_table; }
 
+    bool ready_for_finish() {
+        if (_runtime_filter_slots == nullptr) {
+            return true;
+        }
+        return _runtime_filter_slots->ready_finish_publish();
+    }
+
 private:
     void _init_short_circuit_for_probe() override {
         _short_circuit_for_probe =
@@ -341,7 +348,7 @@ private:
     bool _is_broadcast_join = false;
     bool _should_build_hash_table = true;
     std::shared_ptr<SharedHashTableController> _shared_hashtable_controller = nullptr;
-    VRuntimeFilterSlots* _runtime_filter_slots = nullptr;
+    std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots = nullptr;
 
     std::vector<SlotId> _hash_output_slot_ids;
     std::vector<bool> _left_output_slot_flags;
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 57ad81f120..491a9c6854 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -84,7 +84,7 @@ struct RuntimeFilterBuild {
         }
         {
             SCOPED_TIMER(_join_node->_push_down_timer);
-            runtime_filter_slots.publish();
+            RETURN_IF_ERROR(runtime_filter_slots.publish());
         }
 
         return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org