You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2023/06/04 04:10:49 UTC

[doris] branch master updated: [pipeline](opt) Opt fragment instance prepare performance by thread pool (#20399)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e1dbee5e90 [pipeline](opt) Opt fragment instance prepare performance by thread pool (#20399)
e1dbee5e90 is described below

commit e1dbee5e908d0c662cf00b55d31f9f6ba17d569f
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Sun Jun 4 12:10:35 2023 +0800

    [pipeline](opt) Opt fragment instance prepare performance by thread pool (#20399)
---
 be/src/runtime/fragment_mgr.cpp | 49 ++++++++++++++++++++++++++++++++---------
 1 file changed, 39 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 26646bbd0e..543b10f32a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -848,11 +848,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     VLOG_ROW << "query options is "
              << apache::thrift::ThriftDebugString(params.query_options).c_str();
 
-    std::shared_ptr<FragmentExecState> exec_state;
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
-
-    for (size_t i = 0; i < params.local_params.size(); i++) {
+    auto pre_and_submit = [&](int i) {
         const auto& local_params = params.local_params[i];
 
         const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
@@ -861,15 +859,14 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             auto iter = _pipeline_map.find(fragment_instance_id);
             if (iter != _pipeline_map.end()) {
                 // Duplicated
-                continue;
+                return Status::OK();
             }
+            query_ctx->fragment_ids.push_back(fragment_instance_id);
         }
         START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
         span->SetAttribute("instance_id", print_id(fragment_instance_id));
 
-        query_ctx->fragment_ids.push_back(fragment_instance_id);
-
-        exec_state.reset(new FragmentExecState(
+        std::shared_ptr<FragmentExecState> exec_state(new FragmentExecState(
                 query_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
                 query_ctx,
                 std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
@@ -912,10 +909,42 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
             _cv.notify_all();
         }
-        RETURN_IF_ERROR(context->submit());
-    }
 
-    return Status::OK();
+        return context->submit();
+    };
+
+    int target_size = params.local_params.size();
+    if (target_size > 1) {
+        int prepare_done = {0};
+        Status prepare_status[target_size];
+        std::mutex m;
+        std::condition_variable cv;
+
+        for (size_t i = 0; i < target_size; i++) {
+            _thread_pool->submit_func([&, i]() {
+                prepare_status[i] = pre_and_submit(i);
+                std::unique_lock<std::mutex> lock(m);
+                prepare_done++;
+                if (prepare_done == target_size) {
+                    cv.notify_one();
+                }
+            });
+        }
+
+        std::unique_lock<std::mutex> lock(m);
+        if (prepare_done != target_size) {
+            cv.wait(lock);
+
+            for (size_t i = 0; i < target_size; i++) {
+                if (!prepare_status[i].ok()) {
+                    return prepare_status[i];
+                }
+            }
+        }
+        return Status::OK();
+    } else {
+        return pre_and_submit(0);
+    }
 }
 
 template <typename Param>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org