You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/12/16 13:49:04 UTC

[GitHub] [doris] Gabriel39 commented on a diff in pull request #15040: [Pipeline](runtime filter) Support runtime filters on pipeline engine

Gabriel39 commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050778258


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1239,29 +1240,95 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
 
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
-    SCOPED_TIMER(_await_time_cost);
     // bitmap filter is precise filter and only filter once, so it must be applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
                                     ? _state->query_options().query_timeout
                                     : _state->runtime_filter_wait_time_ms();
-    std::unique_lock<std::mutex> lock(_inner_mutex);
-    if (!_is_ready) {
-        int64_t ms_since_registration = MonotonicMillis() - registration_time_;
-        int64_t ms_remaining = wait_times_ms - ms_since_registration;
-        if (ms_remaining <= 0) {
-            return _is_ready;
+    if (_state->enable_pipeline_exec() &&
+        _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::NOT_READY) {
+        auto expected = RuntimeFilterState::NOT_READY;
+        if (!_rf_state_atomic.compare_exchange_strong(
+                    expected,
+                    MonotonicMillis() - registration_time_ >= wait_times_ms
+                            ? RuntimeFilterState::TIME_OUT
+                            : RuntimeFilterState::NOT_READY,
+                    std::memory_order_acq_rel)) {
+            DCHECK(expected == RuntimeFilterState::READY);
+            return true;
+        }
+        return false;
+    } else if (_state->enable_pipeline_exec() &&
+               _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::TIME_OUT) {
+        return false;
+    } else if (!_state->enable_pipeline_exec()) {
+        SCOPED_TIMER(_await_time_cost);
+        std::unique_lock<std::mutex> lock(_inner_mutex);
+        if (_rf_state != RuntimeFilterState::READY) {
+            int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+            int64_t ms_remaining = wait_times_ms - ms_since_registration;
+            _rf_state = RuntimeFilterState::TIME_OUT;
+            if (ms_remaining <= 0) {
+                return false;
+            }
+            return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
+                                      [this] { return _rf_state == RuntimeFilterState::READY; });
         }
-        return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining),
-                                  [this] { return this->_is_ready; });
     }
     return true;
 }
 
+bool IRuntimeFilter::is_ready_or_timeout() {
+    DCHECK(is_consumer());
+    auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
+    // bitmap filter is precise filter and only filter once, so it must be applied.
+    int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
+                                    ? _state->query_options().query_timeout
+                                    : _state->runtime_filter_wait_time_ms();
+    int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+    if (!_state->enable_pipeline_exec()) {
+        _rf_state = RuntimeFilterState::TIME_OUT;
+        return true;
+    } else if (is_ready()) {
+        if (cur_state == RuntimeFilterState::NOT_READY) {
+            _profile->add_info_string("EffectTime", std::to_string(ms_since_registration) + " ms");
+        }
+        return true;
+    } else {
+        if (cur_state == RuntimeFilterState::NOT_READY) {

Review Comment:
   I want to make sure the profile info is added only once, at the point where the state is no longer NOT_READY. Do you have any suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org