You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2020/10/22 06:06:52 UTC
[incubator-pegasus] branch master updated: feat(hotkey): collector
can be terminated by timeout (#625)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new e17d5c7 feat(hotkey): collector can be terminated by timeout (#625)
e17d5c7 is described below
commit e17d5c7a69570d76bbefd0777b429a6e878ec93e
Author: Smilencer <52...@qq.com>
AuthorDate: Thu Oct 22 14:02:48 2020 +0800
feat(hotkey): collector can be terminated by timeout (#625)
---
src/server/hotkey_collector.cpp | 44 ++++++++++++++++++++---
src/server/hotkey_collector.h | 4 +++
src/server/test/capacity_unit_calculator_test.cpp | 4 +--
3 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index dfde0c5..16984ee 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -21,16 +21,24 @@
#include <dsn/utility/smart_pointers.h>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>
+#include <dsn/utility/flags.h>
namespace pegasus {
namespace server {
+DSN_DEFINE_int32(
+ "pegasus.server",
+ max_seconds_to_detect_hotkey,
+ 150,
+ "the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
+
hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
- _internal_collector(std::make_shared<hotkey_empty_data_collector>())
+ _internal_collector(std::make_shared<hotkey_empty_data_collector>()),
+ _collector_start_time_second(0)
{
}
@@ -65,7 +73,18 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh
_internal_collector->capture_data(hash_key, weight);
}
-void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }
+void hotkey_collector::analyse_data()
+{
+ switch (_state.load()) {
+ case hotkey_collector_state::COARSE_DETECTING:
+ if (!terminate_if_timeout()) {
+ _internal_collector->analyse_data();
+ }
+ return;
+ default:
+ return;
+ }
+}
void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp)
{
@@ -88,6 +107,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
dwarn_replica(hint);
return;
case hotkey_collector_state::STOPPED:
+ _collector_start_time_second = dsn_now_s();
// TODO: (Tangyanzhao) start coarse detecting
_state.store(hotkey_collector_state::COARSE_DETECTING);
resp.err = dsn::ERR_OK;
@@ -105,13 +125,29 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
{
- _state.store(hotkey_collector_state::STOPPED);
- _internal_collector.reset();
+ terminate();
resp.err = dsn::ERR_OK;
std::string hint =
fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
}
+void hotkey_collector::terminate()
+{
+ _state.store(hotkey_collector_state::STOPPED);
+ _internal_collector.reset();
+ _collector_start_time_second = 0;
+}
+
+bool hotkey_collector::terminate_if_timeout()
+{
+ if (dsn_now_s() >= _collector_start_time_second + FLAGS_max_seconds_to_detect_hotkey) {
+ ddebug_replica("hotkey collector work time is exhausted but no hotkey has been found");
+ terminate();
+ return true;
+ }
+ return false;
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index c649fac..6cf34cc 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -79,9 +79,13 @@ public:
private:
void on_start_detect(dsn::replication::detect_hotkey_response &resp);
void on_stop_detect(dsn::replication::detect_hotkey_response &resp);
+ void terminate();
+ bool terminate_if_timeout();
+
std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::shared_ptr<internal_collector_base> _internal_collector;
+ uint64_t _collector_start_time_second;
};
class internal_collector_base
diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp
index 8964af5..5dbf212 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -30,8 +30,8 @@ public:
explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
: capacity_unit_calculator(
r,
- std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this),
- std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this))
+ std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, r),
+ std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, r))
{
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org