You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@pegasus.apache.org by la...@apache.org on 2020/10/22 06:06:52 UTC

[incubator-pegasus] branch master updated: feat(hotkey): collector can be terminated by timeout (#625)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new e17d5c7  feat(hotkey): collector can be terminated by timeout (#625)
e17d5c7 is described below

commit e17d5c7a69570d76bbefd0777b429a6e878ec93e
Author: Smilencer <52...@qq.com>
AuthorDate: Thu Oct 22 14:02:48 2020 +0800

    feat(hotkey): collector can be terminated by timeout (#625)
---
 src/server/hotkey_collector.cpp                   | 44 ++++++++++++++++++++---
 src/server/hotkey_collector.h                     |  4 +++
 src/server/test/capacity_unit_calculator_test.cpp |  4 +--
 3 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index dfde0c5..16984ee 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -21,16 +21,24 @@
 #include <dsn/utility/smart_pointers.h>
 #include "base/pegasus_key_schema.h"
 #include <dsn/dist/fmt_logging.h>
+#include <dsn/utility/flags.h>
 
 namespace pegasus {
 namespace server {
 
+DSN_DEFINE_int32(
+    "pegasus.server",
+    max_seconds_to_detect_hotkey,
+    150,
+    "the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
+
 hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
                                    dsn::replication::replica_base *r_base)
     : replica_base(r_base),
       _state(hotkey_collector_state::STOPPED),
       _hotkey_type(hotkey_type),
-      _internal_collector(std::make_shared<hotkey_empty_data_collector>())
+      _internal_collector(std::make_shared<hotkey_empty_data_collector>()),
+      _collector_start_time_second(0)
 {
 }
 
@@ -65,7 +73,18 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh
     _internal_collector->capture_data(hash_key, weight);
 }
 
-void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }
+void hotkey_collector::analyse_data()
+{
+    switch (_state.load()) {
+    case hotkey_collector_state::COARSE_DETECTING:
+        if (!terminate_if_timeout()) {
+            _internal_collector->analyse_data();
+        }
+        return;
+    default:
+        return;
+    }
+}
 
 void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp)
 {
@@ -88,6 +107,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
         dwarn_replica(hint);
         return;
     case hotkey_collector_state::STOPPED:
+        _collector_start_time_second = dsn_now_s();
         // TODO: (Tangyanzhao) start coarse detecting
         _state.store(hotkey_collector_state::COARSE_DETECTING);
         resp.err = dsn::ERR_OK;
@@ -105,13 +125,29 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
 
 void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
 {
-    _state.store(hotkey_collector_state::STOPPED);
-    _internal_collector.reset();
+    terminate();
     resp.err = dsn::ERR_OK;
     std::string hint =
         fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
     ddebug_replica(hint);
 }
 
+void hotkey_collector::terminate()
+{
+    _state.store(hotkey_collector_state::STOPPED);
+    _internal_collector.reset();
+    _collector_start_time_second = 0;
+}
+
+bool hotkey_collector::terminate_if_timeout()
+{
+    if (dsn_now_s() >= _collector_start_time_second + FLAGS_max_seconds_to_detect_hotkey) {
+        ddebug_replica("hotkey collector work time is exhausted but no hotkey has been found");
+        terminate();
+        return true;
+    }
+    return false;
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index c649fac..6cf34cc 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -79,9 +79,13 @@ public:
 private:
     void on_start_detect(dsn::replication::detect_hotkey_response &resp);
     void on_stop_detect(dsn::replication::detect_hotkey_response &resp);
+    void terminate();
+    bool terminate_if_timeout();
+
     std::atomic<hotkey_collector_state> _state;
     const dsn::replication::hotkey_type::type _hotkey_type;
     std::shared_ptr<internal_collector_base> _internal_collector;
+    uint64_t _collector_start_time_second;
 };
 
 class internal_collector_base
diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp
index 8964af5..5dbf212 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -30,8 +30,8 @@ public:
     explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
         : capacity_unit_calculator(
               r,
-              std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this),
-              std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this))
+              std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, r),
+              std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, r))
     {
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org