You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2023/06/15 11:21:48 UTC

[doris] branch master updated: [Bug](pipeline) fix pipeline task call finish_p_dependency more than once (#20851)

This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bfceb7acb [Bug](pipeline) fix pipeline task call finish_p_dependency more than once (#20851)
4bfceb7acb is described below

commit 4bfceb7acbb03ab06d1c13559be357cba5c70618
Author: Pxl <px...@qq.com>
AuthorDate: Thu Jun 15 19:21:40 2023 +0800

    [Bug](pipeline) fix pipeline task call finish_p_dependency more than once (#20851)
    
    fix pipeline task call finish_p_dependency more than once
    
    When pipeline task meet eos->PENDING_FINISH->CANCELED, this task will call finish_p_dependency twice.
---
 be/src/pipeline/pipeline_task.cpp     |  7 +++++++
 be/src/pipeline/pipeline_task.h       | 12 ++++++------
 be/src/pipeline/task_scheduler.cpp    |  5 -----
 be/src/runtime/runtime_filter_mgr.cpp |  4 ++--
 4 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index b8ab119c34..7c3cc2abe7 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -280,6 +280,8 @@ QueryContext* PipelineTask::query_context() {
 
 // The FSM see PipelineTaskState's comment
 void PipelineTask::set_state(PipelineTaskState state) {
+    DCHECK(_cur_state != PipelineTaskState::FINISHED);
+
     if (_cur_state == state) {
         return;
     }
@@ -301,6 +303,11 @@ void PipelineTask::set_state(PipelineTaskState state) {
             COUNTER_UPDATE(_block_by_sink_counts, 1);
         }
     }
+
+    if (state == PipelineTaskState::FINISHED) {
+        _finish_p_dependency();
+    }
+
     _cur_state = state;
 }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index dbf42157de..9317a494da 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -163,12 +163,6 @@ public:
 
     Status finalize();
 
-    void finish_p_dependency() {
-        for (const auto& p : _pipeline->_parents) {
-            p.lock()->finish_one_dependency(_previous_schedule_id);
-        }
-    }
-
     PipelineFragmentContext* fragment_context() { return _fragment_context; }
 
     QueryContext* query_context();
@@ -215,6 +209,12 @@ public:
     int get_core_id() const { return this->_core_id; }
 
 private:
+    void _finish_p_dependency() {
+        for (const auto& p : _pipeline->_parents) {
+            p.lock()->finish_one_dependency(_previous_schedule_id);
+        }
+    }
+
     Status _open();
     void _init_profile();
     void _fresh_profile_counter();
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index d31c2b75e8..5a00ff364c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -297,7 +297,6 @@ void TaskScheduler::_do_work(size_t index) {
                                      "finalize fail:" + status.to_string());
                 _try_close_task(task, PipelineTaskState::CANCELED);
             } else {
-                task->finish_p_dependency();
                 _try_close_task(task, PipelineTaskState::FINISHED);
             }
             continue;
@@ -338,10 +337,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
             return;
         }
         task->set_state(state);
-        // TODO: rethink the logic
-        if (state == PipelineTaskState::CANCELED) {
-            task->finish_p_dependency();
-        }
         task->fragment_context()->close_a_pipeline();
     }
 }
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 601b47a206..e00149a7fa 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -281,8 +281,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
         auto iter = _filter_map.find(std::to_string(request->filter_id()));
         VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString();
         if (iter == _filter_map.end()) {
-            LOG(WARNING) << "unknown filter id:" << std::to_string(request->filter_id());
-            return Status::InvalidArgument("unknown filter id");
+            return Status::InvalidArgument("unknown filter id {}",
+                                           std::to_string(request->filter_id()));
         }
         cntVal = iter->second;
         if (auto bf = cntVal->filter->get_bloomfilter()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org