You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2022/12/22 14:42:43 UTC
[doris] branch master updated: [pipeline](schedule) Add profile for except node and fix steal task problem (#15282)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 388df291af [pipeline](schedule) Add profile for except node and fix steal task problem (#15282)
388df291af is described below
commit 388df291afa49c63ce0ba704bb323146888f4152
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Thu Dec 22 22:42:37 2022 +0800
[pipeline](schedule) Add profile for except node and fix steal task problem (#15282)
---
be/src/pipeline/pipeline_task.cpp | 2 ++
be/src/pipeline/pipeline_task.h | 3 +++
be/src/pipeline/task_queue.cpp | 2 +-
be/src/pipeline/task_queue.h | 2 +-
be/src/pipeline/task_scheduler.cpp | 1 -
be/src/vec/exec/vset_operation_node.cpp | 6 ++++--
be/src/vec/exec/vset_operation_node.h | 1 +
7 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 393caea0d0..c55ef9e305 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -35,6 +35,7 @@ void PipelineTask::_init_profile() {
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
_wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
_block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
+ _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
_yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
}
@@ -178,6 +179,7 @@ Status PipelineTask::close() {
}
if (_opened) {
COUNTER_UPDATE(_wait_source_timer, _wait_source_watcher.elapsed_time());
+ COUNTER_UPDATE(_schedule_counts, _schedule_time);
COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time());
COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time());
COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time());
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index ecc7f884b6..f14669f41a 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -121,6 +121,8 @@ public:
void start_schedule_watcher() { _wait_schedule_watcher.start(); }
void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
+ int pipeline_id() const { return _pipeline->_pipeline_id; }
+
PipelineTaskState get_state() { return _cur_state; }
void set_state(PipelineTaskState state);
@@ -191,6 +193,7 @@ private:
RuntimeProfile::Counter* _sink_timer;
RuntimeProfile::Counter* _get_block_timer;
RuntimeProfile::Counter* _block_counts;
+ RuntimeProfile::Counter* _schedule_counts;
MonotonicStopWatch _wait_source_watcher;
RuntimeProfile::Counter* _wait_source_timer;
MonotonicStopWatch _wait_sink_watcher;
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index b9ca3c9f2a..93f569ed31 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -166,7 +166,7 @@ PipelineTask* TaskQueue::steal_take(size_t core_id) {
next_id = 0;
}
DCHECK(next_id < _core_size);
- auto task = _async_queue[core_id].try_take();
+ auto task = _async_queue[next_id].try_take();
if (task) {
return task;
}
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 0a6bf81554..6432001fbc 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -108,7 +108,7 @@ private:
size_t _core_size;
std::atomic<size_t> _next_core = 0;
std::atomic<bool> _closed;
- static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 200;
+ static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
};
} // namespace pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 915a2e8935..c5e5ee57c1 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -285,7 +285,6 @@ void TaskScheduler::_do_work(size_t index) {
break;
}
}
- LOG(INFO) << "Stop TaskScheduler worker " << index;
}
void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 9770c2800a..28497bfadb 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -228,7 +228,6 @@ template <bool is_intersect>
Status VSetOperationNode<is_intersect>::get_next(RuntimeState* state, Block* output_block,
bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExceptNode::get_next");
- SCOPED_TIMER(_probe_timer);
return pull(state, output_block, eos);
}
@@ -239,6 +238,7 @@ Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
+ _pull_timer = ADD_TIMER(runtime_profile(), "PullTime");
// Prepare result expr lists.
for (int i = 0; i < _child_expr_lists.size(); ++i) {
@@ -348,6 +348,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::sink(RuntimeState*, Block* block, bool eos) {
+ SCOPED_TIMER(_build_timer);
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
if (block->rows() != 0) {
@@ -382,6 +383,7 @@ Status VSetOperationNode<is_intersect>::sink(RuntimeState*, Block* block, bool e
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::pull(RuntimeState* state, Block* output_block, bool* eos) {
+ SCOPED_TIMER(_pull_timer);
create_mutable_cols(output_block);
auto st = std::visit(
[&](auto&& arg) -> Status {
@@ -411,7 +413,6 @@ Status VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
bool eos = false;
while (!eos) {
block.clear_column_data();
- SCOPED_TIMER(_build_timer);
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR_AND_CHECK_SPAN(
child(0)->get_next_after_projects(
@@ -471,6 +472,7 @@ void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* /*state*/, int child_id,
Block* block, bool eos) {
+ SCOPED_TIMER(_probe_timer);
CHECK(_build_finished) << "cannot sink probe data before build finished";
if (child_id > 1) {
CHECK(_probe_finished_children_index[child_id - 1])
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index 1e8b5e1ebe..8b8d10e9e3 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -110,6 +110,7 @@ private:
MutableBlock _mutable_block;
RuntimeProfile::Counter* _build_timer; // time to build hash table
RuntimeProfile::Counter* _probe_timer; // time to probe
+ RuntimeProfile::Counter* _pull_timer; // time to pull data
template <class HashTableContext, bool is_intersected>
friend struct HashTableBuild;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org