You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2020/10/13 09:09:14 UTC

[incubator-pegasus] branch master updated: feat(hotkey): add an interface of hotkey capture (#615)

This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d3ea55  feat(hotkey): add an interface of hotkey capture (#615)
0d3ea55 is described below

commit 0d3ea55fd4d77157bdf71977945fe9f408cf2bfd
Author: Smilencer <52...@qq.com>
AuthorDate: Tue Oct 13 17:09:03 2020 +0800

    feat(hotkey): add an interface of hotkey capture (#615)
---
 src/server/capacity_unit_calculator.cpp           | 41 ++++++++++++++++++++---
 src/server/capacity_unit_calculator.h             | 33 +++++++++++++++---
 src/server/pegasus_server_impl.cpp                |  7 ++--
 src/server/pegasus_write_service.cpp              |  2 +-
 src/server/test/capacity_unit_calculator_test.cpp | 10 +++---
 5 files changed, 76 insertions(+), 17 deletions(-)

diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index 62e7dd3..6a055e6 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -3,14 +3,25 @@
 // can be found in the LICENSE file in the root directory of this source tree.
 
 #include "capacity_unit_calculator.h"
+
 #include <dsn/utility/config_api.h>
 #include <rocksdb/status.h>
+#include "hotkey_collector.h"
 
 namespace pegasus {
 namespace server {
 
-capacity_unit_calculator::capacity_unit_calculator(replica_base *r) : replica_base(r)
+capacity_unit_calculator::capacity_unit_calculator(
+    replica_base *r,
+    std::shared_ptr<hotkey_collector> read_hotkey_collector,
+    std::shared_ptr<hotkey_collector> write_hotkey_collector)
+    : replica_base(r),
+      _read_hotkey_collector(read_hotkey_collector),
+      _write_hotkey_collector(write_hotkey_collector)
 {
+    dassert(_read_hotkey_collector != nullptr, "read hotkey collector is a nullptr");
+    dassert(_write_hotkey_collector != nullptr, "write hotkey collector is a nullptr");
+
     _read_capacity_unit_size =
         dsn_config_get_value_uint64("pegasus.server",
                                     "perf_counter_read_capacity_unit_size",
@@ -99,9 +110,11 @@ void capacity_unit_calculator::add_get_cu(int32_t status,
 
     if (status == rocksdb::Status::kNotFound) {
         add_read_cu(1);
+        _read_hotkey_collector->capture_raw_key(key, 1);
         return;
     }
     add_read_cu(key.size() + value.size());
+    _read_hotkey_collector->capture_raw_key(key, 1);
 }
 
 void capacity_unit_calculator::add_multi_get_cu(int32_t status,
@@ -121,11 +134,14 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status,
         return;
     }
 
+    uint64_t key_count = kvs.size();
     if (status == rocksdb::Status::kNotFound) {
         add_read_cu(1);
+        _read_hotkey_collector->capture_hash_key(hash_key, key_count);
         return;
     }
     add_read_cu(data_size);
+    _read_hotkey_collector->capture_hash_key(hash_key, key_count);
 }
 
 void capacity_unit_calculator::add_scan_cu(int32_t status,
@@ -141,6 +157,7 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
         return;
     }
 
+    // TODO: (Tangyanzhao) hotkey detect in scan
     int64_t data_size = 0;
     for (const auto &kv : kvs) {
         data_size += kv.key.size() + kv.value.size();
@@ -149,20 +166,22 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
     _pfc_scan_bytes->add(data_size);
 }
 
-void capacity_unit_calculator::add_sortkey_count_cu(int32_t status)
+void capacity_unit_calculator::add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key)
 {
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
         return;
     }
     add_read_cu(1);
+    _read_hotkey_collector->capture_hash_key(hash_key, 1);
 }
 
-void capacity_unit_calculator::add_ttl_cu(int32_t status)
+void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key)
 {
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
         return;
     }
     add_read_cu(1);
+    _read_hotkey_collector->capture_raw_key(key, 1);
 }
 
 void capacity_unit_calculator::add_put_cu(int32_t status,
@@ -174,6 +193,7 @@ void capacity_unit_calculator::add_put_cu(int32_t status,
         return;
     }
     add_write_cu(key.size() + value.size());
+    _write_hotkey_collector->capture_raw_key(key, 1);
 }
 
 void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &key)
@@ -182,6 +202,7 @@ void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &ke
         return;
     }
     add_write_cu(key.size());
+    _write_hotkey_collector->capture_raw_key(key, 1);
 }
 
 void capacity_unit_calculator::add_multi_put_cu(int32_t status,
@@ -195,6 +216,8 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status,
         data_size += hash_key.size() + kv.key.size() + kv.value.size();
     }
     _pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes);
+    uint64_t key_count = kvs.size();
+    _write_hotkey_collector->capture_raw_key(hash_key, key_count);
 
     if (status != rocksdb::Status::kOk) {
         return;
@@ -214,18 +237,22 @@ void capacity_unit_calculator::add_multi_remove_cu(int32_t status,
     for (const auto &sort_key : sort_keys) {
         data_size += hash_key.size() + sort_key.size();
     }
+    uint64_t key_count = sort_keys.size();
+    _write_hotkey_collector->capture_hash_key(hash_key, key_count);
     add_write_cu(data_size);
 }
 
-void capacity_unit_calculator::add_incr_cu(int32_t status)
+void capacity_unit_calculator::add_incr_cu(int32_t status, const dsn::blob &key)
 {
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument) {
         return;
     }
     if (status == rocksdb::Status::kOk) {
         add_write_cu(1);
+        _write_hotkey_collector->capture_raw_key(key, 1);
     }
     add_read_cu(1);
+    _read_hotkey_collector->capture_raw_key(key, 1);
 }
 
 void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
@@ -244,8 +271,10 @@ void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
 
     if (status == rocksdb::Status::kOk) {
         add_write_cu(hash_key.size() + set_sort_key.size() + value.size());
+        _write_hotkey_collector->capture_hash_key(hash_key, 1);
     }
     add_read_cu(hash_key.size() + check_sort_key.size());
+    _read_hotkey_collector->capture_hash_key(hash_key, 1);
 }
 
 void capacity_unit_calculator::add_check_and_mutate_cu(
@@ -267,11 +296,13 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
         status != rocksdb::Status::kTryAgain) {
         return;
     }
-
+    uint64_t key_count = mutate_list.size();
     if (status == rocksdb::Status::kOk) {
         add_write_cu(data_size);
+        _write_hotkey_collector->capture_hash_key(hash_key, key_count);
     }
     add_read_cu(hash_key.size() + check_sort_key.size());
+    _read_hotkey_collector->capture_hash_key(hash_key, 1);
 }
 
 } // namespace server
diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h
index a731fe4..95e2e53 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -11,18 +11,22 @@
 namespace pegasus {
 namespace server {
 
+class hotkey_collector;
+
 class capacity_unit_calculator : public dsn::replication::replica_base
 {
 public:
-    explicit capacity_unit_calculator(replica_base *r);
+    capacity_unit_calculator(replica_base *r,
+                             std::shared_ptr<hotkey_collector> read_hotkey_collector,
+                             std::shared_ptr<hotkey_collector> write_hotkey_collector);
 
     void add_get_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
     void add_multi_get_cu(int32_t status,
                           const dsn::blob &hash_key,
                           const std::vector<::dsn::apps::key_value> &kvs);
     void add_scan_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs);
-    void add_sortkey_count_cu(int32_t status);
-    void add_ttl_cu(int32_t status);
+    void add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key);
+    void add_ttl_cu(int32_t status, const dsn::blob &key);
 
     void add_put_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
     void add_remove_cu(int32_t status, const dsn::blob &key);
@@ -32,7 +36,7 @@ public:
     void add_multi_remove_cu(int32_t status,
                              const dsn::blob &hash_key,
                              const std::vector<::dsn::blob> &sort_keys);
-    void add_incr_cu(int32_t status);
+    void add_incr_cu(int32_t status, const dsn::blob &key);
     void add_check_and_set_cu(int32_t status,
                               const dsn::blob &hash_key,
                               const dsn::blob &check_sort_key,
@@ -70,6 +74,27 @@ private:
     ::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
     ::dsn::perf_counter_wrapper _pfc_check_and_set_bytes;
     ::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes;
+
+    /*
+        hotkey capturing weight rules:
+            add_get_cu: whether find the key or not, weight = 1(read_collector),
+            add_multi_get_cu: weight = returned sortkey count(read_collector),
+            add_scan_cu : not capture now,
+            add_sortkey_count_cu: weight = 1(read_collector),
+            add_ttl_cu: weight = 1(read_collector),
+            add_put_cu: weight = 1(write_collector),
+            add_remove_cu: weight = 1(write_collector),
+            add_multi_put_cu: weight = returned sortkey count(write_collector),
+            add_multi_remove_cu: weight = returned sortkey count(write_collector),
+            add_incr_cu: if find the key, weight = 1(write_collector),
+                         else weight = 1(read_collector)
+            add_check_and_set_cu: if find the key, weight = 1(write_collector),
+                         else weight = 1(read_collector)
+            add_check_and_mutate_cu: if find the key, weight = mutate_list size
+                                     else weight = 1
+    */
+    std::shared_ptr<hotkey_collector> _read_hotkey_collector;
+    std::shared_ptr<hotkey_collector> _write_hotkey_collector;
 };
 
 } // namespace server
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 122c638..cdd5b35 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -795,7 +795,7 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
         resp.count = -1;
     }
 
-    _cu_calculator->add_sortkey_count_cu(resp.error);
+    _cu_calculator->add_sortkey_count_cu(resp.error, hash_key);
     _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
@@ -857,7 +857,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
         }
     }
 
-    _cu_calculator->add_ttl_cu(resp.error);
+    _cu_calculator->add_ttl_cu(resp.error, key);
 }
 
 void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
@@ -1486,7 +1486,8 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
     });
 
     // initialize cu calculator and write service after server being initialized.
-    _cu_calculator = dsn::make_unique<capacity_unit_calculator>(this);
+    _cu_calculator = dsn::make_unique<capacity_unit_calculator>(
+        this, _read_hotkey_collector, _write_hotkey_collector);
     _server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
 
     return ::dsn::ERR_OK;
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index f448344..e6d090c 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -173,7 +173,7 @@ int pegasus_write_service::incr(int64_t decree,
     int err = _impl->incr(decree, update, resp);
 
     if (_server->is_primary()) {
-        _cu_calculator->add_incr_cu(resp.error);
+        _cu_calculator->add_incr_cu(resp.error, update.key);
     }
 
     _pfc_incr_latency->set(dsn_now_ns() - start_time);
diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp
index 10ac99e..0ad3f6e 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -6,6 +6,7 @@
 #include "server/capacity_unit_calculator.h"
 
 #include <dsn/dist/replication/replica_base.h>
+#include "server/hotkey_collector.h"
 
 namespace pegasus {
 namespace server {
@@ -26,7 +27,8 @@ public:
     }
 
     explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
-        : capacity_unit_calculator(r)
+        : capacity_unit_calculator(
+              r, std::make_shared<hotkey_collector>(), std::make_shared<hotkey_collector>())
     {
     }
 
@@ -200,7 +202,7 @@ TEST_F(capacity_unit_calculator_test, scan)
 TEST_F(capacity_unit_calculator_test, sortkey_count)
 {
     for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
-        _cal->add_sortkey_count_cu(i);
+        _cal->add_sortkey_count_cu(i, key);
         if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
             ASSERT_EQ(_cal->read_cu, 1);
         } else {
@@ -214,7 +216,7 @@ TEST_F(capacity_unit_calculator_test, sortkey_count)
 TEST_F(capacity_unit_calculator_test, ttl)
 {
     for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
-        _cal->add_ttl_cu(i);
+        _cal->add_ttl_cu(i, key);
         if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
             ASSERT_EQ(_cal->read_cu, 1);
         } else {
@@ -300,7 +302,7 @@ TEST_F(capacity_unit_calculator_test, multi_remove)
 TEST_F(capacity_unit_calculator_test, incr)
 {
     for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
-        _cal->add_incr_cu(i);
+        _cal->add_incr_cu(i, key);
         if (i == rocksdb::Status::kOk) {
             ASSERT_EQ(_cal->read_cu, 1);
             ASSERT_EQ(_cal->write_cu, 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org