You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2023/01/17 04:01:26 UTC

[doris] branch master updated: [Bug](pipeline) Fix DCHECK failure (#15928)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d34512501 [Bug](pipeline) Fix DCHECK failure (#15928)
7d34512501 is described below

commit 7d34512501a2867cfcc012a6054e3422d88ffe92
Author: Gabriel <ga...@gmail.com>
AuthorDate: Tue Jan 17 12:01:20 2023 +0800

    [Bug](pipeline) Fix DCHECK failure (#15928)
---
 be/src/pipeline/exec/operator.cpp      |  4 ++--
 be/src/pipeline/exec/operator.h        |  4 +++-
 be/src/pipeline/exec/scan_operator.cpp | 16 ++++++++++++++++
 be/src/pipeline/exec/scan_operator.h   |  4 ++++
 be/src/pipeline/pipeline_task.cpp      |  4 ++++
 be/src/pipeline/pipeline_task.h        |  3 +++
 be/src/pipeline/task_scheduler.cpp     |  1 +
 be/src/vec/exec/scan/scanner_context.h |  4 ++++
 be/src/vec/exec/scan/vscan_node.cpp    |  9 +++++++++
 be/src/vec/exec/scan/vscan_node.h      |  2 ++
 10 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 3b78b59e7c..39cb8b8814 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -44,8 +44,8 @@ const RowDescriptor& OperatorBase::row_desc() {
 
 std::string OperatorBase::debug_string() const {
     std::stringstream ss;
-    ss << _operator_builder->get_name() << ", source: " << is_source();
-    ss << ", sink: " << is_sink() << ", is closed: " << _is_closed;
+    ss << _operator_builder->get_name() << ", is source: " << is_source();
+    ss << ", is sink: " << is_sink() << ", is closed: " << _is_closed;
     ss << ", is pending finish: " << is_pending_finish();
     return ss.str();
 }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e2fde2d9e5..32ede3618a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -216,6 +216,8 @@ public:
      */
     virtual bool is_pending_finish() const { return false; }
 
+    virtual Status try_close() { return Status::OK(); }
+
     bool is_closed() const { return _is_closed; }
 
     MemTracker* mem_tracker() const { return _mem_tracker.get(); }
@@ -225,7 +227,7 @@ public:
     const RowDescriptor& row_desc();
 
     RuntimeProfile* runtime_profile() { return _runtime_profile.get(); }
-    std::string debug_string() const;
+    virtual std::string debug_string() const;
     int32_t id() const { return _operator_builder->id(); }
 
 protected:
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 16b210ddf7..f673ca1afa 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -39,8 +39,24 @@ bool ScanOperator::is_pending_finish() const {
     return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
 }
 
+Status ScanOperator::try_close() {
+    return _node->try_close();
+}
+
 bool ScanOperator::runtime_filters_are_ready_or_timeout() {
     return _node->runtime_filters_are_ready_or_timeout();
 }
 
+std::string ScanOperator::debug_string() const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
+                   SourceOperator::debug_string(), _node->_scanner_ctx == nullptr);
+    if (_node->_scanner_ctx) {
+        fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ",
+                       _node->_scanner_ctx->get_num_running_scanners(),
+                       _node->_scanner_ctx->get_num_scheduling_ctx());
+    }
+    return fmt::to_string(debug_string_buffer);
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index f71a04af8a..a4810f9730 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -43,6 +43,10 @@ public:
     bool is_pending_finish() const override;
 
     bool runtime_filters_are_ready_or_timeout() override;
+
+    std::string debug_string() const override;
+
+    Status try_close() override;
 };
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 8aa033a759..1f7d12f15d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -194,6 +194,10 @@ Status PipelineTask::finalize() {
     return _sink->finalize(_state);
 }
 
+Status PipelineTask::try_close() {
+    return _source->try_close();
+}
+
 Status PipelineTask::close() {
     int64_t close_ns = 0;
     Status s;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 2e57cf6453..3a37778a48 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -110,6 +110,9 @@ public:
 
     Status execute(bool* eos);
 
+    // Try to close this pipeline task. If some resources still need to be released after `try_close`,
+    // this task will enter the `PENDING_FINISH` state.
+    Status try_close();
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
     Status close();
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 3ee4cb82d1..bb5323eccd 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -290,6 +290,7 @@ void TaskScheduler::_do_work(size_t index) {
 
 void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
     // state only should be CANCELED or FINISHED
+    task->try_close();
     if (task->is_pending_finish()) {
         task->set_state(PENDING_FINISH);
         _blocked_task_scheduler->add_blocked_task(task);
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index c6c6afe1e3..df5639efa4 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -114,6 +114,10 @@ public:
         _ctx_finish_cv.notify_one();
     }
 
+    const int get_num_running_scanners() const { return _num_running_scanners; }
+
+    const int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
+
     void get_next_batch_of_scanners(std::list<VScanner*>* current_run);
 
     void clear_and_join();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 281a491827..7b00afb748 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -376,6 +376,15 @@ void VScanNode::release_resource(RuntimeState* state) {
     ExecNode::release_resource(state);
 }
 
+Status VScanNode::try_close() {
+    if (_scanner_ctx.get()) {
+        // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore
+        // TODO: the lock in `set_should_stop` may have a slight impact on performance
+        _scanner_ctx->set_should_stop();
+    }
+    return Status::OK();
+}
+
 Status VScanNode::_normalize_conjuncts() {
     // The conjuncts is always on output tuple, so use _output_tuple_desc;
     std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 1eba5c88e5..b8c237e66e 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -92,6 +92,8 @@ public:
     void release_resource(RuntimeState* state) override;
     bool runtime_filters_are_ready_or_timeout();
 
+    Status try_close();
+
     enum class PushDownType {
         // The predicate can not be pushed down to data source
         UNACCEPTABLE,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org