You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2023/01/06 07:08:19 UTC

[doris] branch master updated: [Pipeline](Exec) disable work steal of hash join build (#15652)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1038093c29 [Pipeline](Exec) disable work steal of hash join build (#15652)
1038093c29 is described below

commit 1038093c29e94251546567aeebf6ccad430bccf3
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Fri Jan 6 15:08:10 2023 +0800

    [Pipeline](Exec) disable work steal of hash join build (#15652)
---
 be/src/pipeline/pipeline.h                    | 19 ++++++++++++++++---
 be/src/pipeline/pipeline_fragment_context.cpp |  1 +
 be/src/pipeline/pipeline_task.h               | 11 +++++++++--
 be/src/pipeline/task_queue.cpp                | 23 +++++++++++++----------
 be/src/pipeline/task_queue.h                  |  6 +++---
 5 files changed, 42 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 054145c3d2..0b7b196d6a 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -37,7 +37,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
 public:
     Pipeline() = delete;
     explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
-            : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {
+            : _complete_dependency(0),
+              _pipeline_id(pipeline_id),
+              _context(context),
+              _can_steal(true) {
         _init_profile();
     }
 
@@ -48,9 +51,13 @@ public:
 
     // If all dependencies are finished, this pipeline task should be scheduled.
     // e.g. Hash join probe task will be scheduled once Hash join build task is finished.
-    bool finish_one_dependency() {
+    bool finish_one_dependency(int dependency_core_id) {
         DCHECK(_complete_dependency < _dependencies.size());
-        return _complete_dependency.fetch_add(1) == _dependencies.size() - 1;
+        bool finish = _complete_dependency.fetch_add(1) == _dependencies.size() - 1;
+        if (finish) {
+            _previous_schedule_id = dependency_core_id;
+        }
+        return finish;
     }
 
     bool has_dependency() { return _complete_dependency.load() < _dependencies.size(); }
@@ -65,6 +72,10 @@ public:
 
     RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
 
+    bool can_steal() const { return _can_steal; }
+
+    void disable_task_steal() { _can_steal = false; }
+
 private:
     void _init_profile();
     std::atomic<uint32_t> _complete_dependency;
@@ -77,6 +88,8 @@ private:
 
     PipelineId _pipeline_id;
     std::weak_ptr<PipelineFragmentContext> _context;
+    bool _can_steal;
+    int _previous_schedule_id = -1;
 
     std::unique_ptr<RuntimeProfile> _pipeline_profile;
 };
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index feff7b304c..bd57abd240 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -459,6 +459,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         OperatorBuilderPtr join_sink =
                 std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(), join_node);
         RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
+        new_pipe->disable_task_steal();
 
         RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
         OperatorBuilderPtr join_source = std::make_shared<HashJoinProbeOperatorBuilder>(
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index be267b8efa..f6a6512201 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -99,6 +99,7 @@ public:
               _sink(sink),
               _prepared(false),
               _opened(false),
+              _can_steal(pipeline->_can_steal),
               _state(state),
               _cur_state(NOT_READY),
               _data_state(SourceState::DEPEND_ON_SOURCE),
@@ -136,11 +137,13 @@ public:
 
     bool sink_can_write() { return _sink->can_write(); }
 
+    bool can_steal() const { return _can_steal; }
+
     Status finalize();
 
     void finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
-            p->finish_one_dependency();
+            p->finish_one_dependency(_previous_schedule_id);
         }
     }
 
@@ -148,7 +151,10 @@ public:
 
     QueryFragmentsCtx* query_fragments_context();
 
-    int get_previous_core_id() const { return _previous_schedule_id; }
+    int get_previous_core_id() const {
+        return _previous_schedule_id != -1 ? _previous_schedule_id
+                                           : _pipeline->_previous_schedule_id;
+    }
 
     void set_previous_core_id(int id) { _previous_schedule_id = id; }
 
@@ -180,6 +186,7 @@ private:
 
     bool _prepared;
     bool _opened;
+    bool _can_steal;
     RuntimeState* _state;
     int _previous_schedule_id = -1;
     uint32_t _schedule_time = 0;
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 93f569ed31..ff0b3c59ed 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -20,12 +20,15 @@
 namespace doris {
 namespace pipeline {
 
-PipelineTask* SubWorkTaskQueue::try_take() {
+PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
         return nullptr;
     }
-    ++_schedule_time;
     auto task = _queue.front();
+    if (!task->can_steal() && is_steal) {
+        return nullptr;
+    }
+    ++_schedule_time;
     _queue.pop();
     return task;
 }
@@ -52,7 +55,7 @@ void WorkTaskQueue::close() {
     _wait_task.notify_all();
 }
 
-PipelineTask* WorkTaskQueue::try_take_unprotected() {
+PipelineTask* WorkTaskQueue::try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
@@ -76,7 +79,7 @@ PipelineTask* WorkTaskQueue::try_take_unprotected() {
         }
     }
 
-    auto task = _sub_queues[idx].try_take();
+    auto task = _sub_queues[idx].try_take(is_steal);
     if (task) {
         _total_task_size--;
     }
@@ -93,15 +96,15 @@ int WorkTaskQueue::_compute_level(PipelineTask* task) {
     return SUB_QUEUE_LEVEL - 1;
 }
 
-PipelineTask* WorkTaskQueue::try_take() {
+PipelineTask* WorkTaskQueue::try_take(bool is_steal) {
     // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
-    return try_take_unprotected();
+    return try_take_unprotected(is_steal);
 }
 
 PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
-    auto task = try_take_unprotected();
+    auto task = try_take_unprotected(false);
     if (task) {
         return task;
     } else {
@@ -110,7 +113,7 @@ PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) {
         } else {
             _wait_task.wait(lock);
         }
-        return try_take_unprotected();
+        return try_take_unprotected(false);
     }
 }
 
@@ -138,7 +141,7 @@ void TaskQueue::close() {
 PipelineTask* TaskQueue::try_take(size_t core_id) {
     PipelineTask* task;
     while (!_closed) {
-        task = _async_queue[core_id].try_take();
+        task = _async_queue[core_id].try_take(false);
         if (task) {
             break;
         }
@@ -166,7 +169,7 @@ PipelineTask* TaskQueue::steal_take(size_t core_id) {
             next_id = 0;
         }
         DCHECK(next_id < _core_size);
-        auto task = _async_queue[next_id].try_take();
+        auto task = _async_queue[next_id].try_take(true);
         if (task) {
             return task;
         }
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 6432001fbc..54e48efe42 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -29,7 +29,7 @@ class SubWorkTaskQueue {
 public:
     void push_back(PipelineTask* task) { _queue.emplace(task); }
 
-    PipelineTask* try_take();
+    PipelineTask* try_take(bool is_steal);
 
     void set_factor_for_normal(double factor_for_normal) { _factor_for_normal = factor_for_normal; }
 
@@ -53,9 +53,9 @@ public:
 
     void close();
 
-    PipelineTask* try_take_unprotected();
+    PipelineTask* try_take_unprotected(bool is_steal);
 
-    PipelineTask* try_take();
+    PipelineTask* try_take(bool is_steal);
 
     PipelineTask* take(uint32_t timeout_ms = 0);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org