You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2020/10/13 09:09:14 UTC
[incubator-pegasus] branch master updated: feat(hotkey): add an
interface of hotkey capture (#615)
This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 0d3ea55 feat(hotkey): add an interface of hotkey capture (#615)
0d3ea55 is described below
commit 0d3ea55fd4d77157bdf71977945fe9f408cf2bfd
Author: Smilencer <52...@qq.com>
AuthorDate: Tue Oct 13 17:09:03 2020 +0800
feat(hotkey): add an interface of hotkey capture (#615)
---
src/server/capacity_unit_calculator.cpp | 41 ++++++++++++++++++++---
src/server/capacity_unit_calculator.h | 33 +++++++++++++++---
src/server/pegasus_server_impl.cpp | 7 ++--
src/server/pegasus_write_service.cpp | 2 +-
src/server/test/capacity_unit_calculator_test.cpp | 10 +++---
5 files changed, 76 insertions(+), 17 deletions(-)
diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index 62e7dd3..6a055e6 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -3,14 +3,25 @@
// can be found in the LICENSE file in the root directory of this source tree.
#include "capacity_unit_calculator.h"
+
#include <dsn/utility/config_api.h>
#include <rocksdb/status.h>
+#include "hotkey_collector.h"
namespace pegasus {
namespace server {
-capacity_unit_calculator::capacity_unit_calculator(replica_base *r) : replica_base(r)
+capacity_unit_calculator::capacity_unit_calculator(
+ replica_base *r,
+ std::shared_ptr<hotkey_collector> read_hotkey_collector,
+ std::shared_ptr<hotkey_collector> write_hotkey_collector)
+ : replica_base(r),
+ _read_hotkey_collector(read_hotkey_collector),
+ _write_hotkey_collector(write_hotkey_collector)
{
+ dassert(_read_hotkey_collector != nullptr, "read hotkey collector is a nullptr");
+ dassert(_write_hotkey_collector != nullptr, "write hotkey collector is a nullptr");
+
_read_capacity_unit_size =
dsn_config_get_value_uint64("pegasus.server",
"perf_counter_read_capacity_unit_size",
@@ -99,9 +110,11 @@ void capacity_unit_calculator::add_get_cu(int32_t status,
if (status == rocksdb::Status::kNotFound) {
add_read_cu(1);
+ _read_hotkey_collector->capture_raw_key(key, 1);
return;
}
add_read_cu(key.size() + value.size());
+ _read_hotkey_collector->capture_raw_key(key, 1);
}
void capacity_unit_calculator::add_multi_get_cu(int32_t status,
@@ -121,11 +134,14 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status,
return;
}
+ uint64_t key_count = kvs.size();
if (status == rocksdb::Status::kNotFound) {
add_read_cu(1);
+ _read_hotkey_collector->capture_hash_key(hash_key, key_count);
return;
}
add_read_cu(data_size);
+ _read_hotkey_collector->capture_hash_key(hash_key, key_count);
}
void capacity_unit_calculator::add_scan_cu(int32_t status,
@@ -141,6 +157,7 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
return;
}
+ // TODO(Tangyanzhao): detect hotkeys in scan
int64_t data_size = 0;
for (const auto &kv : kvs) {
data_size += kv.key.size() + kv.value.size();
@@ -149,20 +166,22 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
_pfc_scan_bytes->add(data_size);
}
-void capacity_unit_calculator::add_sortkey_count_cu(int32_t status)
+void capacity_unit_calculator::add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
}
add_read_cu(1);
+ _read_hotkey_collector->capture_hash_key(hash_key, 1);
}
-void capacity_unit_calculator::add_ttl_cu(int32_t status)
+void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
}
add_read_cu(1);
+ _read_hotkey_collector->capture_raw_key(key, 1);
}
void capacity_unit_calculator::add_put_cu(int32_t status,
@@ -174,6 +193,7 @@ void capacity_unit_calculator::add_put_cu(int32_t status,
return;
}
add_write_cu(key.size() + value.size());
+ _write_hotkey_collector->capture_raw_key(key, 1);
}
void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &key)
@@ -182,6 +202,7 @@ void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &ke
return;
}
add_write_cu(key.size());
+ _write_hotkey_collector->capture_raw_key(key, 1);
}
void capacity_unit_calculator::add_multi_put_cu(int32_t status,
@@ -195,6 +216,8 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status,
data_size += hash_key.size() + kv.key.size() + kv.value.size();
}
_pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes);
+ uint64_t key_count = kvs.size();
+ _write_hotkey_collector->capture_raw_key(hash_key, key_count);
if (status != rocksdb::Status::kOk) {
return;
@@ -214,18 +237,22 @@ void capacity_unit_calculator::add_multi_remove_cu(int32_t status,
for (const auto &sort_key : sort_keys) {
data_size += hash_key.size() + sort_key.size();
}
+ uint64_t key_count = sort_keys.size();
+ _write_hotkey_collector->capture_hash_key(hash_key, key_count);
add_write_cu(data_size);
}
-void capacity_unit_calculator::add_incr_cu(int32_t status)
+void capacity_unit_calculator::add_incr_cu(int32_t status, const dsn::blob &key)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument) {
return;
}
if (status == rocksdb::Status::kOk) {
add_write_cu(1);
+ _write_hotkey_collector->capture_raw_key(key, 1);
}
add_read_cu(1);
+ _read_hotkey_collector->capture_raw_key(key, 1);
}
void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
@@ -244,8 +271,10 @@ void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
if (status == rocksdb::Status::kOk) {
add_write_cu(hash_key.size() + set_sort_key.size() + value.size());
+ _write_hotkey_collector->capture_hash_key(hash_key, 1);
}
add_read_cu(hash_key.size() + check_sort_key.size());
+ _read_hotkey_collector->capture_hash_key(hash_key, 1);
}
void capacity_unit_calculator::add_check_and_mutate_cu(
@@ -267,11 +296,13 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
status != rocksdb::Status::kTryAgain) {
return;
}
-
+ uint64_t key_count = mutate_list.size();
if (status == rocksdb::Status::kOk) {
add_write_cu(data_size);
+ _write_hotkey_collector->capture_hash_key(hash_key, key_count);
}
add_read_cu(hash_key.size() + check_sort_key.size());
+ _read_hotkey_collector->capture_hash_key(hash_key, 1);
}
} // namespace server
diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h
index a731fe4..95e2e53 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -11,18 +11,22 @@
namespace pegasus {
namespace server {
+class hotkey_collector;
+
class capacity_unit_calculator : public dsn::replication::replica_base
{
public:
- explicit capacity_unit_calculator(replica_base *r);
+ capacity_unit_calculator(replica_base *r,
+ std::shared_ptr<hotkey_collector> read_hotkey_collector,
+ std::shared_ptr<hotkey_collector> write_hotkey_collector);
void add_get_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
void add_multi_get_cu(int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs);
void add_scan_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs);
- void add_sortkey_count_cu(int32_t status);
- void add_ttl_cu(int32_t status);
+ void add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key);
+ void add_ttl_cu(int32_t status, const dsn::blob &key);
void add_put_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
void add_remove_cu(int32_t status, const dsn::blob &key);
@@ -32,7 +36,7 @@ public:
void add_multi_remove_cu(int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::blob> &sort_keys);
- void add_incr_cu(int32_t status);
+ void add_incr_cu(int32_t status, const dsn::blob &key);
void add_check_and_set_cu(int32_t status,
const dsn::blob &hash_key,
const dsn::blob &check_sort_key,
@@ -70,6 +74,27 @@ private:
::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
::dsn::perf_counter_wrapper _pfc_check_and_set_bytes;
::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes;
+
+ /*
+ hotkey capturing weight rules:
+ add_get_cu: weight = 1 (read_collector), whether or not the key is found,
+ add_multi_get_cu: weight = returned sortkey count (read_collector),
+ add_scan_cu: not captured yet,
+ add_sortkey_count_cu: weight = 1 (read_collector),
+ add_ttl_cu: weight = 1 (read_collector),
+ add_put_cu: weight = 1 (write_collector),
+ add_remove_cu: weight = 1 (write_collector),
+ add_multi_put_cu: weight = sortkey count in the request (write_collector),
+ add_multi_remove_cu: weight = sortkey count in the request (write_collector),
+ add_incr_cu: weight = 1 (read_collector); when status is kOk,
+ additionally weight = 1 (write_collector),
+ add_check_and_set_cu: weight = 1 (read_collector); when status is kOk,
+ additionally weight = 1 (write_collector),
+ add_check_and_mutate_cu: weight = 1 (read_collector); when status is kOk,
+ additionally weight = mutate_list size (write_collector)
+ */
+ std::shared_ptr<hotkey_collector> _read_hotkey_collector;
+ std::shared_ptr<hotkey_collector> _write_hotkey_collector;
};
} // namespace server
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 122c638..cdd5b35 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -795,7 +795,7 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
resp.count = -1;
}
- _cu_calculator->add_sortkey_count_cu(resp.error);
+ _cu_calculator->add_sortkey_count_cu(resp.error, hash_key);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}
@@ -857,7 +857,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
}
}
- _cu_calculator->add_ttl_cu(resp.error);
+ _cu_calculator->add_ttl_cu(resp.error, key);
}
void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
@@ -1486,7 +1486,8 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
});
// initialize cu calculator and write service after server being initialized.
- _cu_calculator = dsn::make_unique<capacity_unit_calculator>(this);
+ _cu_calculator = dsn::make_unique<capacity_unit_calculator>(
+ this, _read_hotkey_collector, _write_hotkey_collector);
_server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
return ::dsn::ERR_OK;
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index f448344..e6d090c 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -173,7 +173,7 @@ int pegasus_write_service::incr(int64_t decree,
int err = _impl->incr(decree, update, resp);
if (_server->is_primary()) {
- _cu_calculator->add_incr_cu(resp.error);
+ _cu_calculator->add_incr_cu(resp.error, update.key);
}
_pfc_incr_latency->set(dsn_now_ns() - start_time);
diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp
index 10ac99e..0ad3f6e 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -6,6 +6,7 @@
#include "server/capacity_unit_calculator.h"
#include <dsn/dist/replication/replica_base.h>
+#include "server/hotkey_collector.h"
namespace pegasus {
namespace server {
@@ -26,7 +27,8 @@ public:
}
explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
- : capacity_unit_calculator(r)
+ : capacity_unit_calculator(
+ r, std::make_shared<hotkey_collector>(), std::make_shared<hotkey_collector>())
{
}
@@ -200,7 +202,7 @@ TEST_F(capacity_unit_calculator_test, scan)
TEST_F(capacity_unit_calculator_test, sortkey_count)
{
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
- _cal->add_sortkey_count_cu(i);
+ _cal->add_sortkey_count_cu(i, key);
if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
ASSERT_EQ(_cal->read_cu, 1);
} else {
@@ -214,7 +216,7 @@ TEST_F(capacity_unit_calculator_test, sortkey_count)
TEST_F(capacity_unit_calculator_test, ttl)
{
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
- _cal->add_ttl_cu(i);
+ _cal->add_ttl_cu(i, key);
if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
ASSERT_EQ(_cal->read_cu, 1);
} else {
@@ -300,7 +302,7 @@ TEST_F(capacity_unit_calculator_test, multi_remove)
TEST_F(capacity_unit_calculator_test, incr)
{
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
- _cal->add_incr_cu(i);
+ _cal->add_incr_cu(i, key);
if (i == rocksdb::Status::kOk) {
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org