You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ww...@apache.org on 2023/01/05 03:05:44 UTC
[incubator-brpc] branch master updated: add timeout concurrency limiter (#2027)
This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 5283d0d9 add timeout concurrency limiter (#2027)
5283d0d9 is described below
commit 5283d0d930c33e03346ab59de3580c061c3e90b8
Author: Yang,Liming <li...@139.com>
AuthorDate: Thu Jan 5 11:05:34 2023 +0800
add timeout concurrency limiter (#2027)
---
src/brpc/concurrency_limiter.h | 3 +-
src/brpc/details/method_status.h | 6 +-
src/brpc/global.cpp | 5 +-
src/brpc/policy/auto_concurrency_limiter.cpp | 2 +-
src/brpc/policy/auto_concurrency_limiter.h | 2 +-
src/brpc/policy/baidu_rpc_protocol.cpp | 2 +-
src/brpc/policy/constant_concurrency_limiter.cpp | 2 +-
src/brpc/policy/constant_concurrency_limiter.h | 2 +-
src/brpc/policy/timeout_concurrency_limiter.cpp | 160 +++++++++++++++++++++
...ncy_limiter.h => timeout_concurrency_limiter.h} | 60 +++-----
test/brpc_timeout_concurrency_limiter_unittest.cpp | 105 ++++++++++++++
11 files changed, 302 insertions(+), 47 deletions(-)
diff --git a/src/brpc/concurrency_limiter.h b/src/brpc/concurrency_limiter.h
index 99ca13a1..083e2cf9 100644
--- a/src/brpc/concurrency_limiter.h
+++ b/src/brpc/concurrency_limiter.h
@@ -22,6 +22,7 @@
#include "brpc/destroyable.h"
#include "brpc/extension.h" // Extension<T>
#include "brpc/adaptive_max_concurrency.h" // AdaptiveMaxConcurrency
+#include "brpc/controller.h"
namespace brpc {
@@ -33,7 +34,7 @@ public:
// false when the concurrency reaches the upper limit, otherwise it
// returns true. Normally, when OnRequested returns false, you should
// return an ELIMIT error directly.
- virtual bool OnRequested(int current_concurrency) = 0;
+ virtual bool OnRequested(int current_concurrency, Controller* cntl) = 0;
// Each request should call this method before responding.
// `error_code' : Error code obtained from the controller, 0 means success.
diff --git a/src/brpc/details/method_status.h b/src/brpc/details/method_status.h
index 094e8953..b49b6754 100644
--- a/src/brpc/details/method_status.h
+++ b/src/brpc/details/method_status.h
@@ -38,7 +38,7 @@ public:
// Call this function when the method is about to be called.
// Returns false when the method is overloaded. If rejected_cc is not
// NULL, it's set with the rejected concurrency.
- bool OnRequested(int* rejected_cc = NULL);
+ bool OnRequested(int* rejected_cc = NULL, Controller* cntl = NULL);
// Call this when the method just finished.
// `error_code' : The error code obtained from the controller. Equal to
@@ -89,9 +89,9 @@ private:
uint64_t _received_us;
};
-inline bool MethodStatus::OnRequested(int* rejected_cc) {
+inline bool MethodStatus::OnRequested(int* rejected_cc, Controller* cntl) {
const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
- if (NULL == _cl || _cl->OnRequested(cc)) {
+ if (NULL == _cl || _cl->OnRequested(cc, cntl)) {
return true;
}
if (rejected_cc) {
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index af8dac5c..30c2f1a3 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -80,6 +80,7 @@
#include "brpc/concurrency_limiter.h"
#include "brpc/policy/auto_concurrency_limiter.h"
#include "brpc/policy/constant_concurrency_limiter.h"
+#include "brpc/policy/timeout_concurrency_limiter.h"
#include "brpc/input_messenger.h" // get_or_new_client_side_messenger
#include "brpc/socket_map.h" // SocketMapList
@@ -150,6 +151,7 @@ struct GlobalExtensions {
AutoConcurrencyLimiter auto_cl;
ConstantConcurrencyLimiter constant_cl;
+ TimeoutConcurrencyLimiter timeout_cl;
};
static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT;
@@ -601,7 +603,8 @@ static void GlobalInitializeOrDieImpl() {
// Concurrency Limiters
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
-
+ ConcurrencyLimiterExtension()->RegisterOrDie("timeout", &g_ext->timeout_cl);
+
if (FLAGS_usercode_in_pthread) {
// Optional. If channel/server are initialized before main(), this
// flag may be false at here even if it will be set to true after
diff --git a/src/brpc/policy/auto_concurrency_limiter.cpp b/src/brpc/policy/auto_concurrency_limiter.cpp
index 5eafbd7a..d1d52d6d 100644
--- a/src/brpc/policy/auto_concurrency_limiter.cpp
+++ b/src/brpc/policy/auto_concurrency_limiter.cpp
@@ -93,7 +93,7 @@ AutoConcurrencyLimiter* AutoConcurrencyLimiter::New(const AdaptiveMaxConcurrency
return new (std::nothrow) AutoConcurrencyLimiter;
}
-bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
+bool AutoConcurrencyLimiter::OnRequested(int current_concurrency, Controller*) {
return current_concurrency <= _max_concurrency;
}
diff --git a/src/brpc/policy/auto_concurrency_limiter.h b/src/brpc/policy/auto_concurrency_limiter.h
index 7d694247..6cf5e10c 100644
--- a/src/brpc/policy/auto_concurrency_limiter.h
+++ b/src/brpc/policy/auto_concurrency_limiter.h
@@ -29,7 +29,7 @@ class AutoConcurrencyLimiter : public ConcurrencyLimiter {
public:
AutoConcurrencyLimiter();
- bool OnRequested(int current_concurrency) override;
+ bool OnRequested(int current_concurrency, Controller*) override;
void OnResponded(int error_code, int64_t latency_us) override;
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp
index 76f19619..0239960e 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -453,7 +453,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
method_status = mp->status;
if (method_status) {
int rejected_cc = 0;
- if (!method_status->OnRequested(&rejected_cc)) {
+ if (!method_status->OnRequested(&rejected_cc, cntl.get())) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
diff --git a/src/brpc/policy/constant_concurrency_limiter.cpp b/src/brpc/policy/constant_concurrency_limiter.cpp
index 91ab7a88..be5f071c 100644
--- a/src/brpc/policy/constant_concurrency_limiter.cpp
+++ b/src/brpc/policy/constant_concurrency_limiter.cpp
@@ -24,7 +24,7 @@ ConstantConcurrencyLimiter::ConstantConcurrencyLimiter(int max_concurrency)
: _max_concurrency(max_concurrency) {
}
-bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency) {
+bool ConstantConcurrencyLimiter::OnRequested(int current_concurrency, Controller*) {
return current_concurrency <= _max_concurrency;
}
diff --git a/src/brpc/policy/constant_concurrency_limiter.h b/src/brpc/policy/constant_concurrency_limiter.h
index 755714b8..f58a6286 100644
--- a/src/brpc/policy/constant_concurrency_limiter.h
+++ b/src/brpc/policy/constant_concurrency_limiter.h
@@ -27,7 +27,7 @@ class ConstantConcurrencyLimiter : public ConcurrencyLimiter {
public:
explicit ConstantConcurrencyLimiter(int max_concurrency);
- bool OnRequested(int current_concurrency) override;
+ bool OnRequested(int current_concurrency, Controller*) override;
void OnResponded(int error_code, int64_t latency_us) override;
diff --git a/src/brpc/policy/timeout_concurrency_limiter.cpp b/src/brpc/policy/timeout_concurrency_limiter.cpp
new file mode 100644
index 00000000..d35f1c03
--- /dev/null
+++ b/src/brpc/policy/timeout_concurrency_limiter.cpp
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/timeout_concurrency_limiter.h"
+#include "brpc/controller.h"
+#include "brpc/errno.pb.h"
+#include <cmath>
+#include <gflags/gflags.h>
+
+namespace brpc {
+namespace policy {
+
+// Tunables of TimeoutConcurrencyLimiter. All of them are read at runtime by
+// the functions in this file.
+
+// Length of one sample window, compared against sample timestamps in
+// AddSample().
+DEFINE_int32(timeout_cl_sample_window_size_ms, 1000,
+             "Duration of the sampling window.");
+// Lower bound of samples a window needs before it may be submitted.
+DEFINE_int32(timeout_cl_min_sample_count, 100,
+             "During the duration of the sampling window, if the number of "
+             "requests collected is less than this value, the sampling window "
+             "will be discarded.");
+// Upper bound of samples after which a window is submitted early.
+DEFINE_int32(timeout_cl_max_sample_count, 200,
+             "During the duration of the sampling window, once the number of "
+             "requests collected is greater than this value, even if the "
+             "duration of the window has not ended, the max_concurrency will "
+             "be updated and a new sampling window will be started.");
+// Minimum distance between two sampled responses, see OnResponded().
+DEFINE_double(timeout_cl_sampling_interval_ms, 0.1,
+              "Interval for sampling request in timeout concurrency limiter");
+// Value _avg_latency_us is initialized with by the constructor.
+DEFINE_int32(timeout_cl_initial_avg_latency_us, 500,
+             "Initial average latency (in microseconds) of timeout concurrency "
+             "limiter, used until the first sampling window is submitted");
+// Controls whether failed responses are accumulated in AddSample().
+DEFINE_bool(
+    timeout_cl_enable_error_punish, true,
+    "Whether to consider failed requests when calculating maximum concurrency");
+// Weight applied to the accumulated latency of failed requests in
+// UpdateAvgLatency().
+DEFINE_double(
+    timeout_cl_fail_punish_ratio, 1.0,
+    "Use the failed requests to punish normal requests. The larger "
+    "the configuration item, the more aggressive the penalty strategy.");
+// Timeout used by OnRequested() when the controller carries none.
+DEFINE_int32(timeout_cl_default_timeout_ms, 500,
+             "Default timeout for rpc request");
+
+// Starts from the configured initial average latency; a last sampling time
+// of 0 marks that no response has been sampled yet.
+TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter()
+    : _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us)
+    , _last_sampling_time_us(0) {
+}
+
+// Creates a fresh, default-constructed limiter. The AdaptiveMaxConcurrency
+// argument is ignored. Returns NULL when the allocation fails (nothrow new).
+TimeoutConcurrencyLimiter *TimeoutConcurrencyLimiter::New(
+    const AdaptiveMaxConcurrency &) const {
+    TimeoutConcurrencyLimiter *limiter =
+        new (std::nothrow) TimeoutConcurrencyLimiter;
+    return limiter;
+}
+
+// Decides whether a new request may be admitted.
+//
+// A request is admitted while the current average latency (_avg_latency_us)
+// is below the timeout that applies to it. The timeout is taken from `cntl'
+// when it carries one, otherwise from -timeout_cl_default_timeout_ms.
+//
+// `current_concurrency' : concurrency passed in by the caller.
+// `cntl'                : controller of the request, may be NULL.
+// Returns true when the request is admitted, false when it should be rejected.
+bool TimeoutConcurrencyLimiter::OnRequested(int current_concurrency,
+                                            Controller *cntl) {
+    // Rejected requests finish with ELIMIT, which OnResponded() ignores. If
+    // every request were rejected once the average latency reached the
+    // timeout, no sample could ever update the average again and the limiter
+    // would reject forever. Always admitting a lone request keeps the
+    // measurement alive.
+    if (current_concurrency <= 1) {
+        return true;
+    }
+
+    // Kept in 64 bits so that neither a large timeout nor the ms -> us
+    // conversion below can overflow.
+    int64_t timeout_ms = FLAGS_timeout_cl_default_timeout_ms;
+    if (cntl != nullptr && cntl->timeout_ms() != UNSET_MAGIC_NUM) {
+        timeout_ms = cntl->timeout_ms();
+    }
+    return _avg_latency_us < timeout_ms * 1000;
+}
+
+// Called for every finished request. Picks at most one response per sampling
+// interval (-timeout_cl_sampling_interval_ms) and hands it to AddSample().
+//
+// `error_code' : error code of the request, ELIMIT responses are skipped.
+// `latency_us' : latency of the request in microseconds.
+void TimeoutConcurrencyLimiter::OnResponded(int error_code,
+                                            int64_t latency_us) {
+    if (error_code == ELIMIT) {
+        return;
+    }
+
+    const int64_t now_us = butil::gettimeofday_us();
+    int64_t prev_sampling_us =
+        _last_sampling_time_us.load(butil::memory_order_relaxed);
+
+    // Sample when nothing was sampled before or the interval has passed.
+    const bool interval_passed =
+        prev_sampling_us == 0 ||
+        now_us - prev_sampling_us >=
+            FLAGS_timeout_cl_sampling_interval_ms * 1000;
+    if (!interval_passed) {
+        return;
+    }
+
+    // Only the caller that wins the exchange samples this interval.
+    if (!_last_sampling_time_us.compare_exchange_strong(
+            prev_sampling_us, now_us, butil::memory_order_relaxed)) {
+        return;
+    }
+
+    if (AddSample(error_code, latency_us, now_us)) {
+        // Reading _avg_latency_us here may race in extreme cases, only
+        // enable this log when debugging.
+        VLOG(1) << "Sample window submitted, current avg_latency_us:"
+                << _avg_latency_us;
+    }
+}
+
+// Always reports 0.
+int TimeoutConcurrencyLimiter::MaxConcurrency() {
+    return 0;
+}
+
+// Accumulates one sample into the current sample window (under _sw_mutex)
+// and submits the window when it is complete.
+//
+// `error_code'       : 0 for a successful request. Failed requests are only
+//                      accumulated when -timeout_cl_enable_error_punish is on.
+// `latency_us'       : latency of the request in microseconds.
+// `sampling_time_us' : timestamp of the sample in microseconds.
+// Returns true when the window was submitted (the average latency was
+// adjusted and a new window started), false otherwise.
+bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
+                                          int64_t sampling_time_us) {
+    std::unique_lock<butil::Mutex> sw_lock(_sw_mutex);
+    if (0 == _sw.start_time_us) {
+        _sw.start_time_us = sampling_time_us;
+    }
+
+    if (error_code == 0) {
+        _sw.total_succ_us += latency_us;
+        ++_sw.succ_count;
+    } else if (FLAGS_timeout_cl_enable_error_punish) {
+        _sw.total_failed_us += latency_us;
+        ++_sw.failed_count;
+    }
+
+    const int32_t sample_count = _sw.succ_count + _sw.failed_count;
+    const bool window_expired =
+        sampling_time_us - _sw.start_time_us >=
+        FLAGS_timeout_cl_sample_window_size_ms * 1000;
+
+    if (sample_count < FLAGS_timeout_cl_min_sample_count) {
+        if (window_expired) {
+            // Too few samples by the end of the window: throw the whole
+            // window away and start over.
+            ResetSampleWindow(sampling_time_us);
+        }
+        return false;
+    }
+    // Enough samples, but keep collecting until the window expires or the
+    // maximum sample count is reached.
+    if (!window_expired && sample_count < FLAGS_timeout_cl_max_sample_count) {
+        return false;
+    }
+
+    if (_sw.succ_count <= 0) {
+        // Every request in the window failed.
+        AdjustAvgLatency(_avg_latency_us / 2);
+    } else {
+        UpdateAvgLatency();
+    }
+    ResetSampleWindow(sampling_time_us);
+    return true;
+}
+
+// Starts a new, empty sample window beginning at `sampling_time_us'.
+void TimeoutConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
+    // Drop everything accumulated so far ...
+    _sw.total_succ_us = 0;
+    _sw.total_failed_us = 0;
+    _sw.failed_count = 0;
+    _sw.succ_count = 0;
+    // ... and restart the clock of the window.
+    _sw.start_time_us = sampling_time_us;
+}
+
+// Replaces the average latency used by OnRequested() with `avg_latency_us'.
+void TimeoutConcurrencyLimiter::AdjustAvgLatency(int64_t avg_latency_us) {
+    this->_avg_latency_us = avg_latency_us;
+}
+
+// Recomputes the average latency from the current sample window: the latency
+// of successful requests plus the latency of failed requests weighted by
+// -timeout_cl_fail_punish_ratio, divided by the number of successful
+// requests and rounded up. The caller in this file only invokes it when
+// _sw.succ_count > 0.
+void TimeoutConcurrencyLimiter::UpdateAvgLatency() {
+    const double punished_failed_us =
+        _sw.total_failed_us * FLAGS_timeout_cl_fail_punish_ratio;
+    const double weighted_total_us = punished_failed_us + _sw.total_succ_us;
+    AdjustAvgLatency(std::ceil(weighted_total_us / _sw.succ_count));
+}
+
+} // namespace policy
+} // namespace brpc
diff --git a/src/brpc/policy/auto_concurrency_limiter.h b/src/brpc/policy/timeout_concurrency_limiter.h
similarity index 55%
copy from src/brpc/policy/auto_concurrency_limiter.h
copy to src/brpc/policy/timeout_concurrency_limiter.h
index 7d694247..716df577 100644
--- a/src/brpc/policy/auto_concurrency_limiter.h
+++ b/src/brpc/policy/timeout_concurrency_limiter.h
@@ -15,36 +15,35 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
-#define BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
+#ifndef BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
+#define BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
-#include "bvar/bvar.h"
-#include "butil/containers/bounded_queue.h"
#include "brpc/concurrency_limiter.h"
namespace brpc {
namespace policy {
-class AutoConcurrencyLimiter : public ConcurrencyLimiter {
-public:
- AutoConcurrencyLimiter();
+class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
+ public:
+ TimeoutConcurrencyLimiter();
+
+ bool OnRequested(int, Controller* cntl) override;
- bool OnRequested(int current_concurrency) override;
-
void OnResponded(int error_code, int64_t latency_us) override;
int MaxConcurrency() override;
- AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
+ TimeoutConcurrencyLimiter* New(
+ const AdaptiveMaxConcurrency&) const override;
-private:
+ private:
struct SampleWindow {
- SampleWindow()
- : start_time_us(0)
- , succ_count(0)
- , failed_count(0)
- , total_failed_us(0)
- , total_succ_us(0) {}
+ SampleWindow()
+ : start_time_us(0),
+ succ_count(0),
+ failed_count(0),
+ total_failed_us(0),
+ total_succ_us(0) {}
int64_t start_time_us;
int32_t succ_count;
int32_t failed_count;
@@ -52,37 +51,24 @@ private:
int64_t total_succ_us;
};
- bool AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
- int64_t NextResetTime(int64_t sampling_time_us);
+ bool AddSample(int error_code, int64_t latency_us,
+ int64_t sampling_time_us);
- // The following methods are not thread safe and can only be called
+ // The following methods are not thread safe and can only be called
// in AddSample()
- void UpdateMaxConcurrency(int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us);
- void UpdateMinLatency(int64_t latency_us);
- void UpdateQps(double qps);
-
- void AdjustMaxConcurrency(int next_max_concurrency);
+ void UpdateAvgLatency();
+ void AdjustAvgLatency(int64_t avg_latency_us);
// modified per sample-window or more
- int _max_concurrency;
- int64_t _remeasure_start_us;
- int64_t _reset_latency_us;
- int64_t _min_latency_us;
- double _ema_max_qps;
- double _explore_ratio;
-
+ int64_t _avg_latency_us;
// modified per sample.
BAIDU_CACHELINE_ALIGNMENT butil::atomic<int64_t> _last_sampling_time_us;
butil::Mutex _sw_mutex;
SampleWindow _sw;
-
- // modified per request.
- BAIDU_CACHELINE_ALIGNMENT butil::atomic<int32_t> _total_succ_req;
};
} // namespace policy
} // namespace brpc
-
-#endif // BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
+#endif // BRPC_POLICY_TIMEOUT_CONCURRENCY_LIMITER_H
diff --git a/test/brpc_timeout_concurrency_limiter_unittest.cpp b/test/brpc_timeout_concurrency_limiter_unittest.cpp
new file mode 100644
index 00000000..c80f6921
--- /dev/null
+++ b/test/brpc_timeout_concurrency_limiter_unittest.cpp
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/policy/timeout_concurrency_limiter.h"
+#include "butil/time.h"
+#include "bthread/bthread.h"
+#include <gtest/gtest.h>
+
+namespace brpc {
+namespace policy {  // flags below are defined in timeout_concurrency_limiter.cpp and overridden by the tests
+DECLARE_int32(timeout_cl_sample_window_size_ms);  // duration of one sample window, in ms
+DECLARE_int32(timeout_cl_min_sample_count);  // windows with fewer samples are discarded
+DECLARE_int32(timeout_cl_max_sample_count);  // reaching this count submits the window early
+} // namespace policy
+} // namespace brpc
+
+// Drives AddSample() directly with a 10ms window, min 5 and max 10 samples,
+// and checks when a window is submitted (counters back to 0) and when samples
+// keep accumulating.
+TEST(TimeoutConcurrencyLimiterTest, AddSample) {
+    brpc::policy::FLAGS_timeout_cl_sample_window_size_ms = 10;
+    brpc::policy::FLAGS_timeout_cl_min_sample_count = 5;
+    brpc::policy::FLAGS_timeout_cl_max_sample_count = 10;
+
+    brpc::policy::TimeoutConcurrencyLimiter limiter;
+    // Adds `count' samples of 50us latency with the given error code, each
+    // stamped with the current time.
+    auto add_samples = [&limiter](int error_code, int count) {
+        for (int i = 0; i < count; ++i) {
+            limiter.AddSample(error_code, 50, butil::gettimeofday_us());
+        }
+    };
+
+    // The fifth sample reaches the minimum count after the window expired,
+    // so the window is submitted.
+    add_samples(0, 4);
+    bthread_usleep(10 * 1000);
+    add_samples(0, 1);
+    ASSERT_EQ(limiter._sw.succ_count, 0);
+    ASSERT_EQ(limiter._sw.failed_count, 0);
+
+    // Five samples inside the window are kept; the sample after the window
+    // expired submits it.
+    add_samples(0, 5);
+    bthread_usleep(10 * 1000);
+    add_samples(0, 1);
+    ASSERT_EQ(limiter._sw.succ_count, 0);
+    ASSERT_EQ(limiter._sw.failed_count, 0);
+    ASSERT_EQ(limiter._avg_latency_us, 50);
+
+    // Reaching the maximum sample count submits the window early.
+    add_samples(0, 10);
+    ASSERT_EQ(limiter._sw.succ_count, 0);
+    ASSERT_EQ(limiter._sw.failed_count, 0);
+    ASSERT_EQ(limiter._avg_latency_us, 50);
+
+    // Between the minimum and maximum count, samples just accumulate.
+    add_samples(0, 6);
+    ASSERT_EQ(limiter._sw.succ_count, 6);
+    ASSERT_EQ(limiter._sw.failed_count, 0);
+
+    // Successful and failed samples are counted separately.
+    limiter.ResetSampleWindow(butil::gettimeofday_us());
+    add_samples(0, 3);
+    add_samples(1, 3);
+    ASSERT_EQ(limiter._sw.succ_count, 3);
+    ASSERT_EQ(limiter._sw.failed_count, 3);
+}
+
+TEST(TimeoutConcurrencyLimiterTest, OnResponded) {  // checks that OnResponded() samples at most once per sampling interval
+ brpc::policy::FLAGS_timeout_cl_sample_window_size_ms = 10;
+ brpc::policy::FLAGS_timeout_cl_min_sample_count = 5;
+ brpc::policy::FLAGS_timeout_cl_max_sample_count = 10;
+ brpc::policy::TimeoutConcurrencyLimiter limiter;
+ limiter.OnResponded(0, 50);  // nothing sampled before, so this response is sampled
+ limiter.OnResponded(0, 50);  // expected to fall inside the sampling interval and be skipped (timing dependent)
+ bthread_usleep(100);  // wait out the default sampling interval of 0.1ms (100us)
+ limiter.OnResponded(0, 50);  // interval has passed, sampled again
+ limiter.OnResponded(1, 50);  // expected to fall inside the sampling interval and be skipped (timing dependent)
+ ASSERT_EQ(limiter._sw.succ_count, 2);  // only the first and third responses were sampled
+ ASSERT_EQ(limiter._sw.failed_count, 0);  // the failed response was not sampled
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org