You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2023/06/12 10:23:26 UTC
[doris] branch master updated: [Chore](pipeline-engine) adjus queryt canceled log on pipeline engine (#20702)
This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5fd9f58bd3 [Chore](pipeline-engine) adjus queryt canceled log on pipeline engine (#20702)
5fd9f58bd3 is described below
commit 5fd9f58bd3eca510b825d5dfc92927ffa3f31dc0
Author: Pxl <px...@qq.com>
AuthorDate: Mon Jun 12 18:23:19 2023 +0800
[Chore](pipeline-engine) adjus queryt canceled log on pipeline engine (#20702)
Adjust the query-canceled log on the pipeline engine.
---
be/src/pipeline/exec/operator.cpp | 6 +++---
be/src/pipeline/exec/union_source_operator.cpp | 5 ++---
be/src/pipeline/pipeline_fragment_context.cpp | 13 ++++++++++---
be/src/pipeline/pipeline_task.cpp | 4 ++++
be/src/pipeline/task_scheduler.cpp | 15 ++-------------
5 files changed, 21 insertions(+), 22 deletions(-)
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 3fd4e34eba..40da74ffb0 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -49,9 +49,9 @@ const RowDescriptor& OperatorBase::row_desc() {
std::string OperatorBase::debug_string() const {
std::stringstream ss;
- ss << _operator_builder->get_name() << ", is source: " << is_source();
- ss << ", is sink: " << is_sink() << ", is closed: " << _is_closed;
- ss << ", is pending finish: " << is_pending_finish();
+ ss << _operator_builder->get_name() << ", is_source: " << is_source();
+ ss << ", is_sink: " << is_sink() << ", is_closed: " << _is_closed;
+ ss << ", is_pending_finish: " << is_pending_finish();
return ss.str();
}
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 7311d849c8..6efd0bfc78 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -83,10 +83,9 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl
std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3)));
//have exectue const expr, queue have no data any more, and child could be colsed
- if (eos || (!_need_read_for_const_expr && !_data_queue->remaining_has_data() &&
- _data_queue->is_all_finish())) {
+ if (eos || (!can_read() && _data_queue->is_all_finish())) {
source_state = SourceState::FINISHED;
- } else if (_need_read_for_const_expr || _data_queue->remaining_has_data()) {
+ } else if (can_read()) {
source_state = SourceState::MORE_DATA;
} else {
source_state = SourceState::DEPEND_ON_SOURCE;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 1766e9aa2e..9ecea24228 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -152,6 +152,12 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
_exec_status = Status::Cancelled(msg);
}
_runtime_state->set_is_cancelled(true);
+
+ LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
+ for (auto& task : _tasks) {
+ LOG(WARNING) << task->debug_string();
+ }
+
_runtime_state->set_process_status(_exec_status);
// Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
// For stream load the fragment's query_id == load id, it is set in FE.
@@ -321,6 +327,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
Status PipelineFragmentContext::_build_pipeline_tasks(
const doris::TPipelineFragmentParams& request) {
+ _total_tasks = 0;
for (PipelinePtr& pipeline : _pipelines) {
// if sink
auto sink = pipeline->sink()->build_operator();
@@ -329,8 +336,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
Operators operators;
RETURN_IF_ERROR(pipeline->build_operators(operators));
- auto task = std::make_unique<PipelineTask>(pipeline, 0, _runtime_state.get(), operators,
- sink, this, pipeline->pipeline_profile());
+ auto task =
+ std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
+ operators, sink, this, pipeline->pipeline_profile());
sink->set_child(task->get_root());
_tasks.emplace_back(std::move(task));
_runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
@@ -339,7 +347,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
for (auto& task : _tasks) {
RETURN_IF_ERROR(task->prepare(_runtime_state.get()));
}
- _total_tasks = _tasks.size();
// register the profile of child data stream sender
for (auto& sender : _multi_cast_stream_sink_senders) {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 7c2379796a..b8ab119c34 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -310,6 +310,10 @@ std::string PipelineTask::debug_string() {
_fresh_profile_counter();
_task_profile->pretty_print(&profile_ss, "");
+ fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id));
+ fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
+ print_id(fragment_context()->get_fragment_instance_id()));
+
fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index,
get_state_name(_cur_state));
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 831c5e2546..f5c55ccbdd 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -118,15 +118,6 @@ void BlockedTaskScheduler::_schedule() {
PipelineTaskState::PENDING_FINISH);
}
} else if (task->fragment_context()->is_canceled()) {
- std::string task_ds;
-#ifndef NDEBUG
- task_ds = task->debug_string();
-#endif
- LOG(WARNING) << "Canceled, query_id=" << print_id(task->query_context()->query_id)
- << ", instance_id="
- << print_id(task->fragment_context()->get_fragment_instance_id())
- << (task_ds.empty() ? "" : task_ds);
-
if (task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
iter++;
@@ -213,8 +204,6 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
ready_tasks.emplace_back(task);
}
-///////////////////////// TaskScheduler ///////////////////////////////////////////////////////////////////////////
-
TaskScheduler::~TaskScheduler() {
shutdown();
}
@@ -289,8 +278,8 @@ void TaskScheduler::_do_work(size_t index) {
task->set_previous_core_id(index);
if (!status.ok()) {
- LOG(WARNING) << fmt::format("Pipeline task [{}] failed: {}", task->debug_string(),
- status.to_string());
+ LOG(WARNING) << fmt::format("Pipeline task failed. reason: {}, task: \n{}",
+ status.to_string(), task->debug_string());
// exec failed,cancel all fragment instance
fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
fragment_ctx->send_report(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org