You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/14 00:16:33 UTC

[incubator-doris] branch master updated: [improvement] send merged runtime filter asynchronously (#10080)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ce730293c0 [improvement] send merged runtime filter asynchronously (#10080)
ce730293c0 is described below

commit ce730293c07faa4763821abe6f9f073b85e4961f
Author: jacktengg <18...@users.noreply.github.com>
AuthorDate: Tue Jun 14 08:16:25 2022 +0800

    [improvement] send merged runtime filter asynchronously (#10080)
---
 be/src/runtime/runtime_filter_mgr.cpp | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index e29609b9b3..654f865729 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -239,7 +239,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             has_attachment = true;
         }
 
-        // TODO: async send publish rpc
         std::vector<TRuntimeFilterTargetParams>& targets = cntVal->target_info;
         for (size_t i = 0; i < targets.size(); i++) {
             rpc_contexts.emplace_back(new PPublishFilterRpcContext);
@@ -249,6 +248,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             if (has_attachment) {
                 rpc_contexts[i]->cntl.request_attachment().append(request_attachment);
             }
+            rpc_contexts[i]->cid = rpc_contexts[i]->cntl.call_id();
 
             // set fragment-id
             auto request_fragment_id = rpc_contexts[i]->request.mutable_fragment_id();
@@ -267,9 +267,16 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
                 continue;
             }
             stub->apply_filter(&rpc_contexts[i]->cntl, &rpc_contexts[i]->request,
-                               &rpc_contexts[i]->response, nullptr);
+                               &rpc_contexts[i]->response, brpc::DoNothing());
+        }
+        for (auto& rpc_context : rpc_contexts) {
+            brpc::Join(rpc_context->cid);
+            if (rpc_context->cntl.Failed()) {
+                LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText();
+                ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+                        rpc_context->cntl.remote_side());
+            }
         }
-        /// TODO: use async and join rpc
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org