You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2021/06/02 07:29:45 UTC

[incubator-pegasus] branch master updated: feat: add perf counter for backup request size (#742)

This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ad4d17  feat: add perf counter for backup request size (#742)
9ad4d17 is described below

commit 9ad4d1797c98c7ad82ebfd603d98330a838a07fe
Author: zhao liwei <zl...@163.com>
AuthorDate: Wed Jun 2 15:29:38 2021 +0800

    feat: add perf counter for backup request size (#742)
---
 src/server/capacity_unit_calculator.cpp           |  39 +++++--
 src/server/capacity_unit_calculator.h             |  17 ++-
 src/server/info_collector.cpp                     |   1 +
 src/server/info_collector.h                       |   2 +
 src/server/pegasus_server_impl.cpp                |  25 +++--
 src/server/test/capacity_unit_calculator_test.cpp | 127 ++++++++++++++++++----
 src/shell/command_helper.h                        |   4 +
 7 files changed, 172 insertions(+), 43 deletions(-)

diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index a457a1e..22d5ff0 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -94,6 +94,10 @@ capacity_unit_calculator::capacity_unit_calculator(
     snprintf(name, 255, "check_and_mutate_bytes@%s", str_gpid.c_str());
     _pfc_check_and_mutate_bytes.init_app_counter(
         "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes");
+
+    snprintf(name, 255, "backup_request_bytes@%s", str_gpid.c_str());
+    _pfc_backup_request_bytes.init_app_counter(
+        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the backup request bytes");
 }
 
 int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
@@ -114,11 +118,14 @@ int64_t capacity_unit_calculator::add_write_cu(int64_t write_data_size)
     return write_cu;
 }
 
-void capacity_unit_calculator::add_get_cu(int32_t status,
+void capacity_unit_calculator::add_get_cu(dsn::message_ex *req,
+                                          int32_t status,
                                           const dsn::blob &key,
                                           const dsn::blob &value)
 {
-    _pfc_get_bytes->add(key.size() + value.size());
+    auto total_size = key.size() + value.size();
+    _pfc_get_bytes->add(total_size);
+    add_backup_request_bytes(req, total_size);
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
         return;
     }
@@ -132,7 +139,8 @@ void capacity_unit_calculator::add_get_cu(int32_t status,
     _read_hotkey_collector->capture_raw_key(key, 1);
 }
 
-void capacity_unit_calculator::add_multi_get_cu(int32_t status,
+void capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req,
+                                                int32_t status,
                                                 const dsn::blob &hash_key,
                                                 const std::vector<::dsn::apps::key_value> &kvs)
 {
@@ -142,7 +150,9 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status,
         multi_get_bytes += kv.key.size() + kv.value.size();
         data_size += hash_key.size() + kv.key.size() + kv.value.size();
     }
-    _pfc_multi_get_bytes->add(hash_key.size() + multi_get_bytes);
+    auto total_size = hash_key.size() + multi_get_bytes;
+    _pfc_multi_get_bytes->add(total_size);
+    add_backup_request_bytes(req, total_size);
 
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
         status != rocksdb::Status::kIncomplete && status != rocksdb::Status::kInvalidArgument) {
@@ -159,7 +169,8 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status,
     _read_hotkey_collector->capture_hash_key(hash_key, key_count);
 }
 
-void capacity_unit_calculator::add_scan_cu(int32_t status,
+void capacity_unit_calculator::add_scan_cu(dsn::message_ex *req,
+                                           int32_t status,
                                            const std::vector<::dsn::apps::key_value> &kvs)
 {
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
@@ -179,23 +190,30 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
     }
     add_read_cu(data_size);
     _pfc_scan_bytes->add(data_size);
+    add_backup_request_bytes(req, data_size);
 }
 
-void capacity_unit_calculator::add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key)
+void capacity_unit_calculator::add_sortkey_count_cu(dsn::message_ex *req,
+                                                    int32_t status,
+                                                    const dsn::blob &hash_key)
 {
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
         return;
     }
     add_read_cu(1);
+    add_backup_request_bytes(req, 1);
     _read_hotkey_collector->capture_hash_key(hash_key, 1);
 }
 
-void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key)
+void capacity_unit_calculator::add_ttl_cu(dsn::message_ex *req,
+                                          int32_t status,
+                                          const dsn::blob &key)
 {
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
         return;
     }
     add_read_cu(1);
+    add_backup_request_bytes(req, 1);
     _read_hotkey_collector->capture_raw_key(key, 1);
 }
 
@@ -320,5 +338,12 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
     _read_hotkey_collector->capture_hash_key(hash_key, 1);
 }
 
+void capacity_unit_calculator::add_backup_request_bytes(dsn::message_ex *req, int64_t bytes)
+{
+    if (req->is_backup_request()) {
+        _pfc_backup_request_bytes->add(bytes);
+    }
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h
index f86b0a4..ab0f662 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -37,13 +37,17 @@ public:
 
     virtual ~capacity_unit_calculator() = default;
 
-    void add_get_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
-    void add_multi_get_cu(int32_t status,
+    void
+    add_get_cu(dsn::message_ex *req, int32_t status, const dsn::blob &key, const dsn::blob &value);
+    void add_multi_get_cu(dsn::message_ex *req,
+                          int32_t status,
                           const dsn::blob &hash_key,
                           const std::vector<::dsn::apps::key_value> &kvs);
-    void add_scan_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs);
-    void add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key);
-    void add_ttl_cu(int32_t status, const dsn::blob &key);
+    void add_scan_cu(dsn::message_ex *req,
+                     int32_t status,
+                     const std::vector<::dsn::apps::key_value> &kvs);
+    void add_sortkey_count_cu(dsn::message_ex *req, int32_t status, const dsn::blob &hash_key);
+    void add_ttl_cu(dsn::message_ex *req, int32_t status, const dsn::blob &key);
 
     void add_put_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
     void add_remove_cu(int32_t status, const dsn::blob &key);
@@ -70,9 +74,11 @@ protected:
 #ifdef PEGASUS_UNIT_TEST
     virtual int64_t add_read_cu(int64_t read_data_size);
     virtual int64_t add_write_cu(int64_t write_data_size);
+    virtual void add_backup_request_bytes(dsn::message_ex *req, int64_t bytes);
 #else
     int64_t add_read_cu(int64_t read_data_size);
     int64_t add_write_cu(int64_t write_data_size);
+    void add_backup_request_bytes(dsn::message_ex *req, int64_t bytes);
 #endif
 
 private:
@@ -91,6 +97,7 @@ private:
     ::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
     ::dsn::perf_counter_wrapper _pfc_check_and_set_bytes;
     ::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes;
+    ::dsn::perf_counter_wrapper _pfc_backup_request_bytes;
 
     /*
         hotkey capturing weight rules:
diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp
index 42be060..3cfd29f 100644
--- a/src/server/info_collector.cpp
+++ b/src/server/info_collector.cpp
@@ -227,6 +227,7 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s
     INIT_COUNTER(read_qps);
     INIT_COUNTER(write_qps);
     INIT_COUNTER(backup_request_qps);
+    INIT_COUNTER(backup_request_bytes);
     INIT_COUNTER(get_bytes);
     INIT_COUNTER(multi_get_bytes);
     INIT_COUNTER(scan_bytes);
diff --git a/src/server/info_collector.h b/src/server/info_collector.h
index 5f09aa4..4c54dbe 100644
--- a/src/server/info_collector.h
+++ b/src/server/info_collector.h
@@ -95,6 +95,7 @@ public:
             read_qps->set(row_stats.get_total_read_qps());
             write_qps->set(row_stats.get_total_write_qps());
             backup_request_qps->set(row_stats.backup_request_qps);
+            backup_request_bytes->set(row_stats.backup_request_bytes);
             get_bytes->set(row_stats.get_bytes);
             multi_get_bytes->set(row_stats.multi_get_bytes);
             scan_bytes->set(row_stats.scan_bytes);
@@ -141,6 +142,7 @@ public:
         ::dsn::perf_counter_wrapper read_qps;
         ::dsn::perf_counter_wrapper write_qps;
         ::dsn::perf_counter_wrapper backup_request_qps;
+        ::dsn::perf_counter_wrapper backup_request_bytes;
 
         ::dsn::perf_counter_wrapper get_bytes;
         ::dsn::perf_counter_wrapper multi_get_bytes;
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 890d7bb..ad32583 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -335,7 +335,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
         pegasus_extract_user_data(_pegasus_data_version, std::move(value), resp.value);
     }
 
-    _cu_calculator->add_get_cu(resp.error, key, resp.value);
+    _cu_calculator->add_get_cu(rpc.dsn_request(), resp.error, key, resp.value);
     _pfc_get_latency->set(dsn_now_ns() - start_time);
 }
 
@@ -346,6 +346,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
     uint64_t start_time = dsn_now_ns();
 
     const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -358,7 +359,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
                rpc.remote_address().to_string(),
                request.sort_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
-        _cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
+        _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
         _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
         return;
     }
@@ -443,7 +444,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
                       stop_inclusive ? "inclusive" : "exclusive");
             }
             resp.error = rocksdb::Status::kOk;
-            _cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
+            _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
             _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
 
             return;
@@ -755,7 +756,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
         _pfc_recent_filter_count->add(filter_count);
     }
 
-    _cu_calculator->add_multi_get_cu(resp.error, request.hash_key, resp.kvs);
+    _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
     _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
 }
 
@@ -834,7 +835,7 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
         resp.count = -1;
     }
 
-    _cu_calculator->add_sortkey_count_cu(resp.error, hash_key);
+    _cu_calculator->add_sortkey_count_cu(rpc.dsn_request(), resp.error, hash_key);
     _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
@@ -896,7 +897,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
         }
     }
 
-    _cu_calculator->add_ttl_cu(resp.error, key);
+    _cu_calculator->add_ttl_cu(rpc.dsn_request(), resp.error, key);
 }
 
 void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
@@ -906,6 +907,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
     uint64_t start_time = dsn_now_ns();
 
     const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -918,7 +920,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
                rpc.remote_address().to_string(),
                request.hash_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
-        _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
         _pfc_scan_latency->set(dsn_now_ns() - start_time);
 
         return;
@@ -930,7 +932,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
                rpc.remote_address().to_string(),
                request.sort_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
-        _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
         _pfc_scan_latency->set(dsn_now_ns() - start_time);
 
         return;
@@ -988,7 +990,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
                   request.stop_inclusive ? "inclusive" : "exclusive");
         }
         resp.error = rocksdb::Status::kOk;
-        _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+        _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
         _pfc_scan_latency->set(dsn_now_ns() - start_time);
 
         return;
@@ -1137,7 +1139,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
         _pfc_recent_filter_count->add(filter_count);
     }
 
-    _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+    _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
     _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
@@ -1147,6 +1149,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
     _pfc_scan_qps->increment();
     uint64_t start_time = dsn_now_ns();
     const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -1279,7 +1282,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
         resp.error = rocksdb::Status::Code::kNotFound;
     }
 
-    _cu_calculator->add_scan_cu(resp.error, resp.kvs);
+    _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
     _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp
index cdb7435..ef8f39f 100644
--- a/src/server/test/capacity_unit_calculator_test.cpp
+++ b/src/server/test/capacity_unit_calculator_test.cpp
@@ -42,6 +42,13 @@ public:
         return write_cu;
     }
 
+    void add_backup_request_bytes(dsn::message_ex *req, int64_t bytes)
+    {
+        if (req->is_backup_request()) {
+            backup_request_bytes += bytes;
+        }
+    }
+
     explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
         : capacity_unit_calculator(
               r,
@@ -54,10 +61,12 @@ public:
     {
         write_cu = 0;
         read_cu = 0;
+        backup_request_bytes = 0;
     }
 
     int64_t write_cu{0};
     int64_t read_cu{0};
+    uint64_t backup_request_bytes{0};
 };
 
 static constexpr int MAX_ROCKSDB_STATUS_CODE = 13;
@@ -124,105 +133,117 @@ TEST_F(capacity_unit_calculator_test, init) { test_init(); }
 
 TEST_F(capacity_unit_calculator_test, get)
 {
+    dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+    msg->header->context.u.is_backup_request = false;
+
     // value < 4KB
-    _cal->add_get_cu(rocksdb::Status::kOk, key, dsn::blob::create_from_bytes("value"));
+    _cal->add_get_cu(msg, rocksdb::Status::kOk, key, dsn::blob::create_from_bytes("value"));
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
     // value = 4KB
     _cal->add_get_cu(
-        rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4093, ' ')));
+        msg, rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4093, ' ')));
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
     // value > 4KB
     _cal->add_get_cu(
-        rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4097, ' ')));
+        msg, rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4097, ' ')));
     ASSERT_EQ(_cal->read_cu, 2);
     _cal->reset();
 
     // value > 8KB
-    _cal->add_get_cu(
-        rocksdb::Status::kOk, key, dsn::blob::create_from_bytes(std::string(4096 * 2 + 1, ' ')));
+    _cal->add_get_cu(msg,
+                     rocksdb::Status::kOk,
+                     key,
+                     dsn::blob::create_from_bytes(std::string(4096 * 2 + 1, ' ')));
     ASSERT_EQ(_cal->read_cu, 3);
     ASSERT_EQ(_cal->write_cu, 0);
     _cal->reset();
 
-    _cal->add_get_cu(rocksdb::Status::kNotFound, key, dsn::blob());
+    _cal->add_get_cu(msg, rocksdb::Status::kNotFound, key, dsn::blob());
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
-    _cal->add_get_cu(rocksdb::Status::kCorruption, key, dsn::blob());
+    _cal->add_get_cu(msg, rocksdb::Status::kCorruption, key, dsn::blob());
     ASSERT_EQ(_cal->read_cu, 0);
     _cal->reset();
 }
 
 TEST_F(capacity_unit_calculator_test, multi_get)
 {
+    dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+    msg->header->context.u.is_backup_request = false;
+
     std::vector<::dsn::apps::key_value> kvs;
 
     generate_n_kvs(100, kvs);
-    _cal->add_multi_get_cu(rocksdb::Status::kIncomplete, hash_key, kvs);
+    _cal->add_multi_get_cu(msg, rocksdb::Status::kIncomplete, hash_key, kvs);
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
     generate_n_kvs(500, kvs);
-    _cal->add_multi_get_cu(rocksdb::Status::kOk, hash_key, kvs);
+    _cal->add_multi_get_cu(msg, rocksdb::Status::kOk, hash_key, kvs);
     ASSERT_GT(_cal->read_cu, 1);
     ASSERT_EQ(_cal->write_cu, 0);
     _cal->reset();
 
     kvs.clear();
-    _cal->add_multi_get_cu(rocksdb::Status::kNotFound, hash_key, kvs);
+    _cal->add_multi_get_cu(msg, rocksdb::Status::kNotFound, hash_key, kvs);
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
-    _cal->add_multi_get_cu(rocksdb::Status::kInvalidArgument, hash_key, kvs);
+    _cal->add_multi_get_cu(msg, rocksdb::Status::kInvalidArgument, hash_key, kvs);
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
-    _cal->add_multi_get_cu(rocksdb::Status::kCorruption, hash_key, kvs);
+    _cal->add_multi_get_cu(msg, rocksdb::Status::kCorruption, hash_key, kvs);
     ASSERT_EQ(_cal->read_cu, 0);
     _cal->reset();
 }
 
 TEST_F(capacity_unit_calculator_test, scan)
 {
+    dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+    msg->header->context.u.is_backup_request = false;
     std::vector<::dsn::apps::key_value> kvs;
 
     generate_n_kvs(100, kvs);
-    _cal->add_scan_cu(rocksdb::Status::kIncomplete, kvs);
+    _cal->add_scan_cu(msg, rocksdb::Status::kIncomplete, kvs);
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
     generate_n_kvs(500, kvs);
-    _cal->add_scan_cu(rocksdb::Status::kIncomplete, kvs);
+    _cal->add_scan_cu(msg, rocksdb::Status::kIncomplete, kvs);
     ASSERT_GT(_cal->read_cu, 1);
     _cal->reset();
 
-    _cal->add_scan_cu(rocksdb::Status::kOk, kvs);
+    _cal->add_scan_cu(msg, rocksdb::Status::kOk, kvs);
     ASSERT_GT(_cal->read_cu, 1);
     ASSERT_EQ(_cal->write_cu, 0);
     _cal->reset();
 
     kvs.clear();
-    _cal->add_scan_cu(rocksdb::Status::kInvalidArgument, kvs);
+    _cal->add_scan_cu(msg, rocksdb::Status::kInvalidArgument, kvs);
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
-    _cal->add_scan_cu(rocksdb::Status::kNotFound, kvs);
+    _cal->add_scan_cu(msg, rocksdb::Status::kNotFound, kvs);
     ASSERT_EQ(_cal->read_cu, 1);
     _cal->reset();
 
-    _cal->add_scan_cu(rocksdb::Status::kCorruption, kvs);
+    _cal->add_scan_cu(msg, rocksdb::Status::kCorruption, kvs);
     ASSERT_EQ(_cal->read_cu, 0);
     _cal->reset();
 }
 
 TEST_F(capacity_unit_calculator_test, sortkey_count)
 {
+    dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+    msg->header->context.u.is_backup_request = false;
     for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
-        _cal->add_sortkey_count_cu(i, hash_key);
+        _cal->add_sortkey_count_cu(msg, i, hash_key);
         if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
             ASSERT_EQ(_cal->read_cu, 1);
         } else {
@@ -235,8 +256,10 @@ TEST_F(capacity_unit_calculator_test, sortkey_count)
 
 TEST_F(capacity_unit_calculator_test, ttl)
 {
+    dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+    msg->header->context.u.is_backup_request = false;
     for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
-        _cal->add_ttl_cu(i, key);
+        _cal->add_ttl_cu(msg, i, key);
         if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
             ASSERT_EQ(_cal->read_cu, 1);
         } else {
@@ -406,5 +429,69 @@ TEST_F(capacity_unit_calculator_test, check_and_mutate)
     _cal->reset();
 }
 
+TEST_F(capacity_unit_calculator_test, backup_request_bytes)
+{
+    dsn::message_ptr msg = dsn::message_ex::create_request(RPC_TEST, static_cast<int>(1000), 1, 1);
+
+    msg->header->context.u.is_backup_request = false;
+    dsn::blob value = dsn::blob::create_from_bytes("value");
+    _cal->add_get_cu(msg, rocksdb::Status::kOk, key, value);
+    ASSERT_EQ(_cal->backup_request_bytes, 0);
+    _cal->reset();
+
+    msg->header->context.u.is_backup_request = true;
+    value = dsn::blob::create_from_bytes("value");
+    _cal->add_get_cu(msg, rocksdb::Status::kOk, key, value);
+    ASSERT_EQ(_cal->backup_request_bytes, key.size() + value.size());
+    _cal->reset();
+
+    std::vector<::dsn::apps::key_value> kvs;
+    generate_n_kvs(100, kvs);
+    uint64_t total_size = 0;
+    for (const auto &kv : kvs) {
+        total_size += kv.key.size() + kv.value.size();
+    }
+
+    msg->header->context.u.is_backup_request = false;
+    _cal->add_multi_get_cu(msg, rocksdb::Status::kOk, hash_key, kvs);
+    ASSERT_EQ(_cal->backup_request_bytes, 0);
+    _cal->reset();
+
+    msg->header->context.u.is_backup_request = true;
+    _cal->add_multi_get_cu(msg, rocksdb::Status::kOk, hash_key, kvs);
+    ASSERT_EQ(_cal->backup_request_bytes, total_size + hash_key.size());
+    _cal->reset();
+
+    msg->header->context.u.is_backup_request = false;
+    _cal->add_scan_cu(msg, rocksdb::Status::kOk, kvs);
+    ASSERT_EQ(_cal->backup_request_bytes, 0);
+    _cal->reset();
+
+    msg->header->context.u.is_backup_request = true;
+    _cal->add_scan_cu(msg, rocksdb::Status::kOk, kvs);
+    ASSERT_EQ(_cal->backup_request_bytes, total_size);
+    _cal->reset();
+
+    msg->header->context.u.is_backup_request = false;
+    _cal->add_sortkey_count_cu(msg, rocksdb::Status::kOk, hash_key);
+    ASSERT_EQ(_cal->backup_request_bytes, 0);
+    _cal->reset();
+
+    msg->header->context.u.is_backup_request = true;
+    _cal->add_sortkey_count_cu(msg, rocksdb::Status::kOk, hash_key);
+    ASSERT_EQ(_cal->backup_request_bytes, 1);
+    _cal->reset();
+
+    msg->header->context.u.is_backup_request = false;
+    _cal->add_ttl_cu(msg, rocksdb::Status::kOk, key);
+    ASSERT_EQ(_cal->backup_request_bytes, 0);
+    _cal->reset();
+
+    msg->header->context.u.is_backup_request = true;
+    _cal->add_ttl_cu(msg, rocksdb::Status::kOk, key);
+    ASSERT_EQ(_cal->backup_request_bytes, 1);
+    _cal->reset();
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 9c345f5..be2f3ca 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -610,6 +610,7 @@ struct row_data
         rdb_bf_point_positive_total += row.rdb_bf_point_positive_total;
         rdb_bf_point_negatives += row.rdb_bf_point_negatives;
         backup_request_qps += row.backup_request_qps;
+        backup_request_bytes += row.backup_request_bytes;
         get_bytes += row.get_bytes;
         multi_get_bytes += row.multi_get_bytes;
         scan_bytes += row.scan_bytes;
@@ -657,6 +658,7 @@ struct row_data
     double rdb_bf_point_positive_total = 0;
     double rdb_bf_point_negatives = 0;
     double backup_request_qps = 0;
+    double backup_request_bytes = 0;
     double get_bytes = 0;
     double multi_get_bytes = 0;
     double scan_bytes = 0;
@@ -739,6 +741,8 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
         row.rdb_bf_point_negatives += value;
     else if (counter_name == "backup_request_qps")
         row.backup_request_qps += value;
+    else if (counter_name == "backup_request_bytes")
+        row.backup_request_bytes += value;
     else if (counter_name == "get_bytes")
         row.get_bytes += value;
     else if (counter_name == "multi_get_bytes")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org