You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "Gabriel39 (via GitHub)" <gi...@apache.org> on 2024/04/07 09:42:22 UTC

[PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Gabriel39 opened a new pull request, #33332:
URL: https://github.com/apache/doris/pull/33332

   ## Proposed changes
   
   Issue Number: close #xxx
   
   <!--Describe your changes.-->
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "Gabriel39 (via GitHub)" <gi...@apache.org>.
Gabriel39 commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041786400

   run buildall


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555179463


##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,60 @@ std::string Dependency::debug_string(int indentation_level) {
 
 std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
-                   _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
+    fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+                   Dependency::debug_string(indentation_level), _runtime_filter->formatted_state());
     return fmt::to_string(debug_string_buffer);
 }
 
-bool RuntimeFilterTimer::has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready = _ready.load() || _is_cancelled();
+    if (!ready && task) {
+        _add_block_task(task);
+        task->_blocked_dep = this;
+    }
+    return ready ? nullptr : this;
 }
 
 void RuntimeFilterTimer::call_timeout() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_ready) {
-        return;
-    }
-    _call_timeout = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
 void RuntimeFilterTimer::call_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_timeout) {
-        return;
-    }
-    _call_ready = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
-    _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    DCHECK(!_call_timeout);
-    if (!_call_ready) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
-    const auto filter_id = runtime_filter->filter_id();
-    ;
-    _filters++;
-    _filter_ready_map[filter_id] = false;
-    int64_t registration_time = runtime_filter->registration_time();
-    int32 wait_time_ms = runtime_filter->wait_time_ms();
-    auto filter_timer = std::make_shared<RuntimeFilterTimer>(
-            filter_id, registration_time, wait_time_ms,
-            std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
-    runtime_filter->set_filter_timer(filter_timer);
-    ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {

Review Comment:
   warning: method 'start' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_x/dependency.h:240:
   ```diff
   -     void start();
   +     static void start();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555104027


##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
 }
 
 void RuntimeFilterConsumer::init_runtime_filter_dependency(
-        doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
-    _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                runtime_filter_dependencies,
+        const int id, const int node_id, const std::string& name) {
+    runtime_filter_dependencies.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        _runtime_filter_dependency->add_filters(runtime_filter);
+        runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+        _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+                runtime_filter_dependencies[i]);
+        runtime_filter->set_filter_timer(filter_timer);
+        ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
     }
 }
 
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+    Status rf_status = Status::OK();
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            *_blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
+        if (pipeline_x) {
+            DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+                    << _state->pipeline_x_task()->debug_string();
+            auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+                    _state->pipeline_x_task());
+
+            bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
+
+            bool ready = rf_dep == nullptr && !timeout;
+            if (!ready) {
+                runtime_filter->await();

Review Comment:
   为什么会有这个情况发生?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554992498


##########
be/src/runtime/runtime_state.h:
##########
@@ -623,6 +624,12 @@ class RuntimeState {
 
     int task_id() const { return _task_id; }
 
+    pipeline::PipelineXTask* pipeline_x_task() const;
+
+    void set_pipeline_x_task(pipeline::PipelineXTask* pipeline_x_task) {

Review Comment:
   这个逻辑太hack了,runtime state 保存pipelinex task 不是很合理



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041829324

   TeamCity be ut coverage result:
    Function Coverage: 35.61% (8885/24948) 
    Line Coverage: 27.34% (72961/266864)
    Region Coverage: 26.50% (37711/142297)
    Branch Coverage: 23.32% (19219/82412)
    Coverage Report: http://coverage.selectdb-in.cc/coverage/7f65d65cdb1e625094deee258e479d1bd6a942d6_7f65d65cdb1e625094deee258e479d1bd6a942d6/report/index.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554991558


##########
be/src/pipeline/exec/scan_operator.cpp:
##########
@@ -1407,7 +1400,11 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
         return Status::OK();
     }
     COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time());
-    COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time());
+    int64_t rf_time = 0;
+    for (auto& dep : _filter_dependencies) {
+        rf_time += dep->watcher_elapse_time();
+    }
+    COUNTER_UPDATE(exec_time_counter(), rf_time);

Review Comment:
   感觉这个不应该是把多个rf的时间累加啊



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555105480


##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
 }
 
 void RuntimeFilterConsumer::init_runtime_filter_dependency(
-        doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
-    _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                runtime_filter_dependencies,
+        const int id, const int node_id, const std::string& name) {
+    runtime_filter_dependencies.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        _runtime_filter_dependency->add_filters(runtime_filter);
+        runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+        _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+                runtime_filter_dependencies[i]);
+        runtime_filter->set_filter_timer(filter_timer);
+        ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
     }
 }
 
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+    Status rf_status = Status::OK();
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            *_blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
+        if (pipeline_x) {
+            DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+                    << _state->pipeline_x_task()->debug_string();
+            auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+                    _state->pipeline_x_task());
+
+            bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
+
+            bool ready = rf_dep == nullptr && !timeout;
+            if (!ready) {
+                runtime_filter->await();
+            }
+            if (ready && !_runtime_filter_ctxs[i].apply_mark) {
+                // Runtime filter has been applied in open phase.
+                RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
+                _runtime_filter_ctxs[i].apply_mark = true;
+            } else if (rf_dep != nullptr) {
+                // Runtime filter is neither ready nor timeout, so we should continue to wait RF.
+                return Status::WaitForRf("Runtime filters are neither not ready nor timeout");

Review Comment:
   返回这个没意义了



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555200401


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() {
     return true;
 }
 
+void IRuntimeFilter::update_state() {
+    DCHECK(is_consumer());
+    auto execution_timeout = _state->execution_timeout * 1000;
+    auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
+    // bitmap filter is precise filter and only filter once, so it must be applied.
+    int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+                                    ? execution_timeout
+                                    : runtime_filter_wait_time_ms;
+    auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+    DCHECK(_enable_pipeline_exec);
+    // In pipelineX, runtime filters will be ready or timeout before open phase.
+    if (expected == RuntimeFilterState::NOT_READY) {
+        DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
+        _rf_state_atomic = RuntimeFilterState::TIME_OUT;

Review Comment:
   我们为啥要把 NOT_READY 状态更改为 TIME_OUT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "Gabriel39 (via GitHub)" <gi...@apache.org>.
Gabriel39 commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555202245


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() {
     return true;
 }
 
+void IRuntimeFilter::update_state() {
+    DCHECK(is_consumer());
+    auto execution_timeout = _state->execution_timeout * 1000;
+    auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
+    // bitmap filter is precise filter and only filter once, so it must be applied.
+    int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+                                    ? execution_timeout
+                                    : runtime_filter_wait_time_ms;
+    auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+    DCHECK(_enable_pipeline_exec);
+    // In pipelineX, runtime filters will be ready or timeout before open phase.
+    if (expected == RuntimeFilterState::NOT_READY) {
+        DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
+        _rf_state_atomic = RuntimeFilterState::TIME_OUT;

Review Comment:
   执行到这里的时候rf状态要么是timeout要么是ready,ready的话状态是已经被更新过的,所以这个地方是timeout。更新状态是因为要在profile显示
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041484058

   
   <details>
   <summary>TPC-H: <b>Total hot run time: 38698 ms</b></summary>
   
   ```
   machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
   Tpch sf100 test result on commit 29f6c816a73363222f3ca335cdaa8a78aa2fff2b, data reload: false
   
   ------ Round 1 ----------------------------------
   q1	17635	4097	4075	4075
   q2	2015	194	188	188
   q3	10474	1256	1395	1256
   q4	10202	795	948	795
   q5	7467	3032	2971	2971
   q6	219	134	132	132
   q7	1121	635	602	602
   q8	9416	2013	2052	2013
   q9	6834	6214	6164	6164
   q10	8485	3545	3517	3517
   q11	420	245	236	236
   q12	392	212	210	210
   q13	17788	2868	2919	2868
   q14	268	239	240	239
   q15	519	479	476	476
   q16	503	386	373	373
   q17	967	921	892	892
   q18	7353	6485	6410	6410
   q19	1608	1555	1549	1549
   q20	542	300	296	296
   q21	3621	3154	3134	3134
   q22	351	302	306	302
   Total cold run time: 108200 ms
   Total hot run time: 38698 ms
   
   ----- Round 2, with runtime_filter_mode=off -----
   q1	4068	4052	4106	4052
   q2	328	222	220	220
   q3	2986	2962	2934	2934
   q4	1872	1857	1855	1855
   q5	5234	5207	5202	5202
   q6	207	127	124	124
   q7	2256	1826	1791	1791
   q8	3225	3316	3287	3287
   q9	8447	8468	8482	8468
   q10	3789	4016	4021	4016
   q11	560	453	464	453
   q12	740	599	634	599
   q13	16790	3106	3100	3100
   q14	300	279	261	261
   q15	526	494	481	481
   q16	495	450	470	450
   q17	1766	1775	1738	1738
   q18	8262	7667	7594	7594
   q19	1890	1713	1674	1674
   q20	2035	1831	1820	1820
   q21	5343	4855	4957	4855
   q22	500	444	440	440
   Total cold run time: 71619 ms
   Total hot run time: 55414 ms
   ```
   </details>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555106446


##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
 }
 
 void RuntimeFilterConsumer::init_runtime_filter_dependency(
-        doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
-    _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                runtime_filter_dependencies,
+        const int id, const int node_id, const std::string& name) {
+    runtime_filter_dependencies.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        _runtime_filter_dependency->add_filters(runtime_filter);
+        runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+        _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+                runtime_filter_dependencies[i]);
+        runtime_filter->set_filter_timer(filter_timer);
+        ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
     }
 }
 
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+    Status rf_status = Status::OK();
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            *_blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
+        if (pipeline_x) {
+            DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+                    << _state->pipeline_x_task()->debug_string();
+            auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+                    _state->pipeline_x_task());
+
+            bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();

Review Comment:
   我们的dependency 应该只有ready 和 block 两个状态,尽量不要引入timeout这个状态了。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554886280


##########
be/src/vec/exec/runtime_filter_consumer.h:
##########
@@ -38,13 +38,16 @@ class RuntimeFilterConsumer {
 
     bool runtime_filters_are_ready_or_timeout();
 
-    void init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+    void init_runtime_filter_dependency(
+            std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                    runtime_filter_dependencies,
+            const int id, const int node_id, const std::string& name);

Review Comment:
   warning: parameter 'id' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
               int id, const int node_id, const std::string& name);
   ```
   



##########
be/src/vec/exec/runtime_filter_consumer.h:
##########
@@ -38,13 +38,16 @@ class RuntimeFilterConsumer {
 
     bool runtime_filters_are_ready_or_timeout();
 
-    void init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+    void init_runtime_filter_dependency(
+            std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                    runtime_filter_dependencies,
+            const int id, const int node_id, const std::string& name);

Review Comment:
   warning: parameter 'node_id' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
               const int id, int node_id, const std::string& name);
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555104553


##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
 }
 
 void RuntimeFilterConsumer::init_runtime_filter_dependency(
-        doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
-    _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                runtime_filter_dependencies,
+        const int id, const int node_id, const std::string& name) {
+    runtime_filter_dependencies.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        _runtime_filter_dependency->add_filters(runtime_filter);
+        runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+        _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+                runtime_filter_dependencies[i]);
+        runtime_filter->set_filter_timer(filter_timer);
+        ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
     }
 }
 
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+    Status rf_status = Status::OK();
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            *_blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
+        if (pipeline_x) {
+            DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+                    << _state->pipeline_x_task()->debug_string();
+            auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+                    _state->pipeline_x_task());
+
+            bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();

Review Comment:
   我感觉我们不需要区分runtime filter 是ready了,还是timeout了。
   1. runtime filter consumer 只需要知道rf dependency ready了,然后就开始消费rf。
   2. 处理ready还是timeout 会引入一些状态,我们判断容易有lock,还有状态先后的问题。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041828726

   
   <details>
   <summary>TPC-H: <b>Total hot run time: 38804 ms</b></summary>
   
   ```
   machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
   Tpch sf100 test result on commit 7f65d65cdb1e625094deee258e479d1bd6a942d6, data reload: false
   
   ------ Round 1 ----------------------------------
   q1	17606	4506	4442	4442
   q2	1995	178	172	172
   q3	10478	1183	1198	1183
   q4	10186	742	776	742
   q5	7502	2734	2689	2689
   q6	212	133	137	133
   q7	1042	602	596	596
   q8	9222	2107	2062	2062
   q9	7925	6552	6549	6549
   q10	8639	3532	3535	3532
   q11	460	241	246	241
   q12	457	217	212	212
   q13	18923	2943	2950	2943
   q14	281	242	245	242
   q15	518	473	483	473
   q16	500	398	380	380
   q17	973	637	664	637
   q18	7402	6731	6784	6731
   q19	7029	1476	1509	1476
   q20	1068	314	305	305
   q21	3454	2762	2773	2762
   q22	359	302	309	302
   Total cold run time: 116231 ms
   Total hot run time: 38804 ms
   
   ----- Round 2, with runtime_filter_mode=off -----
   q1	4404	4172	4191	4172
   q2	482	270	257	257
   q3	2999	2732	2788	2732
   q4	1846	1567	1576	1567
   q5	5296	5339	5320	5320
   q6	209	122	124	122
   q7	2279	1883	1883	1883
   q8	3251	3359	3343	3343
   q9	8658	8658	8578	8578
   q10	3843	3732	3728	3728
   q11	586	478	479	478
   q12	743	595	582	582
   q13	17652	2925	2943	2925
   q14	305	275	269	269
   q15	517	462	473	462
   q16	488	438	421	421
   q17	1812	1481	1526	1481
   q18	8113	7980	7985	7980
   q19	1708	1588	1573	1573
   q20	2058	1853	1803	1803
   q21	11210	4970	4908	4908
   q22	554	471	460	460
   Total cold run time: 79013 ms
   Total hot run time: 55044 ms
   ```
   </details>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041878252

   PR approved by at least one committer and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041878271

   PR approved by anyone and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554993107


##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,59 @@ std::string Dependency::debug_string(int indentation_level) {
 
 std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
-                   _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
+    fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+                   Dependency::debug_string(indentation_level), _runtime_filter->formatted_state());
     return fmt::to_string(debug_string_buffer);
 }
 
-bool RuntimeFilterTimer::has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready = _ready.load() || _is_cancelled();
+    if (!ready && task) {
+        _add_block_task(task);
+        task->_blocked_dep = this;
+    }
+    return ready ? nullptr : this;
 }
 
 void RuntimeFilterTimer::call_timeout() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_ready) {
-        return;
-    }
-    _call_timeout = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_timeout();
+    _parent->set_ready();
 }
 
 void RuntimeFilterTimer::call_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_timeout) {
-        return;
-    }
-    _call_ready = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
-    _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    DCHECK(!_call_timeout);
-    if (!_call_ready) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
-    const auto filter_id = runtime_filter->filter_id();
-    ;
-    _filters++;
-    _filter_ready_map[filter_id] = false;
-    int64_t registration_time = runtime_filter->registration_time();
-    int32 wait_time_ms = runtime_filter->wait_time_ms();
-    auto filter_timer = std::make_shared<RuntimeFilterTimer>(
-            filter_id, registration_time, wait_time_ms,
-            std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
-    runtime_filter->set_filter_timer(filter_timer);
-    ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
+    while (!_stop) {
+        std::unique_lock<std::mutex> lk(cv_m);
 
-void RuntimeFilterDependency::sub_filters(int id) {
-    std::vector<PipelineXTask*> local_block_task {};
-    {
-        std::lock_guard<std::mutex> lk(_task_lock);
-        if (!_filter_ready_map[id]) {
-            _filter_ready_map[id] = true;
-            _filters--;
+        cv.wait(lk, [this] { return !_que.empty() || _stop; });

Review Comment:
   Wait at most 3s, and then wake up to check whether the queue has been stopped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555103860


##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
 }
 
 void RuntimeFilterConsumer::init_runtime_filter_dependency(
-        doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
-    _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                runtime_filter_dependencies,
+        const int id, const int node_id, const std::string& name) {
+    runtime_filter_dependencies.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        _runtime_filter_dependency->add_filters(runtime_filter);
+        runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
+                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+        _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
+        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
+                runtime_filter_dependencies[i]);
+        runtime_filter->set_filter_timer(filter_timer);
+        ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
     }
 }
 
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+    Status rf_status = Status::OK();
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            *_blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
+        if (pipeline_x) {
+            DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+                    << _state->pipeline_x_task()->debug_string();
+            auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+                    _state->pipeline_x_task());
+
+            bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
+
+            bool ready = rf_dep == nullptr && !timeout;
+            if (!ready) {
+                runtime_filter->await();

Review Comment:
   为什么会有这个情况发生?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041483584

   TeamCity be ut coverage result:
    Function Coverage: 35.64% (8887/24933) 
    Line Coverage: 27.38% (72982/266550)
    Region Coverage: 26.54% (37716/142116)
    Branch Coverage: 23.35% (19224/82330)
    Coverage Report: http://coverage.selectdb-in.cc/coverage/29f6c816a73363222f3ca335cdaa8a78aa2fff2b_29f6c816a73363222f3ca335cdaa8a78aa2fff2b/report/index.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "Gabriel39 (via GitHub)" <gi...@apache.org>.
Gabriel39 commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041465879

   run buildall


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555178568


##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,60 @@ std::string Dependency::debug_string(int indentation_level) {
 
 std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
-                   _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
+    fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+                   Dependency::debug_string(indentation_level), _runtime_filter->formatted_state());
     return fmt::to_string(debug_string_buffer);
 }
 
-bool RuntimeFilterTimer::has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready = _ready.load() || _is_cancelled();
+    if (!ready && task) {
+        _add_block_task(task);
+        task->_blocked_dep = this;
+    }
+    return ready ? nullptr : this;
 }
 
 void RuntimeFilterTimer::call_timeout() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_ready) {
-        return;
-    }
-    _call_timeout = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
 void RuntimeFilterTimer::call_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_timeout) {
-        return;
-    }
-    _call_ready = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
-    _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    DCHECK(!_call_timeout);
-    if (!_call_ready) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
-    const auto filter_id = runtime_filter->filter_id();
-    ;
-    _filters++;
-    _filter_ready_map[filter_id] = false;
-    int64_t registration_time = runtime_filter->registration_time();
-    int32 wait_time_ms = runtime_filter->wait_time_ms();
-    auto filter_timer = std::make_shared<RuntimeFilterTimer>(
-            filter_id, registration_time, wait_time_ms,
-            std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
-    runtime_filter->set_filter_timer(filter_timer);
-    ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
+    while (!_stop) {

Review Comment:
   warning: method 'start' can be made static [readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_x/dependency.h:240:
   ```diff
   -     void start();
   +     static void start();
   ```
   



##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -190,6 +191,17 @@ class PipelineXTask : public PipelineTask {
         return nullptr;
     }
 
+    Dependency* _runtime_filter_blocked_dependency() {

Review Comment:
   warning: method '_runtime_filter_blocked_dependency' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Dependency* _runtime_filter_blocked_dependency() {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei merged PR #33332:
URL: https://github.com/apache/doris/pull/33332


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #33332:
URL: https://github.com/apache/doris/pull/33332#issuecomment-2041394297

   Thank you for your contribution to Apache Doris.
   Don't know what should be done next? See [How to process your PR](https://cwiki.apache.org/confluence/display/DORIS/How+to+process+your+PR)
   
   Since 2024-03-18, the documentation has been moved to [doris-website](https://github.com/apache/doris-website).
   See [Doris Document](https://cwiki.apache.org/confluence/display/DORIS/Doris+Document).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [pipelineX](runtime filter) Fix task timeout caused by runtime filter [doris]

Posted by "yiguolei (via GitHub)" <gi...@apache.org>.
yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555105895


##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -209,75 +209,36 @@ struct FinishDependency final : public Dependency {
 };
 
 class RuntimeFilterDependency;
+struct RuntimeFilterTimerQueue;
 class RuntimeFilterTimer {
 public:
-    RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t wait_time_ms,
+    RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
                        std::shared_ptr<RuntimeFilterDependency> parent)
-            : _filter_id(filter_id),
-              _parent(std::move(parent)),
+            : _parent(std::move(parent)),
               _registration_time(registration_time),
               _wait_time_ms(wait_time_ms) {}
 
+    // Called by runtime filter producer.
     void call_ready();
 
+    // Called by RuntimeFilterTimerQueue which is responsible for checking if this rf is timeout.
     void call_timeout();
 
-    void call_has_ready();
-
-    // When the use count is equal to 1, only the timer queue still holds ownership,
-    // so there is no need to take any action.
-    void call_has_release() {};
-
-    bool has_ready();
-
     int64_t registration_time() const { return _registration_time; }
     int32_t wait_time_ms() const { return _wait_time_ms; }
 
 private:
-    int _filter_id = -1;
-    bool _call_ready {};
-    bool _call_timeout {};
-    std::shared_ptr<RuntimeFilterDependency> _parent;
+    friend struct RuntimeFilterTimerQueue;
+    std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
     std::mutex _lock;
     const int64_t _registration_time;
     const int32_t _wait_time_ms;
-    bool _is_ready = false;
 };
 
 struct RuntimeFilterTimerQueue {
     constexpr static int64_t interval = 10;
     void run() { _thread.detach(); }
-    void start() {
-        while (!_stop) {
-            std::unique_lock<std::mutex> lk(cv_m);
-
-            cv.wait(lk, [this] { return !_que.empty() || _stop; });
-            if (_stop) {
-                break;
-            }
-            {
-                std::unique_lock<std::mutex> lc(_que_lock);
-                std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
-                for (auto& it : _que) {
-                    if (it.use_count() == 1) {
-                        it->call_has_release();
-                    } else if (it->has_ready()) {
-                        it->call_has_ready();
-                    } else {
-                        int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
-                        if (ms_since_registration > it->wait_time_ms()) {
-                            it->call_timeout();
-                        } else {
-                            new_que.push_back(std::move(it));
-                        }
-                    }
-                }
-                new_que.swap(_que);
-            }
-            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
-        }
-        _shutdown = true;
-    }

Review Comment:
   把下面的wait for shutdown的逻辑,移动到stop 里面,stop 应该是同步的,否则容易挂。不要在析构函数里等待。那个时候可能就晚了。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org