You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/09 00:47:04 UTC
[doris] branch master updated: [Pipeline] Fix PipScannerContext::can_finish return wrong status (#15259)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c57fa7c930 [Pipeline] Fix PipScannerContext::can_finish return wrong status (#15259)
c57fa7c930 is described below
commit c57fa7c9306df4883727a93831007381e66ccb7e
Author: Lijia Liu <li...@yeah.net>
AuthorDate: Mon Jan 9 08:46:58 2023 +0800
[Pipeline] Fix PipScannerContext::can_finish return wrong status (#15259)
Currently, in ScannerContext::push_back_scanner_and_reschedule, _num_running_scanners-- is executed before _num_scheduling_ctx++.
In PipScannerContext::can_finish, we check _num_running_scanners == 0 && _num_scheduling_ctx == 0 without holding _transfer_lock.
In the following interleaving, PipScannerContext::can_finish returns the wrong result:
_num_running_scanners--
The check `_num_running_scanners == 0 && _num_scheduling_ctx == 0` returns true.
_num_scheduling_ctx++
So we move _num_running_scanners-- to the end of this function.
Describe your changes.
PipScannerContext::get_block_from_queue does not block.
Move _num_running_scanners-- to the end of ScannerContext::push_back_scanner_and_reschedule.
---
be/src/pipeline/exec/scan_operator.cpp | 6 ++--
be/src/pipeline/pipeline_task.cpp | 44 ++++++++++++++++++++++++------
be/src/pipeline/pipeline_task.h | 19 +++++++++++--
be/src/vec/exec/scan/pip_scanner_context.h | 6 +++-
be/src/vec/exec/scan/scanner_context.cpp | 14 ++++++----
be/src/vec/exec/scan/scanner_context.h | 4 +--
be/src/vec/exec/vbroker_scan_node.h | 2 +-
7 files changed, 73 insertions(+), 22 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index f9f2125878..16b210ddf7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -25,10 +25,10 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
bool ScanOperator::can_read() {
- if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->can_finish()) {
+ if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) {
// _eos: need eos
// _scanner_ctx->done(): need finish
- // _scanner_ctx->can_finish(): should be scheduled
+ // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
return true;
} else {
return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process
@@ -36,7 +36,7 @@ bool ScanOperator::can_read() {
}
bool ScanOperator::is_pending_finish() const {
- return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish();
+ return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
}
bool ScanOperator::runtime_filters_are_ready_or_timeout() {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 3cd5f7f1e3..8aa033a759 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -28,8 +28,17 @@ void PipelineTask::_init_profile() {
auto* task_profile = new RuntimeProfile(ss.str());
_parent_profile->add_child(task_profile, true, nullptr);
_task_profile.reset(task_profile);
- _sink_timer = ADD_TIMER(_task_profile, "SinkTime");
- _get_block_timer = ADD_TIMER(_task_profile, "GetBlockTime");
+ _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");
+
+ static const char* exec_time = "ExecuteTime";
+ _exec_timer = ADD_TIMER(_task_profile, exec_time);
+ _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
+ _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
+ _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
+ _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
+ _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
+ _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
+
_wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
_wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
@@ -39,12 +48,16 @@ void PipelineTask::_init_profile() {
_block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
_schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
_yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
+ _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
}
Status PipelineTask::prepare(RuntimeState* state) {
DCHECK(_sink);
DCHECK(_cur_state == NOT_READY);
_init_profile();
+ SCOPED_TIMER(_task_profile->total_time_counter());
+ SCOPED_CPU_TIMER(_task_cpu_timer);
+ SCOPED_TIMER(_prepare_timer);
RETURN_IF_ERROR(_sink->prepare(state));
for (auto& o : _operators) {
RETURN_IF_ERROR(o->prepare(state));
@@ -94,6 +107,9 @@ bool PipelineTask::has_dependency() {
}
Status PipelineTask::open() {
+ SCOPED_TIMER(_task_profile->total_time_counter());
+ SCOPED_CPU_TIMER(_task_cpu_timer);
+ SCOPED_TIMER(_open_timer);
for (auto& o : _operators) {
RETURN_IF_ERROR(o->open(_state));
}
@@ -105,8 +121,10 @@ Status PipelineTask::open() {
}
Status PipelineTask::execute(bool* eos) {
- SCOPED_ATTACH_TASK(runtime_state());
SCOPED_TIMER(_task_profile->total_time_counter());
+ SCOPED_CPU_TIMER(_task_cpu_timer);
+ SCOPED_TIMER(_exec_timer);
+ SCOPED_ATTACH_TASK(runtime_state());
int64_t time_spent = 0;
// The status must be runnable
*eos = false;
@@ -170,15 +188,23 @@ Status PipelineTask::execute(bool* eos) {
}
Status PipelineTask::finalize() {
+ SCOPED_TIMER(_task_profile->total_time_counter());
+ SCOPED_CPU_TIMER(_task_cpu_timer);
+ SCOPED_TIMER(_finalize_timer);
return _sink->finalize(_state);
}
Status PipelineTask::close() {
- auto s = _sink->close(_state);
- for (auto& op : _operators) {
- auto tem = op->close(_state);
- if (!tem.ok() && s.ok()) {
- s = tem;
+ int64_t close_ns = 0;
+ Status s;
+ {
+ SCOPED_RAW_TIMER(&close_ns);
+ s = _sink->close(_state);
+ for (auto& op : _operators) {
+ auto tem = op->close(_state);
+ if (!tem.ok() && s.ok()) {
+ s = tem;
+ }
}
}
if (_opened) {
@@ -187,6 +213,8 @@ Status PipelineTask::close() {
COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time());
COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time());
COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time());
+ COUNTER_UPDATE(_close_timer, close_ns);
+ COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
}
return s;
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index f6a6512201..2e57cf6453 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -156,7 +156,15 @@ public:
: _pipeline->_previous_schedule_id;
}
- void set_previous_core_id(int id) { _previous_schedule_id = id; }
+ void set_previous_core_id(int id) {
+ if (id == _previous_schedule_id) {
+ return;
+ }
+ if (_previous_schedule_id != -1) {
+ COUNTER_UPDATE(_core_change_times, 1);
+ }
+ _previous_schedule_id = id;
+ }
bool has_dependency();
@@ -197,8 +205,14 @@ private:
RuntimeProfile* _parent_profile;
std::unique_ptr<RuntimeProfile> _task_profile;
- RuntimeProfile::Counter* _sink_timer;
+ RuntimeProfile::Counter* _task_cpu_timer;
+ RuntimeProfile::Counter* _prepare_timer;
+ RuntimeProfile::Counter* _open_timer;
+ RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _get_block_timer;
+ RuntimeProfile::Counter* _sink_timer;
+ RuntimeProfile::Counter* _finalize_timer;
+ RuntimeProfile::Counter* _close_timer;
RuntimeProfile::Counter* _block_counts;
RuntimeProfile::Counter* _block_by_source_counts;
RuntimeProfile::Counter* _block_by_sink_counts;
@@ -213,5 +227,6 @@ private:
MonotonicStopWatch _wait_schedule_watcher;
RuntimeProfile::Counter* _wait_schedule_timer;
RuntimeProfile::Counter* _yield_counts;
+ RuntimeProfile::Counter* _core_change_times;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index d595396c72..20d45aca95 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -35,9 +35,13 @@ public:
void _update_block_queue_empty() override { _blocks_queue_empty = _blocks_queue.empty(); }
+ Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = false) override {
+ return vectorized::ScannerContext::get_block_from_queue(block, eos, false);
+ }
+
// We should make those method lock free.
bool done() override { return _is_finished || _should_stop || _status_error; }
- bool can_finish() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
+ bool no_schedule() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
bool empty_in_queue() override { return _blocks_queue_empty; }
private:
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 1940758907..29b379ecfe 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -133,7 +133,7 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos
_state->exec_env()->scanner_scheduler()->submit(this);
}
// Wait for block from queue
- {
+ if (wait) {
SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
_blocks_queue_added_cv.wait(l, [this]() {
return !_blocks_queue.empty() || _is_finished || !_process_status.ok() ||
@@ -240,7 +240,7 @@ void ScannerContext::clear_and_join() {
return;
}
-bool ScannerContext::can_finish() {
+bool ScannerContext::no_schedule() {
std::unique_lock<std::mutex> l(_transfer_lock);
return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
}
@@ -263,9 +263,11 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
}
std::lock_guard<std::mutex> l(_transfer_lock);
- _num_running_scanners--;
_num_scheduling_ctx++;
- _state->exec_env()->scanner_scheduler()->submit(this);
+ auto submit_st = _state->exec_env()->scanner_scheduler()->submit(this);
+ if (!submit_st.ok()) {
+ _num_scheduling_ctx--;
+ }
// Notice that after calling "_scanners.push_front(scanner)", there may be other ctx in scheduler
// to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed
@@ -274,14 +276,16 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
// same scanner.
if (scanner->need_to_close() && scanner->set_counted_down() &&
(--_num_unfinished_scanners) == 0) {
- _is_finished = true;
// ATTN: this 2 counters will be set at close() again, which is the final values.
// But we set them here because the counter set at close() can not send to FE's profile.
// So we set them here, and the counter value may be little less than final values.
COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
+ _is_finished = true;
_blocks_queue_added_cv.notify_one();
}
+ // In pipeline engine, doris will close scanners when `no_schedule`.
+ _num_running_scanners--;
_ctx_finish_cv.notify_one();
}
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 2a2d0ffcf1..c6c6afe1e3 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -78,7 +78,7 @@ public:
// Get next block from blocks queue. Called by ScanNode
// Set eos to true if there is no more data to read.
// And if eos is true, the block returned must be nullptr.
- Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true);
+ virtual Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true);
// When a scanner complete a scan, this method will be called
// to return the scanner to the list for next scheduling.
@@ -118,7 +118,7 @@ public:
void clear_and_join();
- virtual bool can_finish();
+ virtual bool no_schedule();
std::string debug_string();
diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h
index de46104088..bad1bd93c7 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -89,7 +89,7 @@ private:
std::condition_variable _queue_reader_cond;
std::condition_variable _queue_writer_cond;
- int _num_running_scanners;
+ std::atomic<int> _num_running_scanners;
std::atomic<bool> _scan_finished;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org