You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/11/13 15:03:56 UTC

(doris) branch master updated: [fix](scanner_schedule) scanner hangs due to negative num_running_scanners (#26816)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad49dceaa0 [fix](scanner_schedule) scanner hangs due to negative num_running_scanners (#26816)
5ad49dceaa0 is described below

commit 5ad49dceaa0c9a0462f9f0f8700548e5294f5de0
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Mon Nov 13 23:03:49 2023 +0800

    [fix](scanner_schedule) scanner hangs due to negative num_running_scanners (#26816)
    
    * [fix] scanner hangs due to negative num_running_scanners
    
    Before this patch, num_running_scanners was increased only after the
    scanners had been submitted, so a scanner could finish and decrease it
    before the increase happened. get_block_from_queue could then observe a
    negative value, and an expected submit would not happen.
    
    Co-authored-by: Mingyu Chen <mo...@gmail.com>
---
 be/src/vec/exec/scan/scanner_context.cpp   | 35 ++++++++++++++++++++++++------
 be/src/vec/exec/scan/scanner_context.h     |  5 +++--
 be/src/vec/exec/scan/scanner_scheduler.cpp |  8 +++++--
 3 files changed, 37 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 5723a58bea0..d3688507fcd 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -43,6 +43,8 @@
 
 namespace doris::vectorized {
 
+using namespace std::chrono_literals;
+
 ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent,
                                const doris::TupleDescriptor* output_tuple_desc,
                                const std::list<VScannerSPtr>& scanners_, int64_t limit_,
@@ -217,7 +219,14 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     // (if the scheduler continues to schedule, it will cause a lot of busy running).
     // At this point, consumers are required to trigger new scheduling to ensure that
     // data can be continuously fetched.
-    if (should_be_scheduled() && _num_running_scanners == 0) {
+    int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
+    int32_t serving_blocks_num = _serving_blocks_num;
+    bool to_be_schedule = should_be_scheduled();
+    int num_running_scanners = _num_running_scanners;
+
+    bool is_scheduled = false;
+    if (to_be_schedule && _num_running_scanners == 0) {
+        is_scheduled = true;
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
@@ -235,7 +244,13 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
         SCOPED_TIMER(_scanner_wait_batch_timer);
         while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
                  state->is_cancelled())) {
-            _blocks_queue_added_cv.wait(l);
+            if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
+                LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
+                          << ", serving_blocks_num " << serving_blocks_num
+                          << ", num_running_scanners " << num_running_scanners
+                          << ", to_be_scheudle " << to_be_schedule << (void*)this;
+            }
+            _blocks_queue_added_cv.wait_for(l, 1s);
         }
     }
 
@@ -297,10 +312,14 @@ void ScannerContext::set_should_stop() {
     _blocks_queue_added_cv.notify_one();
 }
 
-void ScannerContext::update_num_running(int32_t scanner_inc, int32_t sched_inc) {
+void ScannerContext::inc_num_running_scanners(int32_t inc) {
+    std::lock_guard l(_transfer_lock);
+    _num_running_scanners += inc;
+}
+
+void ScannerContext::dec_num_scheduling_ctx() {
     std::lock_guard l(_transfer_lock);
-    _num_running_scanners += scanner_inc;
-    _num_scheduling_ctx += sched_inc;
+    _num_scheduling_ctx--;
     if (_finish_dependency) {
         if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
             _finish_dependency->set_ready_to_finish();
@@ -308,8 +327,10 @@ void ScannerContext::update_num_running(int32_t scanner_inc, int32_t sched_inc)
             _finish_dependency->block_finishing();
         }
     }
-    _blocks_queue_added_cv.notify_one();
-    _ctx_finish_cv.notify_one();
+
+    if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+        _ctx_finish_cv.notify_one();
+    }
 }
 
 bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 932fd294ff9..244aedf87a3 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -118,11 +118,12 @@ public:
     // Return true if this ScannerContext need no more process
     virtual bool done() { return _is_finished || _should_stop; }
 
-    // Update the running num of scanners and contexts
-    void update_num_running(int32_t scanner_inc, int32_t sched_inc);
+    void inc_num_running_scanners(int32_t scanner_inc);
 
     int get_num_running_scanners() const { return _num_running_scanners; }
 
+    void dec_num_scheduling_ctx();
+
     int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
 
     void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 6c40ccc242e..2942114d432 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -202,7 +202,10 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
     watch.start();
     ctx->incr_num_ctx_scheduling(1);
     size_t size = 0;
-    Defer defer {[&]() { ctx->update_num_running(size, -1); }};
+    Defer defer {[&]() {
+        ctx->incr_num_scanner_scheduling(size);
+        ctx->dec_num_scheduling_ctx();
+    }};
 
     if (ctx->done()) {
         return;
@@ -221,12 +224,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
         return;
     }
 
+    ctx->inc_num_running_scanners(this_run.size());
+
     // Submit scanners to thread pool
     // TODO(cmy): How to handle this "nice"?
     int nice = 1;
     auto iter = this_run.begin();
     auto submit_to_thread_pool = [&] {
-        ctx->incr_num_scanner_scheduling(this_run.size());
         if (ctx->thread_token != nullptr) {
             // TODO llj tg how to treat this?
             while (iter != this_run.end()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org