You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2023/11/16 11:48:08 UTC

(doris) branch branch-2.0 updated: [Bug](pipeline) try fix the exchange sink buffer result error (#27087)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 72b74090d34 [Bug](pipeline) try fix the exchange sink buffer result error (#27087)
72b74090d34 is described below

commit 72b74090d34cb2bad0023a74836795c5b46aca52
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Thu Nov 16 19:47:58 2023 +0800

    [Bug](pipeline) try fix the exchange sink buffer result error (#27087)
---
 be/src/pipeline/task_scheduler.cpp | 28 ++++++++++++++++------------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 4298efd01db..00b27fc59bf 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -325,24 +325,28 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
         return;
     }
     auto status = task->try_close();
-    if (!status.ok() && state != PipelineTaskState::CANCELED) {
-        // Call `close` if `try_close` failed to make sure allocated resources are released
-        task->close();
+    auto cancel = [&]() {
         task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                          status.to_string());
         state = PipelineTaskState::CANCELED;
-    } else if (task->is_pending_finish()) {
-        task->set_state(PipelineTaskState::PENDING_FINISH);
-        _blocked_task_scheduler->add_blocked_task(task);
-        return;
-    } else {
+    };
+
+    auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED;
+    if (try_close_failed) {
+        cancel();
+        // Call `close` if `try_close` failed to make sure allocated resources are released
+        static_cast<void>(task->close());
+    } else if (!task->is_pending_finish()) {
         status = task->close();
         if (!status.ok() && state != PipelineTaskState::CANCELED) {
-            task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                                             status.to_string());
-            state = PipelineTaskState::CANCELED;
+            cancel();
         }
-        DCHECK(!task->is_pending_finish()) << task->debug_string();
+    }
+
+    if (task->is_pending_finish()) {
+        task->set_state(PipelineTaskState::PENDING_FINISH);
+        static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
+        return;
     }
     task->set_state(state);
     task->set_close_pipeline_time();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org