You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text rendering.
Posted to commits@doris.apache.org by ga...@apache.org on 2023/01/17 04:01:26 UTC
[doris] branch master updated: [Bug](pipeline) Fix DCHECK failure (#15928)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7d34512501 [Bug](pipeline) Fix DCHECK failure (#15928)
7d34512501 is described below
commit 7d34512501a2867cfcc012a6054e3422d88ffe92
Author: Gabriel <ga...@gmail.com>
AuthorDate: Tue Jan 17 12:01:20 2023 +0800
[Bug](pipeline) Fix DCHECK failure (#15928)
---
be/src/pipeline/exec/operator.cpp | 4 ++--
be/src/pipeline/exec/operator.h | 4 +++-
be/src/pipeline/exec/scan_operator.cpp | 16 ++++++++++++++++
be/src/pipeline/exec/scan_operator.h | 4 ++++
be/src/pipeline/pipeline_task.cpp | 4 ++++
be/src/pipeline/pipeline_task.h | 3 +++
be/src/pipeline/task_scheduler.cpp | 1 +
be/src/vec/exec/scan/scanner_context.h | 4 ++++
be/src/vec/exec/scan/vscan_node.cpp | 9 +++++++++
be/src/vec/exec/scan/vscan_node.h | 2 ++
10 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 3b78b59e7c..39cb8b8814 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -44,8 +44,8 @@ const RowDescriptor& OperatorBase::row_desc() {
std::string OperatorBase::debug_string() const {
std::stringstream ss;
- ss << _operator_builder->get_name() << ", source: " << is_source();
- ss << ", sink: " << is_sink() << ", is closed: " << _is_closed;
+ ss << _operator_builder->get_name() << ", is source: " << is_source();
+ ss << ", is sink: " << is_sink() << ", is closed: " << _is_closed;
ss << ", is pending finish: " << is_pending_finish();
return ss.str();
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e2fde2d9e5..32ede3618a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -216,6 +216,8 @@ public:
*/
virtual bool is_pending_finish() const { return false; }
+ virtual Status try_close() { return Status::OK(); }
+
bool is_closed() const { return _is_closed; }
MemTracker* mem_tracker() const { return _mem_tracker.get(); }
@@ -225,7 +227,7 @@ public:
const RowDescriptor& row_desc();
RuntimeProfile* runtime_profile() { return _runtime_profile.get(); }
- std::string debug_string() const;
+ virtual std::string debug_string() const;
int32_t id() const { return _operator_builder->id(); }
protected:
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 16b210ddf7..f673ca1afa 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -39,8 +39,24 @@ bool ScanOperator::is_pending_finish() const {
return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
}
+Status ScanOperator::try_close() {
+ return _node->try_close();
+}
+
bool ScanOperator::runtime_filters_are_ready_or_timeout() {
return _node->runtime_filters_are_ready_or_timeout();
}
+std::string ScanOperator::debug_string() const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
+ SourceOperator::debug_string(), _node->_scanner_ctx == nullptr);
+ if (_node->_scanner_ctx) {
+ fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ",
+ _node->_scanner_ctx->get_num_running_scanners(),
+ _node->_scanner_ctx->get_num_scheduling_ctx());
+ }
+ return fmt::to_string(debug_string_buffer);
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index f71a04af8a..a4810f9730 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -43,6 +43,10 @@ public:
bool is_pending_finish() const override;
bool runtime_filters_are_ready_or_timeout() override;
+
+ std::string debug_string() const override;
+
+ Status try_close() override;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 8aa033a759..1f7d12f15d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -194,6 +194,10 @@ Status PipelineTask::finalize() {
return _sink->finalize(_state);
}
+Status PipelineTask::try_close() {
+ return _source->try_close();
+}
+
Status PipelineTask::close() {
int64_t close_ns = 0;
Status s;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 2e57cf6453..3a37778a48 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -110,6 +110,9 @@ public:
Status execute(bool* eos);
+ // Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
+ // this task will enter the `PENDING_FINISH` state.
+ Status try_close();
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
Status close();
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 3ee4cb82d1..bb5323eccd 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -290,6 +290,7 @@ void TaskScheduler::_do_work(size_t index) {
void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
// state only should be CANCELED or FINISHED
+ task->try_close();
if (task->is_pending_finish()) {
task->set_state(PENDING_FINISH);
_blocked_task_scheduler->add_blocked_task(task);
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index c6c6afe1e3..df5639efa4 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -114,6 +114,10 @@ public:
_ctx_finish_cv.notify_one();
}
+ const int get_num_running_scanners() const { return _num_running_scanners; }
+
+ const int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
+
void get_next_batch_of_scanners(std::list<VScanner*>* current_run);
void clear_and_join();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 281a491827..7b00afb748 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -376,6 +376,15 @@ void VScanNode::release_resource(RuntimeState* state) {
ExecNode::release_resource(state);
}
+Status VScanNode::try_close() {
+ if (_scanner_ctx.get()) {
+ // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore
+ // TODO: there is a lock in `set_should_stop` may cause some slight impact
+ _scanner_ctx->set_should_stop();
+ }
+ return Status::OK();
+}
+
Status VScanNode::_normalize_conjuncts() {
// The conjuncts is always on output tuple, so use _output_tuple_desc;
std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 1eba5c88e5..b8c237e66e 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -92,6 +92,8 @@ public:
void release_resource(RuntimeState* state) override;
bool runtime_filters_are_ready_or_timeout();
+ Status try_close();
+
enum class PushDownType {
// The predicate can not be pushed down to data source
UNACCEPTABLE,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org