You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this copy.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/22 14:42:43 UTC

[doris] branch master updated: [pipeline](schedule) Add profile for except node and fix steal task problem (#15282)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 388df291af [pipeline](schedule) Add profile for except node and fix steal task problem (#15282)
388df291af is described below

commit 388df291afa49c63ce0ba704bb323146888f4152
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Thu Dec 22 22:42:37 2022 +0800

    [pipeline](schedule) Add profile for except node and fix steal task problem (#15282)
---
 be/src/pipeline/pipeline_task.cpp       | 2 ++
 be/src/pipeline/pipeline_task.h         | 3 +++
 be/src/pipeline/task_queue.cpp          | 2 +-
 be/src/pipeline/task_queue.h            | 2 +-
 be/src/pipeline/task_scheduler.cpp      | 1 -
 be/src/vec/exec/vset_operation_node.cpp | 6 ++++--
 be/src/vec/exec/vset_operation_node.h   | 1 +
 7 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 393caea0d0..c55ef9e305 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -35,6 +35,7 @@ void PipelineTask::_init_profile() {
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
     _wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
     _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
+    _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
     _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
 }
 
@@ -178,6 +179,7 @@ Status PipelineTask::close() {
     }
     if (_opened) {
         COUNTER_UPDATE(_wait_source_timer, _wait_source_watcher.elapsed_time());
+        COUNTER_UPDATE(_schedule_counts, _schedule_time);
         COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time());
         COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time());
         COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time());
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index ecc7f884b6..f14669f41a 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -121,6 +121,8 @@ public:
     void start_schedule_watcher() { _wait_schedule_watcher.start(); }
     void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
 
+    int pipeline_id() const { return _pipeline->_pipeline_id; }
+
     PipelineTaskState get_state() { return _cur_state; }
     void set_state(PipelineTaskState state);
 
@@ -191,6 +193,7 @@ private:
     RuntimeProfile::Counter* _sink_timer;
     RuntimeProfile::Counter* _get_block_timer;
     RuntimeProfile::Counter* _block_counts;
+    RuntimeProfile::Counter* _schedule_counts;
     MonotonicStopWatch _wait_source_watcher;
     RuntimeProfile::Counter* _wait_source_timer;
     MonotonicStopWatch _wait_sink_watcher;
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index b9ca3c9f2a..93f569ed31 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -166,7 +166,7 @@ PipelineTask* TaskQueue::steal_take(size_t core_id) {
             next_id = 0;
         }
         DCHECK(next_id < _core_size);
-        auto task = _async_queue[core_id].try_take();
+        auto task = _async_queue[next_id].try_take();
         if (task) {
             return task;
         }
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 0a6bf81554..6432001fbc 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -108,7 +108,7 @@ private:
     size_t _core_size;
     std::atomic<size_t> _next_core = 0;
     std::atomic<bool> _closed;
-    static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 200;
+    static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 915a2e8935..c5e5ee57c1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -285,7 +285,6 @@ void TaskScheduler::_do_work(size_t index) {
             break;
         }
     }
-    LOG(INFO) << "Stop TaskScheduler worker " << index;
 }
 
 void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 9770c2800a..28497bfadb 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -228,7 +228,6 @@ template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::get_next(RuntimeState* state, Block* output_block,
                                                  bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExceptNode::get_next");
-    SCOPED_TIMER(_probe_timer);
     return pull(state, output_block, eos);
 }
 
@@ -239,6 +238,7 @@ Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+    _pull_timer = ADD_TIMER(runtime_profile(), "PullTime");
 
     // Prepare result expr lists.
     for (int i = 0; i < _child_expr_lists.size(); ++i) {
@@ -348,6 +348,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
 
 template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::sink(RuntimeState*, Block* block, bool eos) {
+    SCOPED_TIMER(_build_timer);
     constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
 
     if (block->rows() != 0) {
@@ -382,6 +383,7 @@ Status VSetOperationNode<is_intersect>::sink(RuntimeState*, Block* block, bool e
 
 template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::pull(RuntimeState* state, Block* output_block, bool* eos) {
+    SCOPED_TIMER(_pull_timer);
     create_mutable_cols(output_block);
     auto st = std::visit(
             [&](auto&& arg) -> Status {
@@ -411,7 +413,6 @@ Status VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
     bool eos = false;
     while (!eos) {
         block.clear_column_data();
-        SCOPED_TIMER(_build_timer);
         RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR_AND_CHECK_SPAN(
                 child(0)->get_next_after_projects(
@@ -471,6 +472,7 @@ void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
 template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* /*state*/, int child_id,
                                                    Block* block, bool eos) {
+    SCOPED_TIMER(_probe_timer);
     CHECK(_build_finished) << "cannot sink probe data before build finished";
     if (child_id > 1) {
         CHECK(_probe_finished_children_index[child_id - 1])
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index 1e8b5e1ebe..8b8d10e9e3 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -110,6 +110,7 @@ private:
     MutableBlock _mutable_block;
     RuntimeProfile::Counter* _build_timer; // time to build hash table
     RuntimeProfile::Counter* _probe_timer; // time to probe
+    RuntimeProfile::Counter* _pull_timer;  // time to pull data
 
     template <class HashTableContext, bool is_intersected>
     friend struct HashTableBuild;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org