You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/11/13 15:03:56 UTC
(doris) branch master updated: [fix](scanner_schedule) scanner hangs due to negative num_running_scanners (#26816)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5ad49dceaa0 [fix](scanner_schedule) scanner hangs due to negative num_running_scanners (#26816)
5ad49dceaa0 is described below
commit 5ad49dceaa0c9a0462f9f0f8700548e5294f5de0
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Mon Nov 13 23:03:49 2023 +0800
[fix](scanner_schedule) scanner hangs due to negative num_running_scanners (#26816)
* [fix] scanner hangs due to negative num_running_scanners
Before this patch, num_running_scanners was incremented only after the scanners
had been submitted, so a scanner could finish and decrement it before the increment
happened. get_block_from_queue could then observe a negative value, and an expected submission did not happen.
Co-authored-by: Mingyu Chen <mo...@gmail.com>
---
be/src/vec/exec/scan/scanner_context.cpp | 35 ++++++++++++++++++++++++------
be/src/vec/exec/scan/scanner_context.h | 5 +++--
be/src/vec/exec/scan/scanner_scheduler.cpp | 8 +++++--
3 files changed, 37 insertions(+), 11 deletions(-)
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 5723a58bea0..d3688507fcd 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -43,6 +43,8 @@
namespace doris::vectorized {
+using namespace std::chrono_literals;
+
ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
@@ -217,7 +219,14 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
// (if the scheduler continues to schedule, it will cause a lot of busy running).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
- if (should_be_scheduled() && _num_running_scanners == 0) {
+ int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
+ int32_t serving_blocks_num = _serving_blocks_num;
+ bool to_be_schedule = should_be_scheduled();
+ int num_running_scanners = _num_running_scanners;
+
+ bool is_scheduled = false;
+ if (to_be_schedule && _num_running_scanners == 0) {
+ is_scheduled = true;
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
@@ -235,7 +244,13 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
SCOPED_TIMER(_scanner_wait_batch_timer);
while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
state->is_cancelled())) {
- _blocks_queue_added_cv.wait(l);
+ if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
+ LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
+ << ", serving_blocks_num " << serving_blocks_num
+ << ", num_running_scanners " << num_running_scanners
+ << ", to_be_scheudle " << to_be_schedule << (void*)this;
+ }
+ _blocks_queue_added_cv.wait_for(l, 1s);
}
}
@@ -297,10 +312,14 @@ void ScannerContext::set_should_stop() {
_blocks_queue_added_cv.notify_one();
}
-void ScannerContext::update_num_running(int32_t scanner_inc, int32_t sched_inc) {
+void ScannerContext::inc_num_running_scanners(int32_t inc) {
+ std::lock_guard l(_transfer_lock);
+ _num_running_scanners += inc;
+}
+
+void ScannerContext::dec_num_scheduling_ctx() {
std::lock_guard l(_transfer_lock);
- _num_running_scanners += scanner_inc;
- _num_scheduling_ctx += sched_inc;
+ _num_scheduling_ctx--;
if (_finish_dependency) {
if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
_finish_dependency->set_ready_to_finish();
@@ -308,8 +327,10 @@ void ScannerContext::update_num_running(int32_t scanner_inc, int32_t sched_inc)
_finish_dependency->block_finishing();
}
}
- _blocks_queue_added_cv.notify_one();
- _ctx_finish_cv.notify_one();
+
+ if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+ _ctx_finish_cv.notify_one();
+ }
}
bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 932fd294ff9..244aedf87a3 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -118,11 +118,12 @@ public:
// Return true if this ScannerContext need no more process
virtual bool done() { return _is_finished || _should_stop; }
- // Update the running num of scanners and contexts
- void update_num_running(int32_t scanner_inc, int32_t sched_inc);
+ void inc_num_running_scanners(int32_t scanner_inc);
int get_num_running_scanners() const { return _num_running_scanners; }
+ void dec_num_scheduling_ctx();
+
int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 6c40ccc242e..2942114d432 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -202,7 +202,10 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
watch.start();
ctx->incr_num_ctx_scheduling(1);
size_t size = 0;
- Defer defer {[&]() { ctx->update_num_running(size, -1); }};
+ Defer defer {[&]() {
+ ctx->incr_num_scanner_scheduling(size);
+ ctx->dec_num_scheduling_ctx();
+ }};
if (ctx->done()) {
return;
@@ -221,12 +224,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
return;
}
+ ctx->inc_num_running_scanners(this_run.size());
+
// Submit scanners to thread pool
// TODO(cmy): How to handle this "nice"?
int nice = 1;
auto iter = this_run.begin();
auto submit_to_thread_pool = [&] {
- ctx->incr_num_scanner_scheduling(this_run.size());
if (ctx->thread_token != nullptr) {
// TODO llj tg how to treat this?
while (iter != this_run.end()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org