You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2020/10/16 02:26:21 UTC

[incubator-pegasus] branch master updated: feat(hotkey): capture data part1 - declare internal collector (#618)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 1461dc5  feat(hotkey): capture data part1 - declare internal collector (#618)
1461dc5 is described below

commit 1461dc55035d03552e62a1d9ffa7ac27cf9e1423
Author: Smilencer <52...@qq.com>
AuthorDate: Fri Oct 16 10:26:10 2020 +0800

    feat(hotkey): capture data part1 - declare internal collector (#618)
---
 src/server/capacity_unit_calculator.cpp |  2 +-
 src/server/hotkey_collector.cpp         | 20 ++++++++++++++++++--
 src/server/hotkey_collector.h           | 20 ++++++++++++++++++++
 src/server/pegasus_server_impl.cpp      | 16 ++++++++++++++++
 4 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index 6a055e6..07af04f 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -217,7 +217,7 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status,
     }
     _pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes);
     uint64_t key_count = kvs.size();
-    _write_hotkey_collector->capture_raw_key(hash_key, key_count);
+    _write_hotkey_collector->capture_hash_key(hash_key, key_count);
 
     if (status != rocksdb::Status::kOk) {
         return;
diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp
index c05e33c..cb3d1c9 100644
--- a/src/server/hotkey_collector.cpp
+++ b/src/server/hotkey_collector.cpp
@@ -17,9 +17,17 @@
 
 #include "hotkey_collector.h"
 
+#include <dsn/utility/smart_pointers.h>
+#include "base/pegasus_key_schema.h"
+
 namespace pegasus {
 namespace server {
 
+hotkey_collector::hotkey_collector()
+    : _internal_collector(dsn::make_unique<hotkey_empty_data_collector>())
+{
+}
+
 // TODO: (Tangyanzhao) implement these functions
 void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
                                   dsn::replication::detect_hotkey_response &resp)
@@ -28,10 +36,18 @@ void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request
 
 void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight)
 {
-    // TODO: (Tangyanzhao) Add a judgment sentence to check if it is a raw key
+    dsn::blob hash_key, sort_key;
+    pegasus_restore_key(raw_key, hash_key, sort_key);
+    capture_hash_key(hash_key, weight);
+}
+
+void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight)
+{
+    // TODO: (Tangyanzhao) add a unit test to ensure data integrity
+    _internal_collector->capture_data(hash_key, weight);
 }
 
-void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weight) {}
+void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }
 
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h
index a1818d3..5c092dc 100644
--- a/src/server/hotkey_collector.h
+++ b/src/server/hotkey_collector.h
@@ -24,6 +24,8 @@
 namespace pegasus {
 namespace server {
 
+class internal_collector_base;
+
 //    hotkey_collector is responsible to find the hot keys after the partition
 //    was detected to be hot. The two types of hotkey, READ & WRITE, are detected
 //    separately.
@@ -64,16 +66,34 @@ namespace server {
 class hotkey_collector
 {
 public:
+    hotkey_collector();
     // TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection
     // weight: calculate the weight according to the specific situation
     void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
     void capture_hash_key(const dsn::blob &hash_key, int64_t weight);
+    void analyse_data();
     void handle_rpc(const dsn::replication::detect_hotkey_request &req,
                     /*out*/ dsn::replication::detect_hotkey_response &resp);
 
 private:
+    std::unique_ptr<internal_collector_base> _internal_collector;
     std::atomic<hotkey_collector_state> _state;
 };
 
+class internal_collector_base
+{
+public:
+    virtual void capture_data(const dsn::blob &hash_key, uint64_t weight) = 0;
+    virtual void analyse_data() = 0;
+};
+
+// used in hotkey_collector_state::STOPPED and hotkey_collector_state::FINISHED, avoid null pointers
+class hotkey_empty_data_collector : public internal_collector_base
+{
+public:
+    void capture_data(const dsn::blob &hash_key, uint64_t size) override {} // intentional no-op
+    void analyse_data() override {}                                         // intentional no-op
+};
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index cdd5b35..e565d53 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -15,6 +15,7 @@
 #include <dsn/utility/string_conv.h>
 #include <dsn/dist/fmt_logging.h>
 #include <dsn/dist/replication/replication.codes.h>
+#include <dsn/utility/flags.h>
 
 #include "base/pegasus_key_schema.h"
 #include "base/pegasus_value_schema.h"
@@ -31,6 +32,11 @@ namespace server {
 
 DEFINE_TASK_CODE(LPC_PEGASUS_SERVER_DELAY, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
 
+DSN_DEFINE_int32("pegasus.server",
+                 hotkey_analyse_time_interval_s,
+                 10,
+                 "hotkey analyse interval in seconds");
+
 static std::string chkpt_get_dir_name(int64_t decree)
 {
     char buffer[256];
@@ -1490,6 +1496,16 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
         this, _read_hotkey_collector, _write_hotkey_collector);
     _server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
 
+    ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
+                                  &_tracker,
+                                  [this]() { _read_hotkey_collector->analyse_data(); },
+                                  std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
+
+    ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
+                                  &_tracker,
+                                  [this]() { _write_hotkey_collector->analyse_data(); },
+                                  std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
+
     return ::dsn::ERR_OK;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org