You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ww...@apache.org on 2023/01/05 03:05:44 UTC

[incubator-brpc] branch master updated: add timeout concurrency limiter (#2027)

This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 5283d0d9 add timeout concurrency limiter (#2027)
5283d0d9 is described below

commit 5283d0d930c33e03346ab59de3580c061c3e90b8
Author: Yang,Liming <li...@139.com>
AuthorDate: Thu Jan 5 11:05:34 2023 +0800

    add timeout concurrency limiter (#2027)
---
 src/brpc/concurrency_limiter.h                     |   3 +-
 src/brpc/details/method_status.h                   |   6 +-
 src/brpc/global.cpp                                |   5 +-
 src/brpc/policy/auto_concurrency_limiter.cpp       |   2 +-
 src/brpc/policy/auto_concurrency_limiter.h         |   2 +-
 src/brpc/policy/baidu_rpc_protocol.cpp             |   2 +-
 src/brpc/policy/constant_concurrency_limiter.cpp   |   2 +-
 src/brpc/policy/constant_concurrency_limiter.h     |   2 +-
 src/brpc/policy/timeout_concurrency_limiter.cpp    | 160 +++++++++++++++++++++
 ...ncy_limiter.h => timeout_concurrency_limiter.h} |  60 +++-----
 test/brpc_timeout_concurrency_limiter_unittest.cpp | 105 ++++++++++++++
 11 files changed, 302 insertions(+), 47 deletions(-)

diff --git a/src/brpc/concurrency_limiter.h b/src/brpc/concurrency_limiter.h
index 99ca13a1..083e2cf9 100644
--- a/src/brpc/concurrency_limiter.h
+++ b/src/brpc/concurrency_limiter.h
@@ -22,6 +22,7 @@
 #include "brpc/destroyable.h"
 #include "brpc/extension.h"                       // Extension<T>
 #include "brpc/adaptive_max_concurrency.h"        // AdaptiveMaxConcurrency
+#include "brpc/controller.h"
 
 namespace brpc {
 
@@ -33,7 +34,7 @@ public:
     // false when the concurrency reaches the upper limit, otherwise it 
     // returns true. Normally, when OnRequested returns false, you should 
     // return an ELIMIT error directly.
-    virtual bool OnRequested(int current_concurrency) = 0;
+    virtual bool OnRequested(int current_concurrency, Controller* cntl) = 0;
 
     // Each request should call this method before responding.
     // `error_code' : Error code obtained from the controller, 0 means success.
diff --git a/src/brpc/details/method_status.h b/src/brpc/details/method_status.h
index 094e8953..b49b6754 100644
--- a/src/brpc/details/method_status.h
+++ b/src/brpc/details/method_status.h
@@ -38,7 +38,7 @@ public:
     // Call this function when the method is about to be called.
     // Returns false when the method is overloaded. If rejected_cc is not
     // NULL, it's set with the rejected concurrency.
-    bool OnRequested(int* rejected_cc = NULL);
+    bool OnRequested(int* rejected_cc = NULL, Controller* cntl = NULL);
 
     // Call this when the method just finished.
     // `error_code' : The error code obtained from the controller. Equal to 
@@ -89,9 +89,9 @@ private:
     uint64_t _received_us;
 };
 
-inline bool MethodStatus::OnRequested(int* rejected_cc) {
+inline bool MethodStatus::OnRequested(int* rejected_cc, Controller* cntl) {
     const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
-    if (NULL == _cl || _cl->OnRequested(cc)) {
+    if (NULL == _cl || _cl->OnRequested(cc, cntl)) {
         return true;
     } 
     if (rejected_cc) {
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index af8dac5c..30c2f1a3 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -80,6 +80,7 @@
 #include "brpc/concurrency_limiter.h"
 #include "brpc/policy/auto_concurrency_limiter.h"
 #include "brpc/policy/constant_concurrency_limiter.h"
+#include "brpc/policy/timeout_concurrency_limiter.h"
 
 #include "brpc/input_messenger.h"     // get_or_new_client_side_messenger
 #include "brpc/socket_map.h"          // SocketMapList
@@ -150,6 +151,7 @@ struct GlobalExtensions {
 
     AutoConcurrencyLimiter auto_cl;
     ConstantConcurrencyLimiter constant_cl;
+    TimeoutConcurrencyLimiter timeout_cl;
 };
 
 static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT;
@@ -601,7 +603,8 @@ static void GlobalInitializeOrDieImpl() {
     // Concurrency Limiters
     ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
     ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
-    
+    ConcurrencyLimiterExtension()->RegisterOrDie("timeout", &g_ext->timeout_cl);
+
     if (FLAGS_usercode_in_pthread) {
         // Optional. If channel/server are initialized before main(), this
         // flag may be false at here even if it will be set to true after
diff --git a/src/brpc/policy/auto_concurrency_limiter.cpp b/src/brpc/policy/auto_concurrency_limiter.cpp
index 5eafbd7a..d1d52d6d 100644
--- a/src/brpc/policy/auto_concurrency_limiter.cpp
+++ b/src/brpc/policy/auto_concurrency_limiter.cpp
@@ -93,7 +93,7 @@ AutoConcurrencyLimiter* AutoConcurrencyLimiter::New(const AdaptiveMaxConcurrency
     return new (std::nothrow) AutoConcurrencyLimiter;
 }
 
-bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
+bool AutoConcurrencyLimiter::OnRequested(int current_concurrency, Controller*) {
     return current_concurrency <= _max_concurrency;
 }
 
diff --git a/src/brpc/policy/auto_concurrency_limiter.h b/src/brpc/policy/auto_concurrency_limiter.h
index 7d694247..6cf5e10c 100644
--- a/src/brpc/policy/auto_concurrency_limiter.h
+++ b/src/brpc/policy/auto_concurrency_limiter.h
@@ -29,7 +29,7 @@ class AutoConcurrencyLimiter : public ConcurrencyLimiter {
 public:
     AutoConcurrencyLimiter();
 
-    bool OnRequested(int current_concurrency) override;
+    bool OnRequested(int current_concurrency, Controller*) override;
     
     void OnResponded(int error_code, int64_t latency_us) override;
 
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp
index 76f19619..0239960e 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -453,7 +453,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
         method_status = mp->status;
         if (method_status) {
             int rejected_cc = 0;
-            if (!method_status->OnRequested(&rejected_cc)) {
+            if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
                 cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                                 mp->method->full_name().c_str(), rejected_cc);
                 break;
diff --git a/src/brpc/policy/constant_concurrency_limiter.cpp b/src/brpc/policy/constant_concurrency_limiter.cpp
index 91ab7a88..be5f071c 100644
--- a/src/brpc/policy/constant_concurrency_limiter.cpp
+++ b/src/brpc/policy/constant_concurrency_limiter.cpp
@@ -24,7 +24,7 @@ ConstantConcurrencyLimiter::ConstantConcurrencyLimiter(int max_concurrency)
     : _max_concurrency(max_concurrency) {
 }
 
-bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency) {
+bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency, Controller*) {
     return current_concurrency <= _max_concurrency;
 }
 
diff --git a/src/brpc/policy/constant_concurrency_limiter.h b/src/brpc/policy/constant_concurrency_limiter.h
index 755714b8..f58a6286 100644
--- a/src/brpc/policy/constant_concurrency_limiter.h
+++ b/src/brpc/policy/constant_concurrency_limiter.h
@@ -27,7 +27,7 @@ class ConstantConcurrencyLimiter : public ConcurrencyLimiter {
 public:
     explicit ConstantConcurrencyLimiter(int max_concurrency);
     
-    bool OnRequested(int current_concurrency) override;
+    bool OnRequested(int current_concurrency, Controller*) override;
     
     void OnResponded(int error_code, int64_t latency_us) override;
 
diff --git a/src/brpc/policy/timeout_concurrency_limiter.cpp b/src/brpc/policy/timeout_concurrency_limiter.cpp
new file mode 100644
index 00000000..d35f1c03
--- /dev/null
+++ b/src/brpc/policy/timeout_concurrency_limiter.cpp
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/timeout_concurrency_limiter.h"
+#include "brpc/controller.h"
+#include "brpc/errno.pb.h"
+#include <cmath>
+#include <gflags/gflags.h>
+
+namespace brpc {
+namespace policy {
+
+DEFINE_int32(timeout_cl_sample_window_size_ms, 1000,
+             "Duration of the sampling window.");
+DEFINE_int32(timeout_cl_min_sample_count, 100,
+             "During the duration of the sampling window, if the number of "
+             "requests collected is less than this value, the sampling window "
+             "will be discarded.");
+DEFINE_int32(timeout_cl_max_sample_count, 200,
+             "During the duration of the sampling window, once the number of "
+             "requests collected is greater than this value, even if the "
+             "duration of the window has not ended, the max_concurrency will "
+             "be updated and a new sampling window will be started.");
+DEFINE_double(timeout_cl_sampling_interval_ms, 0.1,
+              "Interval for sampling request in auto concurrency limiter");
+DEFINE_int32(timeout_cl_initial_avg_latency_us, 500,
+             "Initial max concurrency for gradient concurrency limiter");
+DEFINE_bool(
+    timeout_cl_enable_error_punish, true,
+    "Whether to consider failed requests when calculating maximum concurrency");
+DEFINE_double(
+    timeout_cl_fail_punish_ratio, 1.0,
+    "Use the failed requests to punish normal requests. The larger "
+    "the configuration item, the more aggressive the penalty strategy.");
+DEFINE_int32(timeout_cl_default_timeout_ms, 500,
+             "Default timeout for rpc request");
+
+TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter()
+    : _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
+      _last_sampling_time_us(0) {}
+
+TimeoutConcurrencyLimiter *TimeoutConcurrencyLimiter::New(
+    const AdaptiveMaxConcurrency &) const {
+    return new (std::nothrow) TimeoutConcurrencyLimiter;
+}
+
+bool TimeoutConcurrencyLimiter::OnRequested(int, Controller *cntl) {
+    auto timeout_ms = FLAGS_timeout_cl_default_timeout_ms;
+    if (cntl != nullptr && cntl->timeout_ms() != UNSET_MAGIC_NUM) {
+        timeout_ms = cntl->timeout_ms();
+    }
+    return _avg_latency_us < timeout_ms * 1000;
+}
+
+void TimeoutConcurrencyLimiter::OnResponded(int error_code,
+                                            int64_t latency_us) {
+    if (ELIMIT == error_code) {
+        return;
+    }
+
+    const int64_t now_time_us = butil::gettimeofday_us();
+    int64_t last_sampling_time_us =
+        _last_sampling_time_us.load(butil::memory_order_relaxed);
+
+    if (last_sampling_time_us == 0 ||
+        now_time_us - last_sampling_time_us >=
+            FLAGS_timeout_cl_sampling_interval_ms * 1000) {
+        bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
+            last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
+        if (sample_this_call) {
+            bool sample_window_submitted =
+                AddSample(error_code, latency_us, now_time_us);
+            if (sample_window_submitted) {
+                // The following log prints has data-race in extreme cases,
+                // unless you are in debug, you should not open it.
+                VLOG(1) << "Sample window submitted, current avg_latency_us:"
+                        << _avg_latency_us;
+            }
+        }
+    }
+}
+
+int TimeoutConcurrencyLimiter::MaxConcurrency() { return 0; }
+
+bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
+                                          int64_t sampling_time_us) {
+    std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
+    if (_sw.start_time_us == 0) {
+        _sw.start_time_us = sampling_time_us;
+    }
+
+    if (error_code != 0 && FLAGS_timeout_cl_enable_error_punish) {
+        ++_sw.failed_count;
+        _sw.total_failed_us += latency_us;
+    } else if (error_code == 0) {
+        ++_sw.succ_count;
+        _sw.total_succ_us += latency_us;
+    }
+
+    if (_sw.succ_count + _sw.failed_count < FLAGS_timeout_cl_min_sample_count) {
+        if (sampling_time_us - _sw.start_time_us >=
+            FLAGS_timeout_cl_sample_window_size_ms * 1000) {
+            // If the sample size is insufficient at the end of the sampling
+            // window, discard the entire sampling window
+            ResetSampleWindow(sampling_time_us);
+        }
+        return false;
+    }
+    if (sampling_time_us - _sw.start_time_us <
+            FLAGS_timeout_cl_sample_window_size_ms * 1000 &&
+        _sw.succ_count + _sw.failed_count < FLAGS_timeout_cl_max_sample_count) {
+        return false;
+    }
+
+    if (_sw.succ_count > 0) {
+        UpdateAvgLatency();
+    } else {
+        // All request failed
+        AdjustAvgLatency(_avg_latency_us / 2);
+    }
+    ResetSampleWindow(sampling_time_us);
+    return true;
+}
+
+void TimeoutConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
+    _sw.start_time_us = sampling_time_us;
+    _sw.succ_count = 0;
+    _sw.failed_count = 0;
+    _sw.total_failed_us = 0;
+    _sw.total_succ_us = 0;
+}
+
+void TimeoutConcurrencyLimiter::AdjustAvgLatency(int64_t avg_latency_us) {
+    _avg_latency_us = avg_latency_us;
+}
+
+void TimeoutConcurrencyLimiter::UpdateAvgLatency() {
+    double failed_punish =
+        _sw.total_failed_us * FLAGS_timeout_cl_fail_punish_ratio;
+    auto avg_latency_us =
+        std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
+    AdjustAvgLatency(avg_latency_us);
+}
+
+}  // namespace policy
+}  // namespace brpc
diff --git a/src/brpc/policy/auto_concurrency_limiter.h b/src/brpc/policy/timeout_concurrency_limiter.h
similarity index 55%
copy from src/brpc/policy/auto_concurrency_limiter.h
copy to src/brpc/policy/timeout_concurrency_limiter.h
index 7d694247..716df577 100644
--- a/src/brpc/policy/auto_concurrency_limiter.h
+++ b/src/brpc/policy/timeout_concurrency_limiter.h
@@ -15,36 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
-#define BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
+#ifndef BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
+#define BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
 
-#include "bvar/bvar.h"
-#include "butil/containers/bounded_queue.h"
 #include "brpc/concurrency_limiter.h"
 
 namespace brpc {
 namespace policy {
 
-class AutoConcurrencyLimiter : public ConcurrencyLimiter {
-public:
-    AutoConcurrencyLimiter();
+class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
+   public:
+    TimeoutConcurrencyLimiter();
+
+    bool OnRequested(int, Controller* cntl) override;
 
-    bool OnRequested(int current_concurrency) override;
-    
     void OnResponded(int error_code, int64_t latency_us) override;
 
     int MaxConcurrency() override;
 
-    AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
+    TimeoutConcurrencyLimiter* New(
+        const AdaptiveMaxConcurrency&) const override;
 
-private:
+   private:
     struct SampleWindow {
-        SampleWindow() 
-            : start_time_us(0)
-            , succ_count(0)
-            , failed_count(0)
-            , total_failed_us(0)
-            , total_succ_us(0) {}
+        SampleWindow()
+            : start_time_us(0),
+              succ_count(0),
+              failed_count(0),
+              total_failed_us(0),
+              total_succ_us(0) {}
         int64_t start_time_us;
         int32_t succ_count;
         int32_t failed_count;
@@ -52,37 +51,24 @@ private:
         int64_t total_succ_us;
     };
 
-    bool AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
-    int64_t NextResetTime(int64_t sampling_time_us);
+    bool AddSample(int error_code, int64_t latency_us,
+                   int64_t sampling_time_us);
 
-    // The following methods are not thread safe and can only be called 
+    // The following methods are not thread safe and can only be called
     // in AppSample()
-    void UpdateMaxConcurrency(int64_t sampling_time_us);
     void ResetSampleWindow(int64_t sampling_time_us);
-    void UpdateMinLatency(int64_t latency_us);
-    void UpdateQps(double qps);
-
-    void AdjustMaxConcurrency(int next_max_concurrency);
+    void UpdateAvgLatency();
+    void AdjustAvgLatency(int64_t avg_latency_us);
 
     // modified per sample-window or more
-    int _max_concurrency;
-    int64_t _remeasure_start_us;
-    int64_t _reset_latency_us;
-    int64_t _min_latency_us; 
-    double _ema_max_qps;
-    double _explore_ratio;
-  
+    int64_t _avg_latency_us;
     // modified per sample.
     BAIDU_CACHELINE_ALIGNMENT butil::atomic<int64_t> _last_sampling_time_us;
     butil::Mutex _sw_mutex;
     SampleWindow _sw;
-
-    // modified per request.
-    BAIDU_CACHELINE_ALIGNMENT butil::atomic<int32_t> _total_succ_req;
 };
 
 }  // namespace policy
 }  // namespace brpc
 
-
-#endif // BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
+#endif  // BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
diff --git a/test/brpc_timeout_concurrency_limiter_unittest.cpp b/test/brpc_timeout_concurrency_limiter_unittest.cpp
new file mode 100644
index 00000000..c80f6921
--- /dev/null
+++ b/test/brpc_timeout_concurrency_limiter_unittest.cpp
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/timeout_concurrency_limiter.h"
+#include "butil/time.h"
+#include "bthread/bthread.h"
+#include <gtest/gtest.h>
+
+namespace brpc {
+namespace policy {
+DECLARE_int32(timeout_cl_sample_window_size_ms);
+DECLARE_int32(timeout_cl_min_sample_count);
+DECLARE_int32(timeout_cl_max_sample_count);
+}  // namespace policy
+}  // namespace brpc
+
+TEST(TimeoutConcurrencyLimiterTest, AddSample) {
+    {
+        brpc::policy::FLAGS_timeout_cl_sample_window_size_ms = 10;
+        brpc::policy::FLAGS_timeout_cl_min_sample_count = 5;
+        brpc::policy::FLAGS_timeout_cl_max_sample_count = 10;
+
+        brpc::policy::TimeoutConcurrencyLimiter limiter;
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        bthread_usleep(10 * 1000);
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        ASSERT_EQ(limiter._sw.succ_count, 0);
+        ASSERT_EQ(limiter._sw.failed_count, 0);
+
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        bthread_usleep(10 * 1000);
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        ASSERT_EQ(limiter._sw.succ_count, 0);
+        ASSERT_EQ(limiter._sw.failed_count, 0);
+        ASSERT_EQ(limiter._avg_latency_us, 50);
+
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        ASSERT_EQ(limiter._sw.succ_count, 0);
+        ASSERT_EQ(limiter._sw.failed_count, 0);
+        ASSERT_EQ(limiter._avg_latency_us, 50);
+
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        ASSERT_EQ(limiter._sw.succ_count, 6);
+        ASSERT_EQ(limiter._sw.failed_count, 0);
+
+        limiter.ResetSampleWindow(butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(0, 50, butil::gettimeofday_us());
+        limiter.AddSample(1, 50, butil::gettimeofday_us());
+        limiter.AddSample(1, 50, butil::gettimeofday_us());
+        limiter.AddSample(1, 50, butil::gettimeofday_us());
+        ASSERT_EQ(limiter._sw.succ_count, 3);
+        ASSERT_EQ(limiter._sw.failed_count, 3);
+    }
+}
+
+TEST(TimeoutConcurrencyLimiterTest, OnResponded) {
+    brpc::policy::FLAGS_timeout_cl_sample_window_size_ms = 10;
+    brpc::policy::FLAGS_timeout_cl_min_sample_count = 5;
+    brpc::policy::FLAGS_timeout_cl_max_sample_count = 10;
+    brpc::policy::TimeoutConcurrencyLimiter limiter;
+    limiter.OnResponded(0, 50);
+    limiter.OnResponded(0, 50);
+    bthread_usleep(100);
+    limiter.OnResponded(0, 50);
+    limiter.OnResponded(1, 50);
+    ASSERT_EQ(limiter._sw.succ_count, 2);
+    ASSERT_EQ(limiter._sw.failed_count, 0);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org