You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/12/13 11:03:10 UTC

[GitHub] [doris] Gabriel39 opened a new pull request, #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Gabriel39 opened a new pull request, #15040:
URL: https://github.com/apache/doris/pull/15040

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem summary
   
   Describe your changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: 
       - [ ] Yes
       - [ ] No
       - [ ] I don't know
   2. Has unit tests been added:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   3. Has document been added or modified:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   4. Does it need to update dependencies:
       - [ ] Yes
       - [ ] No
   5. Are there any changes that cannot be rolled back:
       - [ ] Yes (If Yes, please explain WHY)
       - [ ] No
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] Gabriel39 commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050778258


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1239,29 +1240,95 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
 
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
-    SCOPED_TIMER(_await_time_cost);
     // bitmap filter is precise filter and only filter once, so it must be applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
                                     ? _state->query_options().query_timeout
                                     : _state->runtime_filter_wait_time_ms();
-    std::unique_lock<std::mutex> lock(_inner_mutex);
-    if (!_is_ready) {
-        int64_t ms_since_registration = MonotonicMillis() - registration_time_;
-        int64_t ms_remaining = wait_times_ms - ms_since_registration;
-        if (ms_remaining <= 0) {
-            return _is_ready;
+    if (_state->enable_pipeline_exec() &&
+        _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::NOT_READY) {
+        auto expected = RuntimeFilterState::NOT_READY;
+        if (!_rf_state_atomic.compare_exchange_strong(
+                    expected,
+                    MonotonicMillis() - registration_time_ >= wait_times_ms
+                            ? RuntimeFilterState::TIME_OUT
+                            : RuntimeFilterState::NOT_READY,
+                    std::memory_order_acq_rel)) {
+            DCHECK(expected == RuntimeFilterState::READY);
+            return true;
+        }
+        return false;
+    } else if (_state->enable_pipeline_exec() &&
+               _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::TIME_OUT) {
+        return false;
+    } else if (!_state->enable_pipeline_exec()) {
+        SCOPED_TIMER(_await_time_cost);
+        std::unique_lock<std::mutex> lock(_inner_mutex);
+        if (_rf_state != RuntimeFilterState::READY) {
+            int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+            int64_t ms_remaining = wait_times_ms - ms_since_registration;
+            _rf_state = RuntimeFilterState::TIME_OUT;
+            if (ms_remaining <= 0) {
+                return false;
+            }
+            return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
+                                      [this] { return _rf_state == RuntimeFilterState::READY; });
         }
-        return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
-                                  [this] { return this->_is_ready; });
     }
     return true;
 }
 
+bool IRuntimeFilter::is_ready_or_timeout() {
+    DCHECK(is_consumer());
+    auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
+    // bitmap filter is precise filter and only filter once, so it must be applied.
+    int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+                                    ? _state->query_options().query_timeout
+                                    : _state->runtime_filter_wait_time_ms();
+    int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+    if (!_state->enable_pipeline_exec()) {
+        _rf_state = RuntimeFilterState::TIME_OUT;
+        return true;
+    } else if (is_ready()) {
+        if (cur_state == RuntimeFilterState::NOT_READY) {
+            _profile->add_info_string("EffectTime", std::to_string(ms_since_registration) + " ms");
+        }
+        return true;
+    } else {
+        if (cur_state == RuntimeFilterState::NOT_READY) {

Review Comment:
   I want to make sure that we only add the profile info once, when its state is no longer NOT_READY. Do you have any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1350597822

   clang-tidy review says "All clean, LGTM! :+1:"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1348212859

   clang-tidy review says "All clean, LGTM! :+1:"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] Gabriel39 merged pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
Gabriel39 merged PR #15040:
URL: https://github.com/apache/doris/pull/15040


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1356223927

   clang-tidy review says "All clean, LGTM! :+1:"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] Gabriel39 commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050781731


##########
be/src/vec/exec/scan/vscan_node.cpp:
##########
@@ -84,13 +84,22 @@ Status VScanNode::prepare(RuntimeState* state) {
 
 Status VScanNode::open(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
-    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(ExecNode::open(state));
+    return ExecNode::open(state);
+}
 
+Status VScanNode::alloc_resource(RuntimeState* state) {
+    if (_opened) {
+        return Status::OK();
+    }
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
+    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::alloc_resource");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::alloc_resource(state));
     RETURN_IF_ERROR(_acquire_runtime_filter());

Review Comment:
   This argument is true by default. I think it will be false in some cases (maybe external tables?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1356771363

   PR approved by at least one committer and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1348416958

   clang-tidy review says "All clean, LGTM! :+1:"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050706005


##########
be/src/exec/exec_node.h:
##########
@@ -96,6 +96,8 @@ class ExecNode {
     // so only vectorized exec node need to impl
     virtual Status alloc_resource(RuntimeState* state);
 
+    virtual bool runtime_filters_are_ready_or_timeout() { return true; }

Review Comment:
   It would be better to make this a virtual function in `new_scan_node`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050765618


##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -205,12 +212,12 @@ void PipelineTask::set_state(PipelineTaskState state) {
 
 std::string PipelineTask::debug_string() const {
     std::stringstream ss;
-    ss << "PipelineTask(" << _index << ")" << get_state_name(_cur_state) << "\nsink: ";
-    ss << _sink->debug_string();
-    ss << "\n operators(from source to root)";
-    for (auto operatr : _operators) {
-        ss << "\n" << operatr->debug_string();
+    ss << "PipelineTask[id = " << _index << ", state = " << get_state_name(_cur_state)

Review Comment:
   It would be better to use `fmt::format` instead of `stringstream`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1351304999

   clang-tidy review says "All clean, LGTM! :+1:"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050768557


##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -86,14 +76,21 @@ bool PipelineTask::has_dependency() {
 }
 
 Status PipelineTask::open() {
-    if (_sink) {
-        RETURN_IF_ERROR(_sink->open(_state));
-    }
+    auto st = Status::OK();
     for (auto& o : _operators) {
-        RETURN_IF_ERROR(o->open(_state));
+        if (st.is_blocked_by_rf()) {
+            o->open(_state);

Review Comment:
   This should use `RETURN_IF_ERROR`: if some operator fails to open, we should fail as quickly as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1356771376

   PR approved by anyone and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050746331


##########
be/src/pipeline/exec/scan_operator.h:
##########
@@ -42,9 +42,7 @@ class ScanOperator : public SourceOperator<ScanOperatorBuilder> {
 
     bool is_pending_finish() const override;
 
-    Status open(RuntimeState* state) override;
-
-    Status close(RuntimeState* state) override;

Review Comment:
   Why delete this code? Other scan nodes, like odbc and jdbc, may need the function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050746331


##########
be/src/pipeline/exec/scan_operator.h:
##########
@@ -42,9 +42,7 @@ class ScanOperator : public SourceOperator<ScanOperatorBuilder> {
 
     bool is_pending_finish() const override;
 
-    Status open(RuntimeState* state) override;
-
-    Status close(RuntimeState* state) override;

Review Comment:
   Why delete this code? Other scan nodes, like odbc and jdbc, may need the function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050768557


##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -86,14 +76,21 @@ bool PipelineTask::has_dependency() {
 }
 
 Status PipelineTask::open() {
-    if (_sink) {
-        RETURN_IF_ERROR(_sink->open(_state));
-    }
+    auto st = Status::OK();
     for (auto& o : _operators) {
-        RETURN_IF_ERROR(o->open(_state));
+        if (st.is_blocked_by_rf()) {
+            o->open(_state);

Review Comment:
   This should use RETURN_IF_ERROR: if some operator fails to open, we should fail as quickly as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1354922177

   clang-tidy review says "All clean, LGTM! :+1:"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] hello-stephen commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
hello-stephen commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1348607626

   TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 34.98 seconds
    load time: 439 seconds
    storage size: 17123356790 Bytes
    https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/tmp/20221213135223_clickbench_pr_62752.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15040:
URL: https://github.com/apache/doris/pull/15040#issuecomment-1348223387

   clang-tidy review says "All clean, LGTM! :+1:"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050715741


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1239,29 +1240,95 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
 
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
-    SCOPED_TIMER(_await_time_cost);
     // bitmap filter is precise filter and only filter once, so it must be applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
                                     ? _state->query_options().query_timeout
                                     : _state->runtime_filter_wait_time_ms();
-    std::unique_lock<std::mutex> lock(_inner_mutex);
-    if (!_is_ready) {
-        int64_t ms_since_registration = MonotonicMillis() - registration_time_;
-        int64_t ms_remaining = wait_times_ms - ms_since_registration;
-        if (ms_remaining <= 0) {
-            return _is_ready;
+    if (_state->enable_pipeline_exec() &&
+        _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::NOT_READY) {
+        auto expected = RuntimeFilterState::NOT_READY;
+        if (!_rf_state_atomic.compare_exchange_strong(
+                    expected,
+                    MonotonicMillis() - registration_time_ >= wait_times_ms
+                            ? RuntimeFilterState::TIME_OUT
+                            : RuntimeFilterState::NOT_READY,
+                    std::memory_order_acq_rel)) {
+            DCHECK(expected == RuntimeFilterState::READY);
+            return true;
+        }
+        return false;
+    } else if (_state->enable_pipeline_exec() &&
+               _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::TIME_OUT) {
+        return false;
+    } else if (!_state->enable_pipeline_exec()) {
+        SCOPED_TIMER(_await_time_cost);
+        std::unique_lock<std::mutex> lock(_inner_mutex);
+        if (_rf_state != RuntimeFilterState::READY) {
+            int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+            int64_t ms_remaining = wait_times_ms - ms_since_registration;
+            _rf_state = RuntimeFilterState::TIME_OUT;
+            if (ms_remaining <= 0) {
+                return false;
+            }
+            return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
+                                      [this] { return _rf_state == RuntimeFilterState::READY; });
         }
-        return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
-                                  [this] { return this->_is_ready; });
     }
     return true;
 }
 
+bool IRuntimeFilter::is_ready_or_timeout() {
+    DCHECK(is_consumer());
+    auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
+    // bitmap filter is precise filter and only filter once, so it must be applied.
+    int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+                                    ? _state->query_options().query_timeout
+                                    : _state->runtime_filter_wait_time_ms();
+    int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+    if (!_state->enable_pipeline_exec()) {
+        _rf_state = RuntimeFilterState::TIME_OUT;
+        return true;
+    } else if (is_ready()) {
+        if (cur_state == RuntimeFilterState::NOT_READY) {
+            _profile->add_info_string("EffectTime", std::to_string(ms_since_registration) + " ms");
+        }
+        return true;
+    } else {
+        if (cur_state == RuntimeFilterState::NOT_READY) {

Review Comment:
   The logic here is weird. Can you rethink it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050708696


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1239,29 +1240,95 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
 
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
-    SCOPED_TIMER(_await_time_cost);
     // bitmap filter is precise filter and only filter once, so it must be applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
                                     ? _state->query_options().query_timeout
                                     : _state->runtime_filter_wait_time_ms();
-    std::unique_lock<std::mutex> lock(_inner_mutex);
-    if (!_is_ready) {
-        int64_t ms_since_registration = MonotonicMillis() - registration_time_;
-        int64_t ms_remaining = wait_times_ms - ms_since_registration;
-        if (ms_remaining <= 0) {
-            return _is_ready;
+    if (_state->enable_pipeline_exec() &&

Review Comment:
   A better code structure:
   ```
   if (_state->enable_pipeline_exec()) {
      if (_rf_state_atomic.load() == xxx) { 
      } 
      if ()
   } else {
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050764426


##########
be/src/vec/exec/scan/vscan_node.cpp:
##########
@@ -84,13 +84,22 @@ Status VScanNode::prepare(RuntimeState* state) {
 
 Status VScanNode::open(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
-    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
-    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(ExecNode::open(state));
+    return ExecNode::open(state);
+}
 
+Status VScanNode::alloc_resource(RuntimeState* state) {
+    if (_opened) {
+        return Status::OK();
+    }
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
+    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::alloc_resource");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::alloc_resource(state));
     RETURN_IF_ERROR(_acquire_runtime_filter());

Review Comment:
   It seems this function needs to be passed the `wait` value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org