Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/09 00:47:04 UTC

[doris] branch master updated: [Pipeline] Fix PipScannerContext::can_finish return wrong status (#15259)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c57fa7c930 [Pipeline] Fix PipScannerContext::can_finish return wrong status (#15259)
c57fa7c930 is described below

commit c57fa7c9306df4883727a93831007381e66ccb7e
Author: Lijia Liu <li...@yeah.net>
AuthorDate: Mon Jan 9 08:46:58 2023 +0800

    [Pipeline] Fix PipScannerContext::can_finish return wrong status (#15259)
    
    Currently, in ScannerContext::push_back_scanner_and_reschedule, _num_running_scanners-- happens before _num_scheduling_ctx++.
    In PipScannerContext::can_finish, we check _num_running_scanners == 0 && _num_scheduling_ctx == 0 without holding _transfer_lock.
    In the following interleaving, PipScannerContext::can_finish returns the wrong result:
    
    _num_running_scanners--
    The check _num_running_scanners == 0 && _num_scheduling_ctx == 0 returns true.
    _num_scheduling_ctx++
    
    So we can move _num_running_scanners-- to the end of this function.
    
    Changes:
    
    PipScannerContext::get_block_from_queue no longer blocks.
    Move _num_running_scanners-- to the end of ScannerContext::push_back_scanner_and_reschedule.
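
For illustration, a minimal self-contained sketch of the ordering fix described above (not the actual Doris code; the atomic counter types and the simplified submit_to_scheduler() stand-in are assumptions):

    #include <atomic>
    #include <mutex>

    struct SketchScannerContext {
        std::atomic<int> _num_running_scanners{1};
        std::atomic<int> _num_scheduling_ctx{0};
        std::mutex _transfer_lock;

        // Stand-in for _state->exec_env()->scanner_scheduler()->submit(this).
        bool submit_to_scheduler() { return true; }

        // Pipeline-side check; called without taking _transfer_lock.
        bool no_schedule() const {
            return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
        }

        void push_back_scanner_and_reschedule() {
            std::lock_guard<std::mutex> l(_transfer_lock);
            // Before the fix, _num_running_scanners-- sat here, so a concurrent
            // no_schedule() could observe both counters at zero between the
            // decrement and the increment below.
            _num_scheduling_ctx++;
            if (!submit_to_scheduler()) {
                _num_scheduling_ctx--;
            }
            // After the fix the decrement comes last, so no_schedule() never
            // sees 0/0 while this scanner is still being handed back.
            _num_running_scanners--;
        }
    };

With this ordering, a reader that sees _num_running_scanners == 0 knows that any pending reschedule has already incremented _num_scheduling_ctx.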
---
 be/src/pipeline/exec/scan_operator.cpp     |  6 ++--
 be/src/pipeline/pipeline_task.cpp          | 44 ++++++++++++++++++++++++------
 be/src/pipeline/pipeline_task.h            | 19 +++++++++++--
 be/src/vec/exec/scan/pip_scanner_context.h |  6 +++-
 be/src/vec/exec/scan/scanner_context.cpp   | 14 ++++++----
 be/src/vec/exec/scan/scanner_context.h     |  4 +--
 be/src/vec/exec/vbroker_scan_node.h        |  2 +-
 7 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index f9f2125878..16b210ddf7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -25,10 +25,10 @@ namespace doris::pipeline {
 OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
 
 bool ScanOperator::can_read() {
-    if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->can_finish()) {
+    if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) {
         // _eos: need eos
         // _scanner_ctx->done(): need finish
-        // _scanner_ctx->can_finish(): should be scheduled
+        // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
         return true;
     } else {
         return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process
@@ -36,7 +36,7 @@ bool ScanOperator::can_read() {
 }
 
 bool ScanOperator::is_pending_finish() const {
-    return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish();
+    return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
 }
 
 bool ScanOperator::runtime_filters_are_ready_or_timeout() {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 3cd5f7f1e3..8aa033a759 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -28,8 +28,17 @@ void PipelineTask::_init_profile() {
     auto* task_profile = new RuntimeProfile(ss.str());
     _parent_profile->add_child(task_profile, true, nullptr);
     _task_profile.reset(task_profile);
-    _sink_timer = ADD_TIMER(_task_profile, "SinkTime");
-    _get_block_timer = ADD_TIMER(_task_profile, "GetBlockTime");
+    _task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");
+
+    static const char* exec_time = "ExecuteTime";
+    _exec_timer = ADD_TIMER(_task_profile, exec_time);
+    _prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
+    _open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
+    _get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
+    _sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
+    _finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
+    _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
+
     _wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
     _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
     _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
@@ -39,12 +48,16 @@ void PipelineTask::_init_profile() {
     _block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
     _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
     _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
+    _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
 }
 
 Status PipelineTask::prepare(RuntimeState* state) {
     DCHECK(_sink);
     DCHECK(_cur_state == NOT_READY);
     _init_profile();
+    SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    SCOPED_TIMER(_prepare_timer);
     RETURN_IF_ERROR(_sink->prepare(state));
     for (auto& o : _operators) {
         RETURN_IF_ERROR(o->prepare(state));
@@ -94,6 +107,9 @@ bool PipelineTask::has_dependency() {
 }
 
 Status PipelineTask::open() {
+    SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    SCOPED_TIMER(_open_timer);
     for (auto& o : _operators) {
         RETURN_IF_ERROR(o->open(_state));
     }
@@ -105,8 +121,10 @@ Status PipelineTask::open() {
 }
 
 Status PipelineTask::execute(bool* eos) {
-    SCOPED_ATTACH_TASK(runtime_state());
     SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    SCOPED_TIMER(_exec_timer);
+    SCOPED_ATTACH_TASK(runtime_state());
     int64_t time_spent = 0;
     // The status must be runnable
     *eos = false;
@@ -170,15 +188,23 @@ Status PipelineTask::execute(bool* eos) {
 }
 
 Status PipelineTask::finalize() {
+    SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_CPU_TIMER(_task_cpu_timer);
+    SCOPED_TIMER(_finalize_timer);
     return _sink->finalize(_state);
 }
 
 Status PipelineTask::close() {
-    auto s = _sink->close(_state);
-    for (auto& op : _operators) {
-        auto tem = op->close(_state);
-        if (!tem.ok() && s.ok()) {
-            s = tem;
+    int64_t close_ns = 0;
+    Status s;
+    {
+        SCOPED_RAW_TIMER(&close_ns);
+        s = _sink->close(_state);
+        for (auto& op : _operators) {
+            auto tem = op->close(_state);
+            if (!tem.ok() && s.ok()) {
+                s = tem;
+            }
         }
     }
     if (_opened) {
@@ -187,6 +213,8 @@ Status PipelineTask::close() {
         COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time());
         COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time());
         COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time());
+        COUNTER_UPDATE(_close_timer, close_ns);
+        COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
     }
     return s;
 }
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index f6a6512201..2e57cf6453 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -156,7 +156,15 @@ public:
                                            : _pipeline->_previous_schedule_id;
     }
 
-    void set_previous_core_id(int id) { _previous_schedule_id = id; }
+    void set_previous_core_id(int id) {
+        if (id == _previous_schedule_id) {
+            return;
+        }
+        if (_previous_schedule_id != -1) {
+            COUNTER_UPDATE(_core_change_times, 1);
+        }
+        _previous_schedule_id = id;
+    }
 
     bool has_dependency();
 
@@ -197,8 +205,14 @@ private:
 
     RuntimeProfile* _parent_profile;
     std::unique_ptr<RuntimeProfile> _task_profile;
-    RuntimeProfile::Counter* _sink_timer;
+    RuntimeProfile::Counter* _task_cpu_timer;
+    RuntimeProfile::Counter* _prepare_timer;
+    RuntimeProfile::Counter* _open_timer;
+    RuntimeProfile::Counter* _exec_timer;
     RuntimeProfile::Counter* _get_block_timer;
+    RuntimeProfile::Counter* _sink_timer;
+    RuntimeProfile::Counter* _finalize_timer;
+    RuntimeProfile::Counter* _close_timer;
     RuntimeProfile::Counter* _block_counts;
     RuntimeProfile::Counter* _block_by_source_counts;
     RuntimeProfile::Counter* _block_by_sink_counts;
@@ -213,5 +227,6 @@ private:
     MonotonicStopWatch _wait_schedule_watcher;
     RuntimeProfile::Counter* _wait_schedule_timer;
     RuntimeProfile::Counter* _yield_counts;
+    RuntimeProfile::Counter* _core_change_times;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index d595396c72..20d45aca95 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -35,9 +35,13 @@ public:
 
     void _update_block_queue_empty() override { _blocks_queue_empty = _blocks_queue.empty(); }
 
+    Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = false) override {
+        return vectorized::ScannerContext::get_block_from_queue(block, eos, false);
+    }
+
     // We should make those method lock free.
     bool done() override { return _is_finished || _should_stop || _status_error; }
-    bool can_finish() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
+    bool no_schedule() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
     bool empty_in_queue() override { return _blocks_queue_empty; }
 
 private:
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 1940758907..29b379ecfe 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -133,7 +133,7 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos
         _state->exec_env()->scanner_scheduler()->submit(this);
     }
     // Wait for block from queue
-    {
+    if (wait) {
         SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
         _blocks_queue_added_cv.wait(l, [this]() {
             return !_blocks_queue.empty() || _is_finished || !_process_status.ok() ||
@@ -240,7 +240,7 @@ void ScannerContext::clear_and_join() {
     return;
 }
 
-bool ScannerContext::can_finish() {
+bool ScannerContext::no_schedule() {
     std::unique_lock<std::mutex> l(_transfer_lock);
     return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
 }
@@ -263,9 +263,11 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
     }
 
     std::lock_guard<std::mutex> l(_transfer_lock);
-    _num_running_scanners--;
     _num_scheduling_ctx++;
-    _state->exec_env()->scanner_scheduler()->submit(this);
+    auto submit_st = _state->exec_env()->scanner_scheduler()->submit(this);
+    if (!submit_st.ok()) {
+        _num_scheduling_ctx--;
+    }
 
     // Notice that after calling "_scanners.push_front(scanner)", there may be other ctx in scheduler
     // to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed
@@ -274,14 +276,16 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
     // same scanner.
     if (scanner->need_to_close() && scanner->set_counted_down() &&
         (--_num_unfinished_scanners) == 0) {
-        _is_finished = true;
         // ATTN: this 2 counters will be set at close() again, which is the final values.
         // But we set them here because the counter set at close() can not send to FE's profile.
         // So we set them here, and the counter value may be little less than final values.
         COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
         COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
+        _is_finished = true;
         _blocks_queue_added_cv.notify_one();
     }
+    // In pipeline engine, doris will close scanners when `no_schedule`.
+    _num_running_scanners--;
     _ctx_finish_cv.notify_one();
 }
 
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 2a2d0ffcf1..c6c6afe1e3 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -78,7 +78,7 @@ public:
     // Get next block from blocks queue. Called by ScanNode
     // Set eos to true if there is no more data to read.
     // And if eos is true, the block returned must be nullptr.
-    Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true);
+    virtual Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true);
 
     // When a scanner complete a scan, this method will be called
     // to return the scanner to the list for next scheduling.
@@ -118,7 +118,7 @@ public:
 
     void clear_and_join();
 
-    virtual bool can_finish();
+    virtual bool no_schedule();
 
     std::string debug_string();
 
diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h
index de46104088..bad1bd93c7 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -89,7 +89,7 @@ private:
     std::condition_variable _queue_reader_cond;
     std::condition_variable _queue_writer_cond;
 
-    int _num_running_scanners;
+    std::atomic<int> _num_running_scanners;
 
     std::atomic<bool> _scan_finished;
 

