You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2023/06/12 10:23:26 UTC

[doris] branch master updated: [Chore](pipeline-engine) adjust query canceled log on pipeline engine (#20702)

This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fd9f58bd3 [Chore](pipeline-engine) adjust query canceled log on pipeline engine (#20702)
5fd9f58bd3 is described below

commit 5fd9f58bd3eca510b825d5dfc92927ffa3f31dc0
Author: Pxl <px...@qq.com>
AuthorDate: Mon Jun 12 18:23:19 2023 +0800

    [Chore](pipeline-engine) adjust query canceled log on pipeline engine (#20702)
    
    adjust query canceled log on pipeline engine
---
 be/src/pipeline/exec/operator.cpp              |  6 +++---
 be/src/pipeline/exec/union_source_operator.cpp |  5 ++---
 be/src/pipeline/pipeline_fragment_context.cpp  | 13 ++++++++++---
 be/src/pipeline/pipeline_task.cpp              |  4 ++++
 be/src/pipeline/task_scheduler.cpp             | 15 ++-------------
 5 files changed, 21 insertions(+), 22 deletions(-)

diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 3fd4e34eba..40da74ffb0 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -49,9 +49,9 @@ const RowDescriptor& OperatorBase::row_desc() {
 
 std::string OperatorBase::debug_string() const {
     std::stringstream ss;
-    ss << _operator_builder->get_name() << ", is source: " << is_source();
-    ss << ", is sink: " << is_sink() << ", is closed: " << _is_closed;
-    ss << ", is pending finish: " << is_pending_finish();
+    ss << _operator_builder->get_name() << ", is_source: " << is_source();
+    ss << ", is_sink: " << is_sink() << ", is_closed: " << _is_closed;
+    ss << ", is_pending_finish: " << is_pending_finish();
     return ss.str();
 }
 
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 7311d849c8..6efd0bfc78 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -83,10 +83,9 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl
             std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1,
                       std::placeholders::_2, std::placeholders::_3)));
     //have exectue const expr, queue have no data any more, and child could be colsed
-    if (eos || (!_need_read_for_const_expr && !_data_queue->remaining_has_data() &&
-                _data_queue->is_all_finish())) {
+    if (eos || (!can_read() && _data_queue->is_all_finish())) {
         source_state = SourceState::FINISHED;
-    } else if (_need_read_for_const_expr || _data_queue->remaining_has_data()) {
+    } else if (can_read()) {
         source_state = SourceState::MORE_DATA;
     } else {
         source_state = SourceState::DEPEND_ON_SOURCE;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 1766e9aa2e..9ecea24228 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -152,6 +152,12 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
             _exec_status = Status::Cancelled(msg);
         }
         _runtime_state->set_is_cancelled(true);
+
+        LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg;
+        for (auto& task : _tasks) {
+            LOG(WARNING) << task->debug_string();
+        }
+
         _runtime_state->set_process_status(_exec_status);
         // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
         // For stream load the fragment's query_id == load id, it is set in FE.
@@ -321,6 +327,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
 
 Status PipelineFragmentContext::_build_pipeline_tasks(
         const doris::TPipelineFragmentParams& request) {
+    _total_tasks = 0;
     for (PipelinePtr& pipeline : _pipelines) {
         // if sink
         auto sink = pipeline->sink()->build_operator();
@@ -329,8 +336,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
 
         Operators operators;
         RETURN_IF_ERROR(pipeline->build_operators(operators));
-        auto task = std::make_unique<PipelineTask>(pipeline, 0, _runtime_state.get(), operators,
-                                                   sink, this, pipeline->pipeline_profile());
+        auto task =
+                std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
+                                               operators, sink, this, pipeline->pipeline_profile());
         sink->set_child(task->get_root());
         _tasks.emplace_back(std::move(task));
         _runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
@@ -339,7 +347,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     for (auto& task : _tasks) {
         RETURN_IF_ERROR(task->prepare(_runtime_state.get()));
     }
-    _total_tasks = _tasks.size();
 
     // register the profile of child data stream sender
     for (auto& sender : _multi_cast_stream_sink_senders) {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 7c2379796a..b8ab119c34 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -310,6 +310,10 @@ std::string PipelineTask::debug_string() {
     _fresh_profile_counter();
     _task_profile->pretty_print(&profile_ss, "");
 
+    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id));
+    fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
+                   print_id(fragment_context()->get_fragment_instance_id()));
+
     fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
     fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index,
                    get_state_name(_cur_state));
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 831c5e2546..f5c55ccbdd 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -118,15 +118,6 @@ void BlockedTaskScheduler::_schedule() {
                                    PipelineTaskState::PENDING_FINISH);
                 }
             } else if (task->fragment_context()->is_canceled()) {
-                std::string task_ds;
-#ifndef NDEBUG
-                task_ds = task->debug_string();
-#endif
-                LOG(WARNING) << "Canceled, query_id=" << print_id(task->query_context()->query_id)
-                             << ", instance_id="
-                             << print_id(task->fragment_context()->get_fragment_instance_id())
-                             << (task_ds.empty() ? "" : task_ds);
-
                 if (task->is_pending_finish()) {
                     task->set_state(PipelineTaskState::PENDING_FINISH);
                     iter++;
@@ -213,8 +204,6 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
     ready_tasks.emplace_back(task);
 }
 
-/////////////////////////  TaskScheduler  ///////////////////////////////////////////////////////////////////////////
-
 TaskScheduler::~TaskScheduler() {
     shutdown();
 }
@@ -289,8 +278,8 @@ void TaskScheduler::_do_work(size_t index) {
 
         task->set_previous_core_id(index);
         if (!status.ok()) {
-            LOG(WARNING) << fmt::format("Pipeline task [{}] failed: {}", task->debug_string(),
-                                        status.to_string());
+            LOG(WARNING) << fmt::format("Pipeline task failed. reason: {}, task: \n{}",
+                                        status.to_string(), task->debug_string());
             // exec failed,cancel all fragment instance
             fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string());
             fragment_ctx->send_report(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org