You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2021/06/23 10:42:39 UTC
[incubator-pegasus] 02/05: feat: add perf counter for backup request size (#742)
This is an automated email from the ASF dual-hosted git repository.
jiashuo pushed a commit to branch v2.2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit d92684ab3631b31155f8ae5bbb9ba8d9e9b39f95
Author: zhao liwei <zl...@163.com>
AuthorDate: Wed Jun 2 15:29:38 2021 +0800
feat: add perf counter for backup request size (#742)
---
src/server/capacity_unit_calculator.cpp | 39 +++++--
src/server/capacity_unit_calculator.h | 17 ++-
src/server/info_collector.cpp | 1 +
src/server/info_collector.h | 2 +
src/server/pegasus_server_impl.cpp | 25 +++--
src/server/test/capacity_unit_calculator_test.cpp | 127 ++++++++++++++++++----
src/shell/command_helper.h | 4 +
7 files changed, 172 insertions(+), 43 deletions(-)
diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index a457a1e..22d5ff0 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -94,6 +94,10 @@ capacity_unit_calculator::capacity_unit_calculator(
snprintf(name, 255, "check_and_mutate_bytes@%s", str_gpid.c_str());
_pfc_check_and_mutate_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes");
+
+ snprintf(name, 255, "backup_request_bytes@%s", str_gpid.c_str());
+ _pfc_backup_request_bytes.init_app_counter(
+ "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the backup request bytes");
}
int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
@@ -114,11 +118,14 @@ int64_t capacity_unit_calculator::add_write_cu(int64_t write_data_size)
return write_cu;
}
-void capacity_unit_calculator::add_get_cu(int32_t status,
+void capacity_unit_calculator::add_get_cu(dsn::message_ex *req,
+ int32_t status,
const dsn::blob &key,
const dsn::blob &value)
{
- _pfc_get_bytes->add(key.size() + value.size());
+ auto total_size = key.size() + value.size();
+ _pfc_get_bytes->add(total_size);
+ add_backup_request_bytes(req, total_size);
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
}
@@ -132,7 +139,8 @@ void capacity_unit_calculator::add_get_cu(int32_t status,
_read_hotkey_collector->capture_raw_key(key, 1);
}
-void capacity_unit_calculator::add_multi_get_cu(int32_t status,
+void capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req,
+ int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs)
{
@@ -142,7 +150,9 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status,
multi_get_bytes += kv.key.size() + kv.value.size();
data_size += hash_key.size() + kv.key.size() + kv.value.size();
}
- _pfc_multi_get_bytes->add(hash_key.size() + multi_get_bytes);
+ auto total_size = hash_key.size() + multi_get_bytes;
+ _pfc_multi_get_bytes->add(total_size);
+ add_backup_request_bytes(req, total_size);
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
status != rocksdb::Status::kIncomplete && status != rocksdb::Status::kInvalidArgument) {
@@ -159,7 +169,8 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status,
_read_hotkey_collector->capture_hash_key(hash_key, key_count);
}
-void capacity_unit_calculator::add_scan_cu(int32_t status,
+void capacity_unit_calculator::add_scan_cu(dsn::message_ex *req,
+ int32_t status,
const std::vector<::dsn::apps::key_value> &kvs)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
@@ -179,23 +190,30 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
}
add_read_cu(data_size);
_pfc_scan_bytes->add(data_size);
+ add_backup_request_bytes(req, data_size);
}
-void capacity_unit_calculator::add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key)
+void capacity_unit_calculator::add_sortkey_count_cu(dsn::message_ex *req,
+ int32_t status,
+ const dsn::blob &hash_key)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
}
add_read_cu(1);
+ add_backup_request_bytes(req, 1);
_read_hotkey_collector->capture_hash_key(hash_key, 1);
}
-void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key)
+void capacity_unit_calculator::add_ttl_cu(dsn::message_ex *req,
+ int32_t status,
+ const dsn::blob &key)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
}
add_read_cu(1);
+ add_backup_request_bytes(req, 1);
_read_hotkey_collector->capture_raw_key(key, 1);
}
@@ -320,5 +338,12 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
_read_hotkey_collector->capture_hash_key(hash_key, 1);
}
+// Adds `bytes` to the backup-request bytes perf counter, but only when `req`
+// reports itself as a backup request; for any other request this is a no-op.
+// `req` is dereferenced unconditionally, so callers must pass a non-null message.
+void capacity_unit_calculator::add_backup_request_bytes(dsn::message_ex *req, int64_t bytes)
+{
+ if (req->is_backup_request()) {
+ _pfc_backup_request_bytes->add(bytes);
+ }
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h
index f86b0a4..ab0f662 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -37,13 +37,17 @@ public:
virtual ~capacity_unit_calculator() = default;
- void add_get_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
- void add_multi_get_cu(int32_t status,
+ void
+ add_get_cu(dsn::message_ex *req, int32_t status, const dsn::blob &key, const dsn::blob &value);
+ void add_multi_get_cu(dsn::message_ex *req,
+ int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs);
- void add_scan_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs);
- void add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key);
- void add_ttl_cu(int32_t status, const dsn::blob &key);
+ void add_scan_cu(dsn::message_ex *req,
+ int32_t status,
+ const std::vector<::dsn::apps::key_value> &kvs);
+ void add_sortkey_count_cu(dsn::message_ex *req, int32_t status, const dsn::blob &hash_key);
+ void add_ttl_cu(dsn::message_ex *req, int32_t status, const dsn::blob &key);
void add_put_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
void add_remove_cu(int32_t status, const dsn::blob &key);
@@ -70,9 +74,11 @@ protected:
#ifdef PEGASUS_UNIT_TEST
virtual int64_t add_read_cu(int64_t read_data_size);
virtual int64_t add_write_cu(int64_t write_data_size);
+ virtual void add_backup_request_bytes(dsn::message_ex *req, int64_t bytes);
#else
int64_t add_read_cu(int64_t read_data_size);
int64_t add_write_cu(int64_t write_data_size);
+ void add_backup_request_bytes(dsn::message_ex *req, int64_t bytes);
#endif
private:
@@ -91,6 +97,7 @@ private:
::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
::dsn::perf_counter_wrapper _pfc_check_and_set_bytes;
::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes;
+ ::dsn::perf_counter_wrapper _pfc_backup_request_bytes;
/*
hotkey capturing weight rules:
diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp
index 42be060..3cfd29f 100644
--- a/src/server/info_collector.cpp
+++ b/src/server/info_collector.cpp
@@ -227,6 +227,7 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s
INIT_COUNTER(read_qps);
INIT_COUNTER(write_qps);
INIT_COUNTER(backup_request_qps);
+ INIT_COUNTER(backup_request_bytes);
INIT_COUNTER(get_bytes);
INIT_COUNTER(multi_get_bytes);
INIT_COUNTER(scan_bytes);
diff --git a/src/server/info_collector.h b/src/server/info_collector.h
index 5f09aa4..4c54dbe 100644
--- a/src/server/info_collector.h
+++ b/src/server/info_collector.h
@@ -95,6 +95,7 @@ public:
read_qps->set(row_stats.get_total_read_qps());
write_qps->set(row_stats.get_total_write_qps());
backup_request_qps->set(row_stats.backup_request_qps);
+ backup_request_bytes->set(row_stats.backup_request_bytes);
get_bytes->set(row_stats.get_bytes);
multi_get_bytes->set(row_stats.multi_get_bytes);
scan_bytes->set(row_stats.scan_bytes);
@@ -141,6 +142,7 @@ public:
::dsn::perf_counter_wrapper read_qps;
::dsn::perf_counter_wrapper write_qps;
::dsn::perf_counter_wrapper backup_request_qps;
+ ::dsn::perf_counter_wrapper backup_request_bytes;
::dsn::perf_counter_wrapper get_bytes;
::dsn::perf_counter_wrapper multi_get_bytes;
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index f0d5f50..d31cb0b 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -335,7 +335,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
pegasus_extract_user_data(_pegasus_data_version, std::move(value), resp.value);
}
- _cu_calculator->add_get_cu(resp.error, key, resp.value);
+ _cu_calculator->add_get_cu(rpc.dsn_request(), resp.error, key, resp.value);
_pfc_get_latency->set(dsn_now_ns() - start_time);
}
@@ -346,6 +346,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
uint64_t start_time = dsn_now_ns();
const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -358,7 +359,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
rpc.remote_address().to_string(),
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
- _cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
+ _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
return;
}
@@ -443,7 +444,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
stop_inclusive ? "inclusive" : "exclusive");
}
resp.error = rocksdb::Status::kOk;
- _cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
+ _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
return;
@@ -742,7 +743,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
_pfc_recent_filter_count->add(filter_count);
}
- _cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
+ _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
_pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
@@ -821,7 +822,7 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
resp.count = -1;
}
- _cu_calculator->add_sortkey_count_cu(resp.error, hash_key);
+ _cu_calculator->add_sortkey_count_cu(rpc.dsn_request(), resp.error, hash_key);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}
@@ -883,7 +884,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
}
}
- _cu_calculator->add_ttl_cu(resp.error, key);
+ _cu_calculator->add_ttl_cu(rpc.dsn_request(), resp.error, key);
}
void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
@@ -893,6 +894,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
uint64_t start_time = dsn_now_ns();
const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -905,7 +907,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
rpc.remote_address().to_string(),
request.hash_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
- _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+ _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
return;
@@ -917,7 +919,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
rpc.remote_address().to_string(),
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
- _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+ _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
return;
@@ -975,7 +977,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.stop_inclusive ? "inclusive" : "exclusive");
}
resp.error = rocksdb::Status::kOk;
- _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+ _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
return;
@@ -1115,7 +1117,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
_pfc_recent_filter_count->add(filter_count);
}
- _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+ _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}
@@ -1125,6 +1127,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
_pfc_scan_qps->increment();
uint64_t start_time = dsn_now_ns();
const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -1249,7 +1252,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
resp.error = rocksdb::Status::Code::kNotFound;
}
- _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+ _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}
diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp
index cdb7435..ef8f39f 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -42,6 +42,13 @@ public:
return write_cu;
}
+ // Test double: instead of touching a perf counter, accumulates `bytes` into the
+ // mock's `backup_request_bytes` member so tests can assert on it. Bytes are only
+ // counted when `req` is flagged as a backup request.
+ void add_backup_request_bytes(dsn::message_ex *req, int64_t bytes)
+ {
+ if (req->is_backup_request()) {
+ backup_request_bytes += bytes;
+ }
+ }
+
explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
: capacity_unit_calculator(
r,
@@ -54,10 +61,12 @@ public:
{
write_cu = 0;
read_cu = 0;
+ backup_request_bytes = 0;
}
int64_t write_cu{0};
int64_t read_cu{0};
+ uint64_t backup_request_bytes{0};
};
static constexpr int MAX_ROCKSDB_STATUS_CODE = 13;
@@ -124,105 +133,117 @@ TEST_F(capacity_unit_calculator_test, init) { test_init(); }
TEST_F(capacity_unit_calculator_test, get)
{
+ dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+ msg->header->context.u.is_backup_request = false;
+
// value < 4KB
- _cal->add_get_cu(rocksdb::Status::kOk, key, dsn::blob::create_from_bytes("value"));
+ _cal->add_get_cu(msg, rocksdb::Status::kOk, key, dsn::blob::create_from_bytes("value"));
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
// value = 4KB
_cal->add_get_cu(
- rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4093, ' ')));
+ msg, rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4093, ' ')));
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
// value > 4KB
_cal->add_get_cu(
- rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4097, ' ')));
+ msg, rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4097, ' ')));
ASSERT_EQ(_cal->read_cu, 2);
_cal->reset();
// value > 8KB
- _cal->add_get_cu(
- rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4096 * 2 + 1, ' ')));
+ _cal->add_get_cu(msg,
+ rocksdb::Status::kOk,
+ key,
+ dsn::blob::create_from_bytes(std::string(4096 * 2 + 1, ' ')));
ASSERT_EQ(_cal->read_cu, 3);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
- _cal->add_get_cu(rocksdb::Status::kNotFound, key, dsn::blob());
+ _cal->add_get_cu(msg, rocksdb::Status::kNotFound, key, dsn::blob());
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
- _cal->add_get_cu(rocksdb::Status::kCorruption, key, dsn::blob());
+ _cal->add_get_cu(msg, rocksdb::Status::kCorruption, key, dsn::blob());
ASSERT_EQ(_cal->read_cu, 0);
_cal->reset();
}
TEST_F(capacity_unit_calculator_test, multi_get)
{
+ dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+ msg->header->context.u.is_backup_request = false;
+
std::vector<::dsn::apps::key_value> kvs;
generate_n_kvs(100, kvs);
- _cal->add_multi_get_cu(rocksdb::Status::kIncomplete, hash_key, kvs);
+ _cal->add_multi_get_cu(msg, rocksdb::Status::kIncomplete, hash_key, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
generate_n_kvs(500, kvs);
- _cal->add_multi_get_cu(rocksdb::Status::kOk, hash_key, kvs);
+ _cal->add_multi_get_cu(msg, rocksdb::Status::kOk, hash_key, kvs);
ASSERT_GT(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
kvs.clear();
- _cal->add_multi_get_cu(rocksdb::Status::kNotFound, hash_key, kvs);
+ _cal->add_multi_get_cu(msg, rocksdb::Status::kNotFound, hash_key, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
- _cal->add_multi_get_cu(rocksdb::Status::kInvalidArgument, hash_key, kvs);
+ _cal->add_multi_get_cu(msg, rocksdb::Status::kInvalidArgument, hash_key, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
- _cal->add_multi_get_cu(rocksdb::Status::kCorruption, hash_key, kvs);
+ _cal->add_multi_get_cu(msg, rocksdb::Status::kCorruption, hash_key, kvs);
ASSERT_EQ(_cal->read_cu, 0);
_cal->reset();
}
TEST_F(capacity_unit_calculator_test, scan)
{
+ dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+ msg->header->context.u.is_backup_request = false;
std::vector<::dsn::apps::key_value> kvs;
generate_n_kvs(100, kvs);
- _cal->add_scan_cu(rocksdb::Status::kIncomplete, kvs);
+ _cal->add_scan_cu(msg, rocksdb::Status::kIncomplete, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
generate_n_kvs(500, kvs);
- _cal->add_scan_cu(rocksdb::Status::kIncomplete, kvs);
+ _cal->add_scan_cu(msg, rocksdb::Status::kIncomplete, kvs);
ASSERT_GT(_cal->read_cu, 1);
_cal->reset();
- _cal->add_scan_cu(rocksdb::Status::kOk, kvs);
+ _cal->add_scan_cu(msg, rocksdb::Status::kOk, kvs);
ASSERT_GT(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 0);
_cal->reset();
kvs.clear();
- _cal->add_scan_cu(rocksdb::Status::kInvalidArgument, kvs);
+ _cal->add_scan_cu(msg, rocksdb::Status::kInvalidArgument, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
- _cal->add_scan_cu(rocksdb::Status::kNotFound, kvs);
+ _cal->add_scan_cu(msg, rocksdb::Status::kNotFound, kvs);
ASSERT_EQ(_cal->read_cu, 1);
_cal->reset();
- _cal->add_scan_cu(rocksdb::Status::kCorruption, kvs);
+ _cal->add_scan_cu(msg, rocksdb::Status::kCorruption, kvs);
ASSERT_EQ(_cal->read_cu, 0);
_cal->reset();
}
TEST_F(capacity_unit_calculator_test, sortkey_count)
{
+ dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+ msg->header->context.u.is_backup_request = false;
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
- _cal->add_sortkey_count_cu(i, hash_key);
+ _cal->add_sortkey_count_cu(msg, i, hash_key);
if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
ASSERT_EQ(_cal->read_cu, 1);
} else {
@@ -235,8 +256,10 @@ TEST_F(capacity_unit_calculator_test, sortkey_count)
TEST_F(capacity_unit_calculator_test, ttl)
{
+ dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+ msg->header->context.u.is_backup_request = false;
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
- _cal->add_ttl_cu(i, key);
+ _cal->add_ttl_cu(msg, i, key);
if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
ASSERT_EQ(_cal->read_cu, 1);
} else {
@@ -406,5 +429,69 @@ TEST_F(capacity_unit_calculator_test, check_and_mutate)
_cal->reset();
}
+// Verifies backup-request byte accounting for each read-type operation. Every
+// operation is exercised twice with the same message: first with the backup flag
+// cleared (expects 0 bytes recorded), then with it set (expects the operation's size).
+TEST_F(capacity_unit_calculator_test, backup_request_bytes)
+{
+ dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+
+ // get: a non-backup request records nothing.
+ msg->header->context.u.is_backup_request = false;
+ dsn::blob value = dsn::blob::create_from_bytes("value");
+ _cal->add_get_cu(msg, rocksdb::Status::kOk, key, value);
+ ASSERT_EQ(_cal->backup_request_bytes, 0);
+ _cal->reset();
+
+ // get: a backup request records key size + value size.
+ msg->header->context.u.is_backup_request = true;
+ value = dsn::blob::create_from_bytes("value");
+ _cal->add_get_cu(msg, rocksdb::Status::kOk, key, value);
+ ASSERT_EQ(_cal->backup_request_bytes, key.size() + value.size());
+ _cal->reset();
+
+ // Shared fixture for multi_get and scan: total_size is the sum of every
+ // kv's key size + value size (hash_key is not included here).
+ std::vector<::dsn::apps::key_value> kvs;
+ generate_n_kvs(100, kvs);
+ uint64_t total_size = 0;
+ for (const auto &kv : kvs) {
+ total_size += kv.key.size() + kv.value.size();
+ }
+
+ // multi_get: a non-backup request records nothing.
+ msg->header->context.u.is_backup_request = false;
+ _cal->add_multi_get_cu(msg, rocksdb::Status::kOk, hash_key, kvs);
+ ASSERT_EQ(_cal->backup_request_bytes, 0);
+ _cal->reset();
+
+ // multi_get: a backup request records the kv bytes plus the hash_key size once.
+ msg->header->context.u.is_backup_request = true;
+ _cal->add_multi_get_cu(msg, rocksdb::Status::kOk, hash_key, kvs);
+ ASSERT_EQ(_cal->backup_request_bytes, total_size + hash_key.size());
+ _cal->reset();
+
+ // scan: a non-backup request records nothing.
+ msg->header->context.u.is_backup_request = false;
+ _cal->add_scan_cu(msg, rocksdb::Status::kOk, kvs);
+ ASSERT_EQ(_cal->backup_request_bytes, 0);
+ _cal->reset();
+
+ // scan: a backup request is expected to record exactly the kv bytes.
+ msg->header->context.u.is_backup_request = true;
+ _cal->add_scan_cu(msg, rocksdb::Status::kOk, kvs);
+ ASSERT_EQ(_cal->backup_request_bytes, total_size);
+ _cal->reset();
+
+ // sortkey_count: a non-backup request records nothing.
+ msg->header->context.u.is_backup_request = false;
+ _cal->add_sortkey_count_cu(msg, rocksdb::Status::kOk, hash_key);
+ ASSERT_EQ(_cal->backup_request_bytes, 0);
+ _cal->reset();
+
+ // sortkey_count: a backup request is accounted as a fixed 1 byte.
+ msg->header->context.u.is_backup_request = true;
+ _cal->add_sortkey_count_cu(msg, rocksdb::Status::kOk, hash_key);
+ ASSERT_EQ(_cal->backup_request_bytes, 1);
+ _cal->reset();
+
+ // ttl: a non-backup request records nothing.
+ msg->header->context.u.is_backup_request = false;
+ _cal->add_ttl_cu(msg, rocksdb::Status::kOk, key);
+ ASSERT_EQ(_cal->backup_request_bytes, 0);
+ _cal->reset();
+
+ // ttl: a backup request is accounted as a fixed 1 byte.
+ msg->header->context.u.is_backup_request = true;
+ _cal->add_ttl_cu(msg, rocksdb::Status::kOk, key);
+ ASSERT_EQ(_cal->backup_request_bytes, 1);
+ _cal->reset();
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 4d46895..22bdd0b 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -610,6 +610,7 @@ struct row_data
rdb_bf_point_positive_total += row.rdb_bf_point_positive_total;
rdb_bf_point_negatives += row.rdb_bf_point_negatives;
backup_request_qps += row.backup_request_qps;
+ backup_request_bytes += row.backup_request_bytes;
get_bytes += row.get_bytes;
multi_get_bytes += row.multi_get_bytes;
scan_bytes += row.scan_bytes;
@@ -657,6 +658,7 @@ struct row_data
double rdb_bf_point_positive_total = 0;
double rdb_bf_point_negatives = 0;
double backup_request_qps = 0;
+ double backup_request_bytes = 0;
double get_bytes = 0;
double multi_get_bytes = 0;
double scan_bytes = 0;
@@ -739,6 +741,8 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
row.rdb_bf_point_negatives += value;
else if (counter_name == "backup_request_qps")
row.backup_request_qps += value;
+ else if (counter_name == "backup_request_bytes")
+ row.backup_request_bytes += value;
else if (counter_name == "get_bytes")
row.get_bytes += value;
else if (counter_name == "multi_get_bytes")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org