You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "Gabriel39 (via GitHub)" <gi...@apache.org> on 2024/04/07 09:42:22 UTC
[PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Gabriel39 opened a new pull request, #33332:
URL: https://github.com/apache/doris/pull/33332
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
## Further comments
If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "Gabriel39 (via GitHub)" <gi...@apache.org>.
Gabriel39 commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041786400
run buildall
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555179463
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,60 @@ std::string Dependency::debug_string(int indentation_level) {
std::string RuntimeFilterDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer,
- "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
- std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
- _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
+ fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+ Dependency::debug_string(indentation_level), _runtime_filter->formatted_state());
return fmt::to_string(debug_string_buffer);
}
-bool RuntimeFilterTimer::has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto ready = _ready.load() || _is_cancelled();
+ if (!ready && task) {
+ _add_block_task(task);
+ task->_blocked_dep = this;
+ }
+ return ready ? nullptr : this;
}
void RuntimeFilterTimer::call_timeout() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_ready) {
- return;
- }
- _call_timeout = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
void RuntimeFilterTimer::call_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_timeout) {
- return;
- }
- _call_ready = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
- _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- DCHECK(!_call_timeout);
- if (!_call_ready) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
- const auto filter_id = runtime_filter->filter_id();
- ;
- _filters++;
- _filter_ready_map[filter_id] = false;
- int64_t registration_time = runtime_filter->registration_time();
- int32 wait_time_ms = runtime_filter->wait_time_ms();
- auto filter_timer = std::make_shared<RuntimeFilterTimer>(
- filter_id, registration_time, wait_time_ms,
- std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
- runtime_filter->set_filter_timer(filter_timer);
- ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
Review Comment:
warning: method 'start' can be made static [readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/dependency.h:240:
```diff
- void start();
+ static void start();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555104027
##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
}
void RuntimeFilterConsumer::init_runtime_filter_dependency(
- doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
- _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name) {
+ runtime_filter_dependencies.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- _runtime_filter_dependency->add_filters(runtime_filter);
+ runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+ id, node_id, name, _state->get_query_ctx(), runtime_filter);
+ _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+ auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+ runtime_filter_dependencies[i]);
+ runtime_filter->set_filter_timer(filter_timer);
+ ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
}
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+ Status rf_status = Status::OK();
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- bool ready = runtime_filter->is_ready();
- if (!ready) {
- ready = runtime_filter->await();
- }
- if (ready && !_runtime_filter_ctxs[i].apply_mark) {
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
- _runtime_filter_ctxs[i].apply_mark = true;
- } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
- !_runtime_filter_ctxs[i].apply_mark) {
- *_blocked_by_rf = true;
- } else if (!_runtime_filter_ctxs[i].apply_mark) {
- DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
- _is_all_rf_applied = false;
+ if (pipeline_x) {
+ DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+ << _state->pipeline_x_task()->debug_string();
+ auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+ _state->pipeline_x_task());
+
+ bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
+
+ bool ready = rf_dep == nullptr && !timeout;
+ if (!ready) {
+ runtime_filter->await();
Review Comment:
为什么会有这个情况发生? (English: Why would this case happen at all?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554992498
##########
be/src/runtime/runtime_state.h:
##########
@@ -623,6 +624,12 @@ class RuntimeState {
int task_id() const { return _task_id; }
+ pipeline::PipelineXTask* pipeline_x_task() const;
+
+ void set_pipeline_x_task(pipeline::PipelineXTask* pipeline_x_task) {
Review Comment:
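这个逻辑太hack了,runtime state 保存pipelinex task 不是很合理 (English: This logic is too hacky; it is not reasonable for RuntimeState to hold the PipelineX task.)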
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041829324
TeamCity be ut coverage result:
Function Coverage: 35.61% (8885/24948)
Line Coverage: 27.34% (72961/266864)
Region Coverage: 26.50% (37711/142297)
Branch Coverage: 23.32% (19219/82412)
Coverage Report: http://coverage.selectdb-in.cc/coverage/7f65d65cdb1e625094deee258e479d1bd6a942d6_7f65d65cdb1e625094deee258e479d1bd6a942d6/report/index.html
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554991558
##########
be/src/pipeline/exec/scan_operator.cpp:
##########
@@ -1407,7 +1400,11 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
return Status::OK();
}
COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time());
- COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time());
+ int64_t rf_time = 0;
+ for (auto& dep : _filter_dependencies) {
+ rf_time += dep->watcher_elapse_time();
+ }
+ COUNTER_UPDATE(exec_time_counter(), rf_time);
Review Comment:
感觉这个不应该是把多个rf的时间累加啊 (English: I don't think the wait times of multiple runtime filters should be summed here.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555105480
##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
}
void RuntimeFilterConsumer::init_runtime_filter_dependency(
- doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
- _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name) {
+ runtime_filter_dependencies.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- _runtime_filter_dependency->add_filters(runtime_filter);
+ runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+ id, node_id, name, _state->get_query_ctx(), runtime_filter);
+ _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+ auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+ runtime_filter_dependencies[i]);
+ runtime_filter->set_filter_timer(filter_timer);
+ ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
}
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+ Status rf_status = Status::OK();
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- bool ready = runtime_filter->is_ready();
- if (!ready) {
- ready = runtime_filter->await();
- }
- if (ready && !_runtime_filter_ctxs[i].apply_mark) {
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
- _runtime_filter_ctxs[i].apply_mark = true;
- } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
- !_runtime_filter_ctxs[i].apply_mark) {
- *_blocked_by_rf = true;
- } else if (!_runtime_filter_ctxs[i].apply_mark) {
- DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
- _is_all_rf_applied = false;
+ if (pipeline_x) {
+ DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+ << _state->pipeline_x_task()->debug_string();
+ auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+ _state->pipeline_x_task());
+
+ bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
+
+ bool ready = rf_dep == nullptr && !timeout;
+ if (!ready) {
+ runtime_filter->await();
+ }
+ if (ready && !_runtime_filter_ctxs[i].apply_mark) {
+ // Runtime filter has been applied in open phase.
+ RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
+ _runtime_filter_ctxs[i].apply_mark = true;
+ } else if (rf_dep != nullptr) {
+ // Runtime filter is neither ready nor timeout, so we should continue to wait RF.
+ return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
Review Comment:
返回这个没意义了 (English: There is no point in returning this status anymore.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555200401
##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() {
return true;
}
+void IRuntimeFilter::update_state() {
+ DCHECK(is_consumer());
+ auto execution_timeout = _state->execution_timeout * 1000;
+ auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
+ // bitmap filter is precise filter and only filter once, so it must be applied.
+ int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+ ? execution_timeout
+ : runtime_filter_wait_time_ms;
+ auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+ DCHECK(_enable_pipeline_exec);
+ // In pipelineX, runtime filters will be ready or timeout before open phase.
+ if (expected == RuntimeFilterState::NOT_READY) {
+ DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
+ _rf_state_atomic = RuntimeFilterState::TIME_OUT;
Review Comment:
我们为啥要把not_Ready 状态更改为 TIME_OUT (English: Why do we change the NOT_READY state to TIME_OUT?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "Gabriel39 (via GitHub)" <gi...@apache.org>.
Gabriel39 commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555202245
##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() {
return true;
}
+void IRuntimeFilter::update_state() {
+ DCHECK(is_consumer());
+ auto execution_timeout = _state->execution_timeout * 1000;
+ auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
+ // bitmap filter is precise filter and only filter once, so it must be applied.
+ int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+ ? execution_timeout
+ : runtime_filter_wait_time_ms;
+ auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+ DCHECK(_enable_pipeline_exec);
+ // In pipelineX, runtime filters will be ready or timeout before open phase.
+ if (expected == RuntimeFilterState::NOT_READY) {
+ DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
+ _rf_state_atomic = RuntimeFilterState::TIME_OUT;
Review Comment:
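执行到这里的时候rf状态要么是timeout要么是ready,ready的话状态是已经被更新过的,所以这个地方是timeout。更新状态是因为要在profile显示 (English: When execution reaches this point, the runtime filter is either timed out or ready. A ready filter's state has already been updated, so a filter still in NOT_READY here must have timed out. We update the state so that it is shown correctly in the profile.)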
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041484058
<details>
<summary>TPC-H: <b>Total hot run time: 38698 ms</b></summary>
```
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit 29f6c816a73363222f3ca335cdaa8a78aa2fff2b, data reload: false
------ Round 1 ----------------------------------
q1 17635 4097 4075 4075
q2 2015 194 188 188
q3 10474 1256 1395 1256
q4 10202 795 948 795
q5 7467 3032 2971 2971
q6 219 134 132 132
q7 1121 635 602 602
q8 9416 2013 2052 2013
q9 6834 6214 6164 6164
q10 8485 3545 3517 3517
q11 420 245 236 236
q12 392 212 210 210
q13 17788 2868 2919 2868
q14 268 239 240 239
q15 519 479 476 476
q16 503 386 373 373
q17 967 921 892 892
q18 7353 6485 6410 6410
q19 1608 1555 1549 1549
q20 542 300 296 296
q21 3621 3154 3134 3134
q22 351 302 306 302
Total cold run time: 108200 ms
Total hot run time: 38698 ms
----- Round 2, with runtime_filter_mode=off -----
q1 4068 4052 4106 4052
q2 328 222 220 220
q3 2986 2962 2934 2934
q4 1872 1857 1855 1855
q5 5234 5207 5202 5202
q6 207 127 124 124
q7 2256 1826 1791 1791
q8 3225 3316 3287 3287
q9 8447 8468 8482 8468
q10 3789 4016 4021 4016
q11 560 453 464 453
q12 740 599 634 599
q13 16790 3106 3100 3100
q14 300 279 261 261
q15 526 494 481 481
q16 495 450 470 450
q17 1766 1775 1738 1738
q18 8262 7667 7594 7594
q19 1890 1713 1674 1674
q20 2035 1831 1820 1820
q21 5343 4855 4957 4855
q22 500 444 440 440
Total cold run time: 71619 ms
Total hot run time: 55414 ms
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555106446
##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
}
void RuntimeFilterConsumer::init_runtime_filter_dependency(
- doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
- _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name) {
+ runtime_filter_dependencies.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- _runtime_filter_dependency->add_filters(runtime_filter);
+ runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+ id, node_id, name, _state->get_query_ctx(), runtime_filter);
+ _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+ auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+ runtime_filter_dependencies[i]);
+ runtime_filter->set_filter_timer(filter_timer);
+ ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
}
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+ Status rf_status = Status::OK();
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- bool ready = runtime_filter->is_ready();
- if (!ready) {
- ready = runtime_filter->await();
- }
- if (ready && !_runtime_filter_ctxs[i].apply_mark) {
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
- _runtime_filter_ctxs[i].apply_mark = true;
- } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
- !_runtime_filter_ctxs[i].apply_mark) {
- *_blocked_by_rf = true;
- } else if (!_runtime_filter_ctxs[i].apply_mark) {
- DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
- _is_all_rf_applied = false;
+ if (pipeline_x) {
+ DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+ << _state->pipeline_x_task()->debug_string();
+ auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+ _state->pipeline_x_task());
+
+ bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
Review Comment:
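我们的dependency 应该只有ready 和 block 两个状态,尽量不要引入timeout这个状态了。 (English: Our dependency should only have two states, ready and blocked; please avoid introducing a separate timeout state.)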
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554886280
##########
be/src/vec/exec/runtime_filter_consumer.h:
##########
@@ -38,13 +38,16 @@ class RuntimeFilterConsumer {
bool runtime_filters_are_ready_or_timeout();
- void init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+ void init_runtime_filter_dependency(
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name);
Review Comment:
warning: parameter 'id' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
```suggestion
int id, const int node_id, const std::string& name);
```
##########
be/src/vec/exec/runtime_filter_consumer.h:
##########
@@ -38,13 +38,16 @@ class RuntimeFilterConsumer {
bool runtime_filters_are_ready_or_timeout();
- void init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+ void init_runtime_filter_dependency(
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name);
Review Comment:
warning: parameter 'node_id' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
```suggestion
const int id, int node_id, const std::string& name);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555104553
##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
}
void RuntimeFilterConsumer::init_runtime_filter_dependency(
- doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
- _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name) {
+ runtime_filter_dependencies.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- _runtime_filter_dependency->add_filters(runtime_filter);
+ runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+ id, node_id, name, _state->get_query_ctx(), runtime_filter);
+ _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+ auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+ runtime_filter_dependencies[i]);
+ runtime_filter->set_filter_timer(filter_timer);
+ ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
}
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+ Status rf_status = Status::OK();
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- bool ready = runtime_filter->is_ready();
- if (!ready) {
- ready = runtime_filter->await();
- }
- if (ready && !_runtime_filter_ctxs[i].apply_mark) {
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
- _runtime_filter_ctxs[i].apply_mark = true;
- } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
- !_runtime_filter_ctxs[i].apply_mark) {
- *_blocked_by_rf = true;
- } else if (!_runtime_filter_ctxs[i].apply_mark) {
- DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
- _is_all_rf_applied = false;
+ if (pipeline_x) {
+ DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+ << _state->pipeline_x_task()->debug_string();
+ auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+ _state->pipeline_x_task());
+
+ bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
Review Comment:
我感觉我们不需要区分runtime filter 是ready了,还是timeout了。
1. runtime filter consumer 只需要知道rf dependency ready了,然后就开始消费rf。
2. 处理ready还是timeout 会引入一些状态,我们判断容易有lock,还有状态先后的问题。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041828726
<details>
<summary>TPC-H: <b>Total hot run time: 38804 ms</b></summary>
```
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit 7f65d65cdb1e625094deee258e479d1bd6a942d6, data reload: false
------ Round 1 ----------------------------------
q1 17606 4506 4442 4442
q2 1995 178 172 172
q3 10478 1183 1198 1183
q4 10186 742 776 742
q5 7502 2734 2689 2689
q6 212 133 137 133
q7 1042 602 596 596
q8 9222 2107 2062 2062
q9 7925 6552 6549 6549
q10 8639 3532 3535 3532
q11 460 241 246 241
q12 457 217 212 212
q13 18923 2943 2950 2943
q14 281 242 245 242
q15 518 473 483 473
q16 500 398 380 380
q17 973 637 664 637
q18 7402 6731 6784 6731
q19 7029 1476 1509 1476
q20 1068 314 305 305
q21 3454 2762 2773 2762
q22 359 302 309 302
Total cold run time: 116231 ms
Total hot run time: 38804 ms
----- Round 2, with runtime_filter_mode=off -----
q1 4404 4172 4191 4172
q2 482 270 257 257
q3 2999 2732 2788 2732
q4 1846 1567 1576 1567
q5 5296 5339 5320 5320
q6 209 122 124 122
q7 2279 1883 1883 1883
q8 3251 3359 3343 3343
q9 8658 8658 8578 8578
q10 3843 3732 3728 3728
q11 586 478 479 478
q12 743 595 582 582
q13 17652 2925 2943 2925
q14 305 275 269 269
q15 517 462 473 462
q16 488 438 421 421
q17 1812 1481 1526 1481
q18 8113 7980 7985 7980
q19 1708 1588 1573 1573
q20 2058 1853 1803 1803
q21 11210 4970 4908 4908
q22 554 471 460 460
Total cold run time: 79013 ms
Total hot run time: 55044 ms
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041878252
PR approved by at least one committer and no changes requested.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041878271
PR approved by anyone and no changes requested.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554993107
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,59 @@ std::string Dependency::debug_string(int indentation_level) {
std::string RuntimeFilterDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer,
- "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
- std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
- _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
+ fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+ Dependency::debug_string(indentation_level), _runtime_filter->formatted_state());
return fmt::to_string(debug_string_buffer);
}
-bool RuntimeFilterTimer::has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto ready = _ready.load() || _is_cancelled();
+ if (!ready && task) {
+ _add_block_task(task);
+ task->_blocked_dep = this;
+ }
+ return ready ? nullptr : this;
}
void RuntimeFilterTimer::call_timeout() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_ready) {
- return;
- }
- _call_timeout = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_timeout();
+ _parent->set_ready();
}
void RuntimeFilterTimer::call_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_timeout) {
- return;
- }
- _call_ready = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
- _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- DCHECK(!_call_timeout);
- if (!_call_ready) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
- const auto filter_id = runtime_filter->filter_id();
- ;
- _filters++;
- _filter_ready_map[filter_id] = false;
- int64_t registration_time = runtime_filter->registration_time();
- int32 wait_time_ms = runtime_filter->wait_time_ms();
- auto filter_timer = std::make_shared<RuntimeFilterTimer>(
- filter_id, registration_time, wait_time_ms,
- std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
- runtime_filter->set_filter_timer(filter_timer);
- ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
+ while (!_stop) {
+ std::unique_lock<std::mutex> lk(cv_m);
-void RuntimeFilterDependency::sub_filters(int id) {
- std::vector<PipelineXTask*> local_block_task {};
- {
- std::lock_guard<std::mutex> lk(_task_lock);
- if (!_filter_ready_map[id]) {
- _filter_ready_map[id] = true;
- _filters--;
+ cv.wait(lk, [this] { return !_que.empty() || _stop; });
Review Comment:
Wait for at most 3 s, then wake up and check whether the queue has been stopped.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555103860
##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
}
void RuntimeFilterConsumer::init_runtime_filter_dependency(
- doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
- _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name) {
+ runtime_filter_dependencies.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- _runtime_filter_dependency->add_filters(runtime_filter);
+ runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+ id, node_id, name, _state->get_query_ctx(), runtime_filter);
+ _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+ auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+ runtime_filter_dependencies[i]);
+ runtime_filter->set_filter_timer(filter_timer);
+ ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
}
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+ Status rf_status = Status::OK();
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
- bool ready = runtime_filter->is_ready();
- if (!ready) {
- ready = runtime_filter->await();
- }
- if (ready && !_runtime_filter_ctxs[i].apply_mark) {
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
- _runtime_filter_ctxs[i].apply_mark = true;
- } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
- !_runtime_filter_ctxs[i].apply_mark) {
- *_blocked_by_rf = true;
- } else if (!_runtime_filter_ctxs[i].apply_mark) {
- DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
- _is_all_rf_applied = false;
+ if (pipeline_x) {
+ DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+ << _state->pipeline_x_task()->debug_string();
+ auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+ _state->pipeline_x_task());
+
+ bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
+
+ bool ready = rf_dep == nullptr && !timeout;
+ if (!ready) {
+ runtime_filter->await();
Review Comment:
为什么会有这个情况发生?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041483584
TeamCity be ut coverage result:
Function Coverage: 35.64% (8887/24933)
Line Coverage: 27.38% (72982/266550)
Region Coverage: 26.54% (37716/142116)
Branch Coverage: 23.35% (19224/82330)
Coverage Report: http://coverage.selectdb-in.cc/coverage/29f6c816a73363222f3ca335cdaa8a78aa2fff2b_29f6c816a73363222f3ca335cdaa8a78aa2fff2b/report/index.html
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "Gabriel39 (via GitHub)" <gi...@apache.org>.
Gabriel39 commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041465879
run buildall
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555178568
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,60 @@ std::string Dependency::debug_string(int indentation_level) {
std::string RuntimeFilterDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer,
- "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
- std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
- _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
+ fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+ Dependency::debug_string(indentation_level), _runtime_filter->formatted_state());
return fmt::to_string(debug_string_buffer);
}
-bool RuntimeFilterTimer::has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto ready = _ready.load() || _is_cancelled();
+ if (!ready && task) {
+ _add_block_task(task);
+ task->_blocked_dep = this;
+ }
+ return ready ? nullptr : this;
}
void RuntimeFilterTimer::call_timeout() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_ready) {
- return;
- }
- _call_timeout = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
void RuntimeFilterTimer::call_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_timeout) {
- return;
- }
- _call_ready = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
- _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- DCHECK(!_call_timeout);
- if (!_call_ready) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
- const auto filter_id = runtime_filter->filter_id();
- ;
- _filters++;
- _filter_ready_map[filter_id] = false;
- int64_t registration_time = runtime_filter->registration_time();
- int32 wait_time_ms = runtime_filter->wait_time_ms();
- auto filter_timer = std::make_shared<RuntimeFilterTimer>(
- filter_id, registration_time, wait_time_ms,
- std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
- runtime_filter->set_filter_timer(filter_timer);
- ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
+ while (!_stop) {
Review Comment:
warning: method 'start' can be made static [readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/dependency.h:240:
```diff
- void start();
+ static void start();
```
##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -190,6 +191,17 @@ class PipelineXTask : public PipelineTask {
return nullptr;
}
+ Dependency* _runtime_filter_blocked_dependency() {
Review Comment:
warning: method '_runtime_filter_blocked_dependency' can be made static [readability-convert-member-functions-to-static]
```suggestion
static Dependency* _runtime_filter_blocked_dependency() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei merged PR #33332:
URL: https://github.com/apache/doris/pull/33332
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041394297
Thank you for your contribution to Apache Doris.
Don't know what should be done next? See [How to process your PR](https://cwiki.apache.org/confluence/display/DORIS/How+to+process+your+PR)
Since 2024-03-18, the documentation has been moved to [doris-website](https://github.com/apache/doris-website).
See [Doris Document](https://cwiki.apache.org/confluence/display/DORIS/Doris+Document).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]
Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555105895
##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -209,75 +209,36 @@ struct FinishDependency final : public Dependency {
};
class RuntimeFilterDependency;
+struct RuntimeFilterTimerQueue;
class RuntimeFilterTimer {
public:
- RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t wait_time_ms,
+ RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
std::shared_ptr<RuntimeFilterDependency> parent)
- : _filter_id(filter_id),
- _parent(std::move(parent)),
+ : _parent(std::move(parent)),
_registration_time(registration_time),
_wait_time_ms(wait_time_ms) {}
+ // Called by runtime filter producer.
void call_ready();
+ // Called by RuntimeFilterTimerQueue which is responsible for checking if this rf is timeout.
void call_timeout();
- void call_has_ready();
-
- // When the use count is equal to 1, only the timer queue still holds ownership,
- // so there is no need to take any action.
- void call_has_release() {};
-
- bool has_ready();
-
int64_t registration_time() const { return _registration_time; }
int32_t wait_time_ms() const { return _wait_time_ms; }
private:
- int _filter_id = -1;
- bool _call_ready {};
- bool _call_timeout {};
- std::shared_ptr<RuntimeFilterDependency> _parent;
+ friend struct RuntimeFilterTimerQueue;
+ std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
std::mutex _lock;
const int64_t _registration_time;
const int32_t _wait_time_ms;
- bool _is_ready = false;
};
struct RuntimeFilterTimerQueue {
constexpr static int64_t interval = 10;
void run() { _thread.detach(); }
- void start() {
- while (!_stop) {
- std::unique_lock<std::mutex> lk(cv_m);
-
- cv.wait(lk, [this] { return !_que.empty() || _stop; });
- if (_stop) {
- break;
- }
- {
- std::unique_lock<std::mutex> lc(_que_lock);
- std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
- for (auto& it : _que) {
- if (it.use_count() == 1) {
- it->call_has_release();
- } else if (it->has_ready()) {
- it->call_has_ready();
- } else {
- int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
- if (ms_since_registration > it->wait_time_ms()) {
- it->call_timeout();
- } else {
- new_que.push_back(std::move(it));
- }
- }
- }
- new_que.swap(_que);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(interval));
- }
- _shutdown = true;
- }
Review Comment:
把下面的wait for shutdown的逻辑,移动到stop 里面,stop 应该是同步的,否则容易挂。不要在析构函数里等待。那个时候可能就晚了。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org