You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2023/06/15 11:21:48 UTC
[doris] branch master updated: [Bug](pipeline) fix pipeline task call finish_p_dependency more than once (#20851)
This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4bfceb7acb [Bug](pipeline) fix pipeline task call finish_p_dependency more than once (#20851)
4bfceb7acb is described below
commit 4bfceb7acbb03ab06d1c13559be357cba5c70618
Author: Pxl <px...@qq.com>
AuthorDate: Thu Jun 15 19:21:40 2023 +0800
[Bug](pipeline) fix pipeline task call finish_p_dependency more than once (#20851)
Fix the pipeline task calling finish_p_dependency more than once.
When a pipeline task goes through the transition eos -> PENDING_FINISH -> CANCELED, the task calls finish_p_dependency twice.
---
be/src/pipeline/pipeline_task.cpp | 7 +++++++
be/src/pipeline/pipeline_task.h | 12 ++++++------
be/src/pipeline/task_scheduler.cpp | 5 -----
be/src/runtime/runtime_filter_mgr.cpp | 4 ++--
4 files changed, 15 insertions(+), 13 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index b8ab119c34..7c3cc2abe7 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -280,6 +280,8 @@ QueryContext* PipelineTask::query_context() {
// The FSM see PipelineTaskState's comment
void PipelineTask::set_state(PipelineTaskState state) {
+ DCHECK(_cur_state != PipelineTaskState::FINISHED);
+
if (_cur_state == state) {
return;
}
@@ -301,6 +303,11 @@ void PipelineTask::set_state(PipelineTaskState state) {
COUNTER_UPDATE(_block_by_sink_counts, 1);
}
}
+
+ if (state == PipelineTaskState::FINISHED) {
+ _finish_p_dependency();
+ }
+
_cur_state = state;
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index dbf42157de..9317a494da 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -163,12 +163,6 @@ public:
Status finalize();
- void finish_p_dependency() {
- for (const auto& p : _pipeline->_parents) {
- p.lock()->finish_one_dependency(_previous_schedule_id);
- }
- }
-
PipelineFragmentContext* fragment_context() { return _fragment_context; }
QueryContext* query_context();
@@ -215,6 +209,12 @@ public:
int get_core_id() const { return this->_core_id; }
private:
+ void _finish_p_dependency() {
+ for (const auto& p : _pipeline->_parents) {
+ p.lock()->finish_one_dependency(_previous_schedule_id);
+ }
+ }
+
Status _open();
void _init_profile();
void _fresh_profile_counter();
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index d31c2b75e8..5a00ff364c 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -297,7 +297,6 @@ void TaskScheduler::_do_work(size_t index) {
"finalize fail:" + status.to_string());
_try_close_task(task, PipelineTaskState::CANCELED);
} else {
- task->finish_p_dependency();
_try_close_task(task, PipelineTaskState::FINISHED);
}
continue;
@@ -338,10 +337,6 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state)
return;
}
task->set_state(state);
- // TODO: rethink the logic
- if (state == PipelineTaskState::CANCELED) {
- task->finish_p_dependency();
- }
task->fragment_context()->close_a_pipeline();
}
}
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 601b47a206..e00149a7fa 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -281,8 +281,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
auto iter = _filter_map.find(std::to_string(request->filter_id()));
VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString();
if (iter == _filter_map.end()) {
- LOG(WARNING) << "unknown filter id:" << std::to_string(request->filter_id());
- return Status::InvalidArgument("unknown filter id");
+ return Status::InvalidArgument("unknown filter id {}",
+ std::to_string(request->filter_id()));
}
cntVal = iter->second;
if (auto bf = cntVal->filter->get_bloomfilter()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org