You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/06/05 04:34:43 UTC
[incubator-pegasus] 05/32: feat(new_metrics): migrate replica-level metrics for capacity_unit_calculator (#1387)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 578154cd3213366feec118ea830e8070d3072807
Author: Dan Wang <wa...@apache.org>
AuthorDate: Tue Mar 14 17:18:35 2023 +0800
feat(new_metrics): migrate replica-level metrics for capacity_unit_calculator (#1387)
https://github.com/apache/incubator-pegasus/issues/1334
Migrate the metrics in capacity_unit_calculator to the new framework, including read/write capacity units
and the number of bytes consumed by get, multi_get, batch_get, scan, put, multi_put, check_and_set,
check_and_mutate and backup requests.
---
src/server/capacity_unit_calculator.cpp | 142 ++++++++++++++++++--------------
src/server/capacity_unit_calculator.h | 26 +++---
src/server/pegasus_write_service.cpp | 47 +++++------
src/utils/metrics.h | 6 +-
4 files changed, 118 insertions(+), 103 deletions(-)
diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp
index 157577b98..7a62b3d56 100644
--- a/src/server/capacity_unit_calculator.cpp
+++ b/src/server/capacity_unit_calculator.cpp
@@ -35,6 +35,61 @@
#include "utils/fmt_logging.h"
#include "utils/token_bucket_throttling_controller.h"
+METRIC_DEFINE_counter(replica,
+ read_capacity_units,
+ dsn::metric_unit::kCapacityUnits,
+ "The number of capacity units for read requests");
+
+METRIC_DEFINE_counter(replica,
+ write_capacity_units,
+ dsn::metric_unit::kCapacityUnits,
+ "The number of capacity units for write requests");
+
+METRIC_DEFINE_counter(replica,
+ get_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for GET requests");
+
+METRIC_DEFINE_counter(replica,
+ multi_get_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for MULTI_GET requests");
+
+METRIC_DEFINE_counter(replica,
+ batch_get_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for BATCH_GET requests");
+
+METRIC_DEFINE_counter(replica,
+ scan_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for SCAN requests");
+
+METRIC_DEFINE_counter(replica,
+ put_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for PUT requests");
+
+METRIC_DEFINE_counter(replica,
+ multi_put_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for MULTI_PUT requests");
+
+METRIC_DEFINE_counter(replica,
+ check_and_set_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for CHECK_AND_SET requests");
+
+METRIC_DEFINE_counter(replica,
+ check_and_mutate_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for CHECK_AND_MUTATE requests");
+
+METRIC_DEFINE_counter(replica,
+ backup_request_bytes,
+ dsn::metric_unit::kBytes,
+ "The number of bytes for backup requests");
+
namespace pegasus {
namespace server {
@@ -58,6 +113,17 @@ capacity_unit_calculator::capacity_unit_calculator(
std::shared_ptr<hotkey_collector> write_hotkey_collector,
std::shared_ptr<throttling_controller> read_size_throttling_controller)
: replica_base(r),
+ METRIC_VAR_INIT_replica(read_capacity_units),
+ METRIC_VAR_INIT_replica(write_capacity_units),
+ METRIC_VAR_INIT_replica(get_bytes),
+ METRIC_VAR_INIT_replica(multi_get_bytes),
+ METRIC_VAR_INIT_replica(batch_get_bytes),
+ METRIC_VAR_INIT_replica(scan_bytes),
+ METRIC_VAR_INIT_replica(put_bytes),
+ METRIC_VAR_INIT_replica(multi_put_bytes),
+ METRIC_VAR_INIT_replica(check_and_set_bytes),
+ METRIC_VAR_INIT_replica(check_and_mutate_bytes),
+ METRIC_VAR_INIT_replica(backup_request_bytes),
_read_hotkey_collector(read_hotkey_collector),
_write_hotkey_collector(write_hotkey_collector),
_read_size_throttling_controller(read_size_throttling_controller)
@@ -68,55 +134,6 @@ capacity_unit_calculator::capacity_unit_calculator(
_log_read_cu_size = log(FLAGS_perf_counter_read_capacity_unit_size) / log(2);
_log_write_cu_size = log(FLAGS_perf_counter_write_capacity_unit_size) / log(2);
-
- std::string str_gpid = r->get_gpid().to_string();
- char name[256];
- snprintf(name, 255, "recent.read.cu@%s", str_gpid.c_str());
- _pfc_recent_read_cu.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent read capacity units");
- snprintf(name, 255, "recent.write.cu@%s", str_gpid.c_str());
- _pfc_recent_write_cu.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent write capacity units");
-
- snprintf(name, 255, "get_bytes@%s", str_gpid.c_str());
- _pfc_get_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the get bytes");
-
- snprintf(name, 255, "multi_get_bytes@%s", str_gpid.c_str());
- _pfc_multi_get_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi get bytes");
-
- snprintf(name, 255, "batch_get_bytes@%s", str_gpid.c_str());
- _pfc_batch_get_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the batch get bytes");
-
- snprintf(name, 255, "scan_bytes@%s", str_gpid.c_str());
- _pfc_scan_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the scan bytes");
-
- snprintf(name, 255, "put_bytes@%s", str_gpid.c_str());
- _pfc_put_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the put bytes");
-
- snprintf(name, 255, "multi_put_bytes@%s", str_gpid.c_str());
- _pfc_multi_put_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi put bytes");
-
- snprintf(name, 255, "check_and_set_bytes@%s", str_gpid.c_str());
- _pfc_check_and_set_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and set bytes");
-
- snprintf(name, 255, "check_and_mutate_bytes@%s", str_gpid.c_str());
- _pfc_check_and_mutate_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes");
-
- snprintf(name, 255, "backup_request_bytes@%s", str_gpid.c_str());
- _pfc_backup_request_bytes.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the backup request bytes");
}
int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
@@ -125,7 +142,7 @@ int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size)
read_data_size > 0
? (read_data_size + FLAGS_perf_counter_read_capacity_unit_size - 1) >> _log_read_cu_size
: 1;
- _pfc_recent_read_cu->add(read_cu);
+ METRIC_VAR_INCREMENT_BY(read_capacity_units, read_cu);
_read_size_throttling_controller->consume_token(read_data_size);
return read_cu;
}
@@ -136,7 +153,7 @@ int64_t capacity_unit_calculator::add_write_cu(int64_t write_data_size)
? (write_data_size + FLAGS_perf_counter_write_capacity_unit_size - 1) >>
_log_write_cu_size
: 1;
- _pfc_recent_write_cu->add(write_cu);
+ METRIC_VAR_INCREMENT_BY(write_capacity_units, write_cu);
return write_cu;
}
@@ -146,7 +163,7 @@ void capacity_unit_calculator::add_get_cu(dsn::message_ex *req,
const dsn::blob &value)
{
auto total_size = key.size() + value.size();
- _pfc_get_bytes->add(total_size);
+ METRIC_VAR_INCREMENT_BY(get_bytes, total_size);
add_backup_request_bytes(req, total_size);
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
@@ -173,7 +190,7 @@ void capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req,
data_size += hash_key.size() + kv.key.size() + kv.value.size();
}
auto total_size = hash_key.size() + multi_get_bytes;
- _pfc_multi_get_bytes->add(total_size);
+ METRIC_VAR_INCREMENT_BY(multi_get_bytes, total_size);
add_backup_request_bytes(req, total_size);
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
@@ -201,7 +218,7 @@ void capacity_unit_calculator::add_batch_get_cu(dsn::message_ex *req,
_read_hotkey_collector->capture_hash_key(data.hash_key, 1);
}
- _pfc_batch_get_bytes->add(data_size);
+ METRIC_VAR_INCREMENT_BY(batch_get_bytes, data_size);
add_backup_request_bytes(req, data_size);
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
@@ -237,7 +254,7 @@ void capacity_unit_calculator::add_scan_cu(dsn::message_ex *req,
data_size += kv.key.size() + kv.value.size();
}
add_read_cu(data_size);
- _pfc_scan_bytes->add(data_size);
+ METRIC_VAR_INCREMENT_BY(scan_bytes, data_size);
add_backup_request_bytes(req, data_size);
}
@@ -269,7 +286,7 @@ void capacity_unit_calculator::add_put_cu(int32_t status,
const dsn::blob &key,
const dsn::blob &value)
{
- _pfc_put_bytes->add(key.size() + value.size());
+ METRIC_VAR_INCREMENT_BY(put_bytes, key.size() + value.size());
if (status != rocksdb::Status::kOk) {
return;
}
@@ -296,7 +313,7 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status,
multi_put_bytes += kv.key.size() + kv.value.size();
data_size += hash_key.size() + kv.key.size() + kv.value.size();
}
- _pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes);
+ METRIC_VAR_INCREMENT_BY(multi_put_bytes, hash_key.size() + multi_put_bytes);
uint64_t key_count = kvs.size();
_write_hotkey_collector->capture_hash_key(hash_key, key_count);
@@ -343,8 +360,9 @@ void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
const dsn::blob &value)
{
- _pfc_check_and_set_bytes->add(hash_key.size() + check_sort_key.size() + set_sort_key.size() +
- value.size());
+ METRIC_VAR_INCREMENT_BY(check_and_set_bytes,
+ hash_key.size() + check_sort_key.size() + set_sort_key.size() +
+ value.size());
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument &&
status != rocksdb::Status::kTryAgain) {
return;
@@ -370,8 +388,8 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
check_and_mutate_bytes += m.sort_key.size() + m.value.size();
data_size += hash_key.size() + m.sort_key.size() + m.value.size();
}
- _pfc_check_and_mutate_bytes->add(hash_key.size() + check_sort_key.size() +
- check_and_mutate_bytes);
+ METRIC_VAR_INCREMENT_BY(check_and_mutate_bytes,
+ hash_key.size() + check_sort_key.size() + check_and_mutate_bytes);
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument &&
status != rocksdb::Status::kTryAgain) {
@@ -389,7 +407,7 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
void capacity_unit_calculator::add_backup_request_bytes(dsn::message_ex *req, int64_t bytes)
{
if (req->is_backup_request()) {
- _pfc_backup_request_bytes->add(bytes);
+ METRIC_VAR_INCREMENT_BY(backup_request_bytes, bytes);
}
}
diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h
index 0b91ea4d2..6d30a07ef 100644
--- a/src/server/capacity_unit_calculator.h
+++ b/src/server/capacity_unit_calculator.h
@@ -108,18 +108,20 @@ private:
uint32_t _log_read_cu_size;
uint32_t _log_write_cu_size;
- ::dsn::perf_counter_wrapper _pfc_recent_read_cu;
- ::dsn::perf_counter_wrapper _pfc_recent_write_cu;
-
- ::dsn::perf_counter_wrapper _pfc_get_bytes;
- ::dsn::perf_counter_wrapper _pfc_multi_get_bytes;
- ::dsn::perf_counter_wrapper _pfc_batch_get_bytes;
- ::dsn::perf_counter_wrapper _pfc_scan_bytes;
- ::dsn::perf_counter_wrapper _pfc_put_bytes;
- ::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
- ::dsn::perf_counter_wrapper _pfc_check_and_set_bytes;
- ::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes;
- ::dsn::perf_counter_wrapper _pfc_backup_request_bytes;
+ METRIC_VAR_DECLARE_counter(read_capacity_units);
+ METRIC_VAR_DECLARE_counter(write_capacity_units);
+
+ METRIC_VAR_DECLARE_counter(get_bytes);
+ METRIC_VAR_DECLARE_counter(multi_get_bytes);
+ METRIC_VAR_DECLARE_counter(batch_get_bytes);
+ METRIC_VAR_DECLARE_counter(scan_bytes);
+
+ METRIC_VAR_DECLARE_counter(put_bytes);
+ METRIC_VAR_DECLARE_counter(multi_put_bytes);
+ METRIC_VAR_DECLARE_counter(check_and_set_bytes);
+ METRIC_VAR_DECLARE_counter(check_and_mutate_bytes);
+
+ METRIC_VAR_DECLARE_counter(backup_request_bytes);
/*
hotkey capturing weight rules:
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index 4889329d9..969e7d141 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -55,83 +55,82 @@ class message_ex;
METRIC_DEFINE_counter(replica,
put_requests,
dsn::metric_unit::kRequests,
- "The number of PUT requests for each replica");
+ "The number of PUT requests");
METRIC_DEFINE_counter(replica,
multi_put_requests,
dsn::metric_unit::kRequests,
- "The number of MULTI_PUT requests for each replica");
+ "The number of MULTI_PUT requests");
METRIC_DEFINE_counter(replica,
remove_requests,
dsn::metric_unit::kRequests,
- "The number of REMOVE requests for each replica");
+ "The number of REMOVE requests");
METRIC_DEFINE_counter(replica,
multi_remove_requests,
dsn::metric_unit::kRequests,
- "The number of MULTI_REMOVE requests for each replica");
+ "The number of MULTI_REMOVE requests");
METRIC_DEFINE_counter(replica,
incr_requests,
dsn::metric_unit::kRequests,
- "The number of INCR requests for each replica");
+ "The number of INCR requests");
METRIC_DEFINE_counter(replica,
check_and_set_requests,
dsn::metric_unit::kRequests,
- "The number of CHECK_AND_SET requests for each replica");
+ "The number of CHECK_AND_SET requests");
METRIC_DEFINE_counter(replica,
check_and_mutate_requests,
dsn::metric_unit::kRequests,
- "The number of CHECK_AND_MUTATE requests for each replica");
+ "The number of CHECK_AND_MUTATE requests");
METRIC_DEFINE_percentile_int64(replica,
put_latency_ns,
dsn::metric_unit::kNanoSeconds,
- "The latency of PUT requests for each replica");
+ "The latency of PUT requests");
METRIC_DEFINE_percentile_int64(replica,
multi_put_latency_ns,
dsn::metric_unit::kNanoSeconds,
- "The latency of MULTI_PUT requests for each replica");
+ "The latency of MULTI_PUT requests");
METRIC_DEFINE_percentile_int64(replica,
remove_latency_ns,
dsn::metric_unit::kNanoSeconds,
- "The latency of REMOVE requests for each replica");
+ "The latency of REMOVE requests");
METRIC_DEFINE_percentile_int64(replica,
multi_remove_latency_ns,
dsn::metric_unit::kNanoSeconds,
- "The latency of MULTI_REMOVE requests for each replica");
+ "The latency of MULTI_REMOVE requests");
METRIC_DEFINE_percentile_int64(replica,
incr_latency_ns,
dsn::metric_unit::kNanoSeconds,
- "The latency of INCR requests for each replica");
+ "The latency of INCR requests");
METRIC_DEFINE_percentile_int64(replica,
check_and_set_latency_ns,
dsn::metric_unit::kNanoSeconds,
- "The latency of CHECK_AND_SET requests for each replica");
+ "The latency of CHECK_AND_SET requests");
METRIC_DEFINE_percentile_int64(replica,
check_and_mutate_latency_ns,
dsn::metric_unit::kNanoSeconds,
- "The latency of CHECK_AND_MUTATE requests for each replica");
+ "The latency of CHECK_AND_MUTATE requests");
METRIC_DEFINE_counter(replica,
dup_requests,
dsn::metric_unit::kRequests,
- "The number of DUPLICATE requests for each replica");
+ "The number of DUPLICATE requests");
-METRIC_DEFINE_percentile_int64(
- replica,
- dup_time_lag_ms,
- dsn::metric_unit::kMilliSeconds,
- "the time lag (in ms) between master and slave in the duplication for each replica");
+METRIC_DEFINE_percentile_int64(replica,
+ dup_time_lag_ms,
+ dsn::metric_unit::kMilliSeconds,
+ "the time lag (in ms) between master and slave in the duplication");
METRIC_DEFINE_counter(
replica,
@@ -176,12 +175,6 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
_put_batch_size(0),
_remove_batch_size(0)
{
- _dup_lagging_write_threshold_ms = dsn_config_get_value_int64(
- "pegasus.server",
- "dup_lagging_write_threshold_ms",
- 10 * 1000,
- "If the duration that a write flows from master to slave is larger than this threshold, "
- "the write is defined a lagging write.");
}
pegasus_write_service::~pegasus_write_service() {}
@@ -372,7 +365,7 @@ int pegasus_write_service::duplicate(int64_t decree,
METRIC_VAR_INCREMENT(dup_requests);
METRIC_VAR_AUTO_LATENCY(
dup_time_lag_ms, request.timestamp * 1000, [this](uint64_t latency) {
- if (latency > _dup_lagging_write_threshold_ms) {
+ if (latency > FLAGS_dup_lagging_write_threshold_ms) {
METRIC_VAR_INCREMENT(dup_lagging_writes);
}
});
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index facc83f26..333534703 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -168,8 +168,9 @@ class error_code;
// Perform increment-related operations on metrics including gauge and counter.
#define METRIC_VAR_INCREMENT_BY(name, x) \
do { \
- if (x != 0) { \
- _##name->increment_by(x); \
+ const auto v = (x); \
+ if (v != 0) { \
+ _##name->increment_by(v); \
} \
} while (0)
@@ -611,6 +612,7 @@ enum class metric_unit : size_t
kSeconds,
kBytes,
kMegaBytes,
+ kCapacityUnits,
kRequests,
kSeeks,
kPointLookups,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org