You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/11 03:35:15 UTC

[incubator-pegasus] 05/24: feat(new_metrics): migrate replica-level metrics for capacity_unit_calculator (#1387)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit f1a0a749e00ae1ee845c85da3b1ff539d02e8c4e
Author: Dan Wang <wa...@apache.org>
AuthorDate: Tue Mar 14 17:18:35 2023 +0800

    feat(new_metrics): migrate replica-level metrics for capacity_unit_calculator (#1387)
    
    https://github.com/apache/incubator-pegasus/issues/1334
    
    Migrate the metrics in capacity_unit_calculator to the new metrics framework, including read/write capacity units
    and the number of bytes consumed by get, multi_get, batch_get, scan, put, multi_put, check_and_set,
    check_and_mutate and backup requests.
---
 src/server/capacity_unit_calculator.cpp | 142 ++++++++++++++++++--------------
 src/server/capacity_unit_calculator.h   |  26 +++---
 src/server/pegasus_write_service.cpp    |  47 +++++------
 src/utils/metrics.h                     |   6 +-
 4 files changed, 118 insertions(+), 103 deletions(-)

diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index 157577b98..7a62b3d56 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -35,6 +35,61 @@
 #include "utils/fmt_logging.h"
 #include "utils/token_bucket_throttling_controller.h"
 
+METRIC_DEFINE_counter(replica,
+                      read_capacity_units,
+                      dsn::metric_unit::kCapacityUnits,
+                      "The number of capacity units for read requests");
+
+METRIC_DEFINE_counter(replica,
+                      write_capacity_units,
+                      dsn::metric_unit::kCapacityUnits,
+                      "The number of capacity units for write requests");
+
+METRIC_DEFINE_counter(replica,
+                      get_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for GET requests");
+
+METRIC_DEFINE_counter(replica,
+                      multi_get_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for MULTI_GET requests");
+
+METRIC_DEFINE_counter(replica,
+                      batch_get_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for BATCH_GET requests");
+
+METRIC_DEFINE_counter(replica,
+                      scan_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for SCAN requests");
+
+METRIC_DEFINE_counter(replica,
+                      put_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for PUT requests");
+
+METRIC_DEFINE_counter(replica,
+                      multi_put_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for MULTI_PUT requests");
+
+METRIC_DEFINE_counter(replica,
+                      check_and_set_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for CHECK_AND_SET requests");
+
+METRIC_DEFINE_counter(replica,
+                      check_and_mutate_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for CHECK_AND_MUTATE requests");
+
+METRIC_DEFINE_counter(replica,
+                      backup_request_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The number of bytes for backup requests");
+
 namespace pegasus {
 namespace server {
 
@@ -58,6 +113,17 @@ capacity_unit_calculator::capacity_unit_calculator(
     std::shared_ptr<hotkey_collector> write_hotkey_collector,
     std::shared_ptr<throttling_controller> read_size_throttling_controller)
     : replica_base(r),
+      METRIC_VAR_INIT_replica(read_capacity_units),
+      METRIC_VAR_INIT_replica(write_capacity_units),
+      METRIC_VAR_INIT_replica(get_bytes),
+      METRIC_VAR_INIT_replica(multi_get_bytes),
+      METRIC_VAR_INIT_replica(batch_get_bytes),
+      METRIC_VAR_INIT_replica(scan_bytes),
+      METRIC_VAR_INIT_replica(put_bytes),
+      METRIC_VAR_INIT_replica(multi_put_bytes),
+      METRIC_VAR_INIT_replica(check_and_set_bytes),
+      METRIC_VAR_INIT_replica(check_and_mutate_bytes),
+      METRIC_VAR_INIT_replica(backup_request_bytes),
       _read_hotkey_collector(read_hotkey_collector),
       _write_hotkey_collector(write_hotkey_collector),
       _read_size_throttling_controller(read_size_throttling_controller)
@@ -68,55 +134,6 @@ capacity_unit_calculator::capacity_unit_calculator(
 
     _log_read_cu_size = log(FLAGS_perf_counter_read_capacity_unit_size) / log(2);
     _log_write_cu_size = log(FLAGS_perf_counter_write_capacity_unit_size) / log(2);
-
-    std::string str_gpid = r->get_gpid().to_string();
-    char name[256];
-    snprintf(name, 255, "recent.read.cu@%s", str_gpid.c_str());
-    _pfc_recent_read_cu.init_app_counter("app.pegasus",
-                                         name,
-                                         COUNTER_TYPE_VOLATILE_NUMBER,
-                                         "statistic the recent read capacity units");
-    snprintf(name, 255, "recent.write.cu@%s", str_gpid.c_str());
-    _pfc_recent_write_cu.init_app_counter("app.pegasus",
-                                          name,
-                                          COUNTER_TYPE_VOLATILE_NUMBER,
-                                          "statistic the recent write capacity units");
-
-    snprintf(name, 255, "get_bytes@%s", str_gpid.c_str());
-    _pfc_get_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the get bytes");
-
-    snprintf(name, 255, "multi_get_bytes@%s", str_gpid.c_str());
-    _pfc_multi_get_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi get bytes");
-
-    snprintf(name, 255, "batch_get_bytes@%s", str_gpid.c_str());
-    _pfc_batch_get_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the batch get bytes");
-
-    snprintf(name, 255, "scan_bytes@%s", str_gpid.c_str());
-    _pfc_scan_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the scan bytes");
-
-    snprintf(name, 255, "put_bytes@%s", str_gpid.c_str());
-    _pfc_put_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the put bytes");
-
-    snprintf(name, 255, "multi_put_bytes@%s", str_gpid.c_str());
-    _pfc_multi_put_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi put bytes");
-
-    snprintf(name, 255, "check_and_set_bytes@%s", str_gpid.c_str());
-    _pfc_check_and_set_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and set bytes");
-
-    snprintf(name, 255, "check_and_mutate_bytes@%s", str_gpid.c_str());
-    _pfc_check_and_mutate_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes");
-
-    snprintf(name, 255, "backup_request_bytes@%s", str_gpid.c_str());
-    _pfc_backup_request_bytes.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the backup request bytes");
 }
 
 int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
@@ -125,7 +142,7 @@ int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
         read_data_size > 0
             ? (read_data_size + FLAGS_perf_counter_read_capacity_unit_size - 1) >> _log_read_cu_size
             : 1;
-    _pfc_recent_read_cu->add(read_cu);
+    METRIC_VAR_INCREMENT_BY(read_capacity_units, read_cu);
     _read_size_throttling_controller->consume_token(read_data_size);
     return read_cu;
 }
@@ -136,7 +153,7 @@ int64_t capacity_unit_calculator::add_write_cu(int64_t write_data_size)
                            ? (write_data_size + FLAGS_perf_counter_write_capacity_unit_size - 1) >>
                                  _log_write_cu_size
                            : 1;
-    _pfc_recent_write_cu->add(write_cu);
+    METRIC_VAR_INCREMENT_BY(write_capacity_units, write_cu);
     return write_cu;
 }
 
@@ -146,7 +163,7 @@ void capacity_unit_calculator::add_get_cu(dsn::message_ex *req,
                                           const dsn::blob &value)
 {
     auto total_size = key.size() + value.size();
-    _pfc_get_bytes->add(total_size);
+    METRIC_VAR_INCREMENT_BY(get_bytes, total_size);
     add_backup_request_bytes(req, total_size);
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
         return;
@@ -173,7 +190,7 @@ void capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req,
         data_size += hash_key.size() + kv.key.size() + kv.value.size();
     }
     auto total_size = hash_key.size() + multi_get_bytes;
-    _pfc_multi_get_bytes->add(total_size);
+    METRIC_VAR_INCREMENT_BY(multi_get_bytes, total_size);
     add_backup_request_bytes(req, total_size);
 
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
@@ -201,7 +218,7 @@ void capacity_unit_calculator::add_batch_get_cu(dsn::message_ex *req,
         _read_hotkey_collector->capture_hash_key(data.hash_key, 1);
     }
 
-    _pfc_batch_get_bytes->add(data_size);
+    METRIC_VAR_INCREMENT_BY(batch_get_bytes, data_size);
     add_backup_request_bytes(req, data_size);
 
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
@@ -237,7 +254,7 @@ void capacity_unit_calculator::add_scan_cu(dsn::message_ex *req,
         data_size += kv.key.size() + kv.value.size();
     }
     add_read_cu(data_size);
-    _pfc_scan_bytes->add(data_size);
+    METRIC_VAR_INCREMENT_BY(scan_bytes, data_size);
     add_backup_request_bytes(req, data_size);
 }
 
@@ -269,7 +286,7 @@ void capacity_unit_calculator::add_put_cu(int32_t status,
                                           const dsn::blob &key,
                                           const dsn::blob &value)
 {
-    _pfc_put_bytes->add(key.size() + value.size());
+    METRIC_VAR_INCREMENT_BY(put_bytes, key.size() + value.size());
     if (status != rocksdb::Status::kOk) {
         return;
     }
@@ -296,7 +313,7 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status,
         multi_put_bytes += kv.key.size() + kv.value.size();
         data_size += hash_key.size() + kv.key.size() + kv.value.size();
     }
-    _pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes);
+    METRIC_VAR_INCREMENT_BY(multi_put_bytes, hash_key.size() + multi_put_bytes);
     uint64_t key_count = kvs.size();
     _write_hotkey_collector->capture_hash_key(hash_key, key_count);
 
@@ -343,8 +360,9 @@ void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
                                                     const dsn::blob &value)
 {
 
-    _pfc_check_and_set_bytes->add(hash_key.size() + check_sort_key.size() + set_sort_key.size() +
-                                  value.size());
+    METRIC_VAR_INCREMENT_BY(check_and_set_bytes,
+                            hash_key.size() + check_sort_key.size() + set_sort_key.size() +
+                                value.size());
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument &&
         status != rocksdb::Status::kTryAgain) {
         return;
@@ -370,8 +388,8 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
         check_and_mutate_bytes += m.sort_key.size() + m.value.size();
         data_size += hash_key.size() + m.sort_key.size() + m.value.size();
     }
-    _pfc_check_and_mutate_bytes->add(hash_key.size() + check_sort_key.size() +
-                                     check_and_mutate_bytes);
+    METRIC_VAR_INCREMENT_BY(check_and_mutate_bytes,
+                            hash_key.size() + check_sort_key.size() + check_and_mutate_bytes);
 
     if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument &&
         status != rocksdb::Status::kTryAgain) {
@@ -389,7 +407,7 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
 void capacity_unit_calculator::add_backup_request_bytes(dsn::message_ex *req, int64_t bytes)
 {
     if (req->is_backup_request()) {
-        _pfc_backup_request_bytes->add(bytes);
+        METRIC_VAR_INCREMENT_BY(backup_request_bytes, bytes);
     }
 }
 
diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h
index 0b91ea4d2..6d30a07ef 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -108,18 +108,20 @@ private:
     uint32_t _log_read_cu_size;
     uint32_t _log_write_cu_size;
 
-    ::dsn::perf_counter_wrapper _pfc_recent_read_cu;
-    ::dsn::perf_counter_wrapper _pfc_recent_write_cu;
-
-    ::dsn::perf_counter_wrapper _pfc_get_bytes;
-    ::dsn::perf_counter_wrapper _pfc_multi_get_bytes;
-    ::dsn::perf_counter_wrapper _pfc_batch_get_bytes;
-    ::dsn::perf_counter_wrapper _pfc_scan_bytes;
-    ::dsn::perf_counter_wrapper _pfc_put_bytes;
-    ::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
-    ::dsn::perf_counter_wrapper _pfc_check_and_set_bytes;
-    ::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes;
-    ::dsn::perf_counter_wrapper _pfc_backup_request_bytes;
+    METRIC_VAR_DECLARE_counter(read_capacity_units);
+    METRIC_VAR_DECLARE_counter(write_capacity_units);
+
+    METRIC_VAR_DECLARE_counter(get_bytes);
+    METRIC_VAR_DECLARE_counter(multi_get_bytes);
+    METRIC_VAR_DECLARE_counter(batch_get_bytes);
+    METRIC_VAR_DECLARE_counter(scan_bytes);
+
+    METRIC_VAR_DECLARE_counter(put_bytes);
+    METRIC_VAR_DECLARE_counter(multi_put_bytes);
+    METRIC_VAR_DECLARE_counter(check_and_set_bytes);
+    METRIC_VAR_DECLARE_counter(check_and_mutate_bytes);
+
+    METRIC_VAR_DECLARE_counter(backup_request_bytes);
 
     /*
         hotkey capturing weight rules:
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index 4889329d9..969e7d141 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -55,83 +55,82 @@ class message_ex;
 METRIC_DEFINE_counter(replica,
                       put_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of PUT requests for each replica");
+                      "The number of PUT requests");
 
 METRIC_DEFINE_counter(replica,
                       multi_put_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of MULTI_PUT requests for each replica");
+                      "The number of MULTI_PUT requests");
 
 METRIC_DEFINE_counter(replica,
                       remove_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of REMOVE requests for each replica");
+                      "The number of REMOVE requests");
 
 METRIC_DEFINE_counter(replica,
                       multi_remove_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of MULTI_REMOVE requests for each replica");
+                      "The number of MULTI_REMOVE requests");
 
 METRIC_DEFINE_counter(replica,
                       incr_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of INCR requests for each replica");
+                      "The number of INCR requests");
 
 METRIC_DEFINE_counter(replica,
                       check_and_set_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of CHECK_AND_SET requests for each replica");
+                      "The number of CHECK_AND_SET requests");
 
 METRIC_DEFINE_counter(replica,
                       check_and_mutate_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of CHECK_AND_MUTATE requests for each replica");
+                      "The number of CHECK_AND_MUTATE requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                put_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of PUT requests for each replica");
+                               "The latency of PUT requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                multi_put_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of MULTI_PUT requests for each replica");
+                               "The latency of MULTI_PUT requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                remove_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of REMOVE requests for each replica");
+                               "The latency of REMOVE requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                multi_remove_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of MULTI_REMOVE requests for each replica");
+                               "The latency of MULTI_REMOVE requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                incr_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of INCR requests for each replica");
+                               "The latency of INCR requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                check_and_set_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of CHECK_AND_SET requests for each replica");
+                               "The latency of CHECK_AND_SET requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                check_and_mutate_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of CHECK_AND_MUTATE requests for each replica");
+                               "The latency of CHECK_AND_MUTATE requests");
 
 METRIC_DEFINE_counter(replica,
                       dup_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of DUPLICATE requests for each replica");
+                      "The number of DUPLICATE requests");
 
-METRIC_DEFINE_percentile_int64(
-    replica,
-    dup_time_lag_ms,
-    dsn::metric_unit::kMilliSeconds,
-    "the time lag (in ms) between master and slave in the duplication for each replica");
+METRIC_DEFINE_percentile_int64(replica,
+                               dup_time_lag_ms,
+                               dsn::metric_unit::kMilliSeconds,
+                               "the time lag (in ms) between master and slave in the duplication");
 
 METRIC_DEFINE_counter(
     replica,
@@ -176,12 +175,6 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       _put_batch_size(0),
       _remove_batch_size(0)
 {
-    _dup_lagging_write_threshold_ms = dsn_config_get_value_int64(
-        "pegasus.server",
-        "dup_lagging_write_threshold_ms",
-        10 * 1000,
-        "If the duration that a write flows from master to slave is larger than this threshold, "
-        "the write is defined a lagging write.");
 }
 
 pegasus_write_service::~pegasus_write_service() {}
@@ -372,7 +365,7 @@ int pegasus_write_service::duplicate(int64_t decree,
         METRIC_VAR_INCREMENT(dup_requests);
         METRIC_VAR_AUTO_LATENCY(
             dup_time_lag_ms, request.timestamp * 1000, [this](uint64_t latency) {
-                if (latency > _dup_lagging_write_threshold_ms) {
+                if (latency > FLAGS_dup_lagging_write_threshold_ms) {
                     METRIC_VAR_INCREMENT(dup_lagging_writes);
                 }
             });
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index facc83f26..333534703 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -168,8 +168,9 @@ class error_code;
 // Perform increment-related operations on metrics including gauge and counter.
 #define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
     do {                                                                                           \
-        if (x != 0) {                                                                              \
-            _##name->increment_by(x);                                                              \
+        const auto v = (x);                                                                        \
+        if (v != 0) {                                                                              \
+            _##name->increment_by(v);                                                              \
         }                                                                                          \
     } while (0)
 
@@ -611,6 +612,7 @@ enum class metric_unit : size_t
     kSeconds,
     kBytes,
     kMegaBytes,
+    kCapacityUnits,
     kRequests,
     kSeeks,
     kPointLookups,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org