You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/25 10:55:01 UTC
[incubator-pegasus] 02/28: feat(new_metrics): migrate replica-level metrics for write service (#1351)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit fa767dff150eecbc28d9a5c69945e248652fb28c
Author: Dan Wang <wa...@apache.org>
AuthorDate: Thu Feb 23 23:18:20 2023 +0800
feat(new_metrics): migrate replica-level metrics for write service (#1351)
---
.github/workflows/lint_and_test_cpp.yaml | 12 +-
run.sh | 6 +-
src/replica/replica_base.h | 9 +-
src/server/pegasus_server_write.cpp | 21 +-
src/server/pegasus_server_write.h | 3 +-
src/server/pegasus_write_service.cpp | 284 ++++++++++++++------------
src/server/pegasus_write_service.h | 47 +++--
src/server/test/pegasus_server_write_test.cpp | 4 +-
src/utils/metrics.h | 135 +++++++++++-
src/utils/test/metrics_test.cpp | 147 ++++++++++++-
src/utils/time_utils.h | 28 ++-
11 files changed, 513 insertions(+), 183 deletions(-)
diff --git a/.github/workflows/lint_and_test_cpp.yaml b/.github/workflows/lint_and_test_cpp.yaml
index 8b10be60f..032e632e3 100644
--- a/.github/workflows/lint_and_test_cpp.yaml
+++ b/.github/workflows/lint_and_test_cpp.yaml
@@ -172,7 +172,11 @@ jobs:
- base_api_test
- base_test
- bulk_load_test
- - detect_hotspot_test
+ # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which
+ # is being replaced with the new metrics system, its test will fail. Temporarily disable
+ # the test and re-enable it after the hotspot detection is migrated to the new metrics
+ # system.
+ # - detect_hotspot_test
- dsn_aio_test
- dsn_block_service_test
- dsn_client_test
@@ -295,7 +299,11 @@ jobs:
- base_api_test
- base_test
- bulk_load_test
- - detect_hotspot_test
+ # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which
+ # is being replaced with the new metrics system, its test will fail. Temporarily disable
+ # the test and re-enable it after the hotspot detection is migrated to the new metrics
+ # system.
+ # - detect_hotspot_test
- dsn_aio_test
- dsn_block_service_test
- dsn_client_test
diff --git a/run.sh b/run.sh
index cabf53402..0df98b040 100755
--- a/run.sh
+++ b/run.sh
@@ -342,7 +342,11 @@ function run_test()
base_api_test
base_test
bulk_load_test
- detect_hotspot_test
+ # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which
+ # is being replaced with the new metrics system, its test will fail. Temporarily disable
+ # the test and re-enable it after the hotspot detection is migrated to the new metrics
+ # system.
+ # detect_hotspot_test
dsn_aio_test
dsn_block_service_test
dsn_client_test
diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h
index 88202d055..7c5b7747e 100644
--- a/src/replica/replica_base.h
+++ b/src/replica/replica_base.h
@@ -51,7 +51,14 @@ struct replica_base
const char *log_prefix() const { return _name.c_str(); }
- const metric_entity_ptr &replica_metric_entity() const { return _replica_metric_entity; }
+ const metric_entity_ptr &replica_metric_entity() const
+ {
+ CHECK_NOTNULL(_replica_metric_entity,
+ "replica metric entity should has been instantiated: "
+ "uninitialized entity cannot be used to instantiate "
+ "metric");
+ return _replica_metric_entity;
+ }
private:
const gpid _gpid;
diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp
index 0147050a6..e6cb5331d 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -31,7 +31,6 @@
#include "pegasus_server_impl.h"
#include "pegasus_server_write.h"
#include "pegasus_utils.h"
-#include "perf_counter/perf_counter.h"
#include "rrdb/rrdb.code.definition.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_holder.h"
@@ -42,20 +41,20 @@
#include "utils/fmt_logging.h"
#include "utils/ports.h"
+METRIC_DEFINE_counter(replica,
+ corrupt_writes,
+ dsn::metric_unit::kRequests,
+ "The number of corrupt writes for each replica");
+
namespace pegasus {
namespace server {
DSN_DECLARE_bool(rocksdb_verbose_log);
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server)
- : replica_base(server), _write_svc(new pegasus_write_service(server))
+ : replica_base(server),
+ _write_svc(new pegasus_write_service(server)),
+ METRIC_VAR_INIT_replica(corrupt_writes)
{
- char name[256];
- snprintf(name, 255, "recent_corrupt_write_count@%s", get_gpid().to_string());
- _pfc_recent_corrupt_write_count.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent corrupt write count");
-
init_non_batch_write_handlers();
}
@@ -81,7 +80,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return iter->second(requests[0]);
}
} catch (TTransportException &ex) {
- _pfc_recent_corrupt_write_count->increment();
+ METRIC_VAR_INCREMENT(corrupt_writes);
LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, exception = {}",
requests[0]->header->from_address.to_string(),
ex.what());
@@ -125,7 +124,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
}
}
} catch (TTransportException &ex) {
- _pfc_recent_corrupt_write_count->increment();
+ METRIC_VAR_INCREMENT(corrupt_writes);
LOG_ERROR_PREFIX("pegasus batch writes handler failed, from = {}, exception = {}",
requests[i]->header->from_address.to_string(),
ex.what());
diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h
index 6a002c299..d8a358164 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -27,7 +27,6 @@
#include "base/pegasus_rpc_types.h"
#include "pegasus_write_service.h"
-#include "perf_counter/perf_counter_wrapper.h"
#include "replica/replica_base.h"
#include "rrdb/rrdb_types.h"
#include "runtime/task/task_code.h"
@@ -102,7 +101,7 @@ private:
typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> non_batch_writes_map;
non_batch_writes_map _non_batch_write_handlers;
- ::dsn::perf_counter_wrapper _pfc_recent_corrupt_write_count;
+ METRIC_VAR_DECLARE_counter(corrupt_writes);
};
} // namespace server
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index 73f6cc8d6..4889329d9 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -52,6 +52,93 @@ class blob;
class message_ex;
} // namespace dsn
+METRIC_DEFINE_counter(replica,
+ put_requests,
+ dsn::metric_unit::kRequests,
+ "The number of PUT requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ multi_put_requests,
+ dsn::metric_unit::kRequests,
+ "The number of MULTI_PUT requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ remove_requests,
+ dsn::metric_unit::kRequests,
+ "The number of REMOVE requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ multi_remove_requests,
+ dsn::metric_unit::kRequests,
+ "The number of MULTI_REMOVE requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ incr_requests,
+ dsn::metric_unit::kRequests,
+ "The number of INCR requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ check_and_set_requests,
+ dsn::metric_unit::kRequests,
+ "The number of CHECK_AND_SET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ check_and_mutate_requests,
+ dsn::metric_unit::kRequests,
+ "The number of CHECK_AND_MUTATE requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ put_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of PUT requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ multi_put_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of MULTI_PUT requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ remove_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of REMOVE requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ multi_remove_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of MULTI_REMOVE requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ incr_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of INCR requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ check_and_set_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of CHECK_AND_SET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ check_and_mutate_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of CHECK_AND_MUTATE requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ dup_requests,
+ dsn::metric_unit::kRequests,
+ "The number of DUPLICATE requests for each replica");
+
+METRIC_DEFINE_percentile_int64(
+ replica,
+ dup_time_lag_ms,
+ dsn::metric_unit::kMilliSeconds,
+ "the time lag (in ms) between master and slave in the duplication for each replica");
+
+METRIC_DEFINE_counter(
+ replica,
+ dup_lagging_writes,
+ dsn::metric_unit::kRequests,
+ "the number of lagging writes (time lag larger than `dup_lagging_write_threshold_ms`)");
+
namespace pegasus {
namespace server {
@@ -68,105 +155,33 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
_server(server),
_impl(new impl(server)),
_batch_start_time(0),
- _cu_calculator(server->_cu_calculator.get())
+ _cu_calculator(server->_cu_calculator.get()),
+ METRIC_VAR_INIT_replica(put_requests),
+ METRIC_VAR_INIT_replica(multi_put_requests),
+ METRIC_VAR_INIT_replica(remove_requests),
+ METRIC_VAR_INIT_replica(multi_remove_requests),
+ METRIC_VAR_INIT_replica(incr_requests),
+ METRIC_VAR_INIT_replica(check_and_set_requests),
+ METRIC_VAR_INIT_replica(check_and_mutate_requests),
+ METRIC_VAR_INIT_replica(put_latency_ns),
+ METRIC_VAR_INIT_replica(multi_put_latency_ns),
+ METRIC_VAR_INIT_replica(remove_latency_ns),
+ METRIC_VAR_INIT_replica(multi_remove_latency_ns),
+ METRIC_VAR_INIT_replica(incr_latency_ns),
+ METRIC_VAR_INIT_replica(check_and_set_latency_ns),
+ METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
+ METRIC_VAR_INIT_replica(dup_requests),
+ METRIC_VAR_INIT_replica(dup_time_lag_ms),
+ METRIC_VAR_INIT_replica(dup_lagging_writes),
+ _put_batch_size(0),
+ _remove_batch_size(0)
{
- std::string str_gpid = fmt::format("{}", server->get_gpid());
-
- std::string name;
-
- name = fmt::format("put_qps@{}", str_gpid);
- _pfc_put_qps.init_app_counter(
- "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of PUT request");
-
- name = fmt::format("multi_put_qps@{}", str_gpid);
- _pfc_multi_put_qps.init_app_counter(
- "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of MULTI_PUT request");
-
- name = fmt::format("remove_qps@{}", str_gpid);
- _pfc_remove_qps.init_app_counter(
- "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of REMOVE request");
-
- name = fmt::format("multi_remove_qps@{}", str_gpid);
- _pfc_multi_remove_qps.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_RATE,
- "statistic the qps of MULTI_REMOVE request");
-
- name = fmt::format("incr_qps@{}", str_gpid);
- _pfc_incr_qps.init_app_counter(
- "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of INCR request");
-
- name = fmt::format("check_and_set_qps@{}", str_gpid);
- _pfc_check_and_set_qps.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_RATE,
- "statistic the qps of CHECK_AND_SET request");
-
- name = fmt::format("check_and_mutate_qps@{}", str_gpid);
- _pfc_check_and_mutate_qps.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_RATE,
- "statistic the qps of CHECK_AND_MUTATE request");
-
- name = fmt::format("put_latency@{}", str_gpid);
- _pfc_put_latency.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of PUT request");
-
- name = fmt::format("multi_put_latency@{}", str_gpid);
- _pfc_multi_put_latency.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of MULTI_PUT request");
-
- name = fmt::format("remove_latency@{}", str_gpid);
- _pfc_remove_latency.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of REMOVE request");
-
- name = fmt::format("multi_remove_latency@{}", str_gpid);
- _pfc_multi_remove_latency.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of MULTI_REMOVE request");
-
- name = fmt::format("incr_latency@{}", str_gpid);
- _pfc_incr_latency.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of INCR request");
-
- name = fmt::format("check_and_set_latency@{}", str_gpid);
- _pfc_check_and_set_latency.init_app_counter("app.pegasus",
- name.c_str(),
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of CHECK_AND_SET request");
-
- name = fmt::format("check_and_mutate_latency@{}", str_gpid);
- _pfc_check_and_mutate_latency.init_app_counter(
- "app.pegasus",
- name.c_str(),
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of CHECK_AND_MUTATE request");
-
- _pfc_duplicate_qps.init_app_counter("app.pegasus",
- fmt::format("duplicate_qps@{}", str_gpid).c_str(),
- COUNTER_TYPE_RATE,
- "statistic the qps of DUPLICATE requests");
-
- _pfc_dup_time_lag.init_app_counter(
- "app.pegasus",
- fmt::format("dup.time_lag_ms@{}", app_name()).c_str(),
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "the time (in ms) lag between master and slave in the duplication");
-
- _pfc_dup_lagging_writes.init_app_counter(
- "app.pegasus",
- fmt::format("dup.lagging_writes@{}", app_name()).c_str(),
- COUNTER_TYPE_VOLATILE_NUMBER,
- "the number of lagging writes (time lag larger than `dup_lagging_write_threshold_ms`)");
+ _dup_lagging_write_threshold_ms = dsn_config_get_value_int64(
+ "pegasus.server",
+ "dup_lagging_write_threshold_ms",
+ 10 * 1000,
+ "If the duration that a write flows from master to slave is larger than this threshold, "
+ "the write is defined a lagging write.");
}
pegasus_write_service::~pegasus_write_service() {}
@@ -177,15 +192,15 @@ int pegasus_write_service::multi_put(const db_write_context &ctx,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp)
{
- uint64_t start_time = dsn_now_ns();
- _pfc_multi_put_qps->increment();
+ METRIC_VAR_AUTO_LATENCY(multi_put_latency_ns);
+ METRIC_VAR_INCREMENT(multi_put_requests);
+
int err = _impl->multi_put(ctx, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_multi_put_cu(resp.error, update.hash_key, update.kvs);
}
- _pfc_multi_put_latency->set(dsn_now_ns() - start_time);
return err;
}
@@ -193,15 +208,15 @@ int pegasus_write_service::multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp)
{
- uint64_t start_time = dsn_now_ns();
- _pfc_multi_remove_qps->increment();
+ METRIC_VAR_AUTO_LATENCY(multi_remove_latency_ns);
+ METRIC_VAR_INCREMENT(multi_remove_requests);
+
int err = _impl->multi_remove(decree, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_multi_remove_cu(resp.error, update.hash_key, update.sort_keys);
}
- _pfc_multi_remove_latency->set(dsn_now_ns() - start_time);
return err;
}
@@ -209,15 +224,15 @@ int pegasus_write_service::incr(int64_t decree,
const dsn::apps::incr_request &update,
dsn::apps::incr_response &resp)
{
- uint64_t start_time = dsn_now_ns();
- _pfc_incr_qps->increment();
+ METRIC_VAR_AUTO_LATENCY(incr_latency_ns);
+ METRIC_VAR_INCREMENT(incr_requests);
+
int err = _impl->incr(decree, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_incr_cu(resp.error, update.key);
}
- _pfc_incr_latency->set(dsn_now_ns() - start_time);
return err;
}
@@ -225,8 +240,9 @@ int pegasus_write_service::check_and_set(int64_t decree,
const dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response &resp)
{
- uint64_t start_time = dsn_now_ns();
- _pfc_check_and_set_qps->increment();
+ METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns);
+ METRIC_VAR_INCREMENT(check_and_set_requests);
+
int err = _impl->check_and_set(decree, update, resp);
if (_server->is_primary()) {
@@ -237,7 +253,6 @@ int pegasus_write_service::check_and_set(int64_t decree,
update.set_value);
}
- _pfc_check_and_set_latency->set(dsn_now_ns() - start_time);
return err;
}
@@ -245,8 +260,9 @@ int pegasus_write_service::check_and_mutate(int64_t decree,
const dsn::apps::check_and_mutate_request &update,
dsn::apps::check_and_mutate_response &resp)
{
- uint64_t start_time = dsn_now_ns();
- _pfc_check_and_mutate_qps->increment();
+ METRIC_VAR_AUTO_LATENCY(check_and_mutate_latency_ns);
+ METRIC_VAR_INCREMENT(check_and_mutate_requests);
+
int err = _impl->check_and_mutate(decree, update, resp);
if (_server->is_primary()) {
@@ -254,7 +270,6 @@ int pegasus_write_service::check_and_mutate(int64_t decree,
resp.error, update.hash_key, update.check_sort_key, update.mutate_list);
}
- _pfc_check_and_mutate_latency->set(dsn_now_ns() - start_time);
return err;
}
@@ -272,8 +287,7 @@ int pegasus_write_service::batch_put(const db_write_context &ctx,
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after batch_prepare");
- _batch_qps_perfcounters.push_back(_pfc_put_qps.get());
- _batch_latency_perfcounters.push_back(_pfc_put_latency.get());
+ ++_put_batch_size;
int err = _impl->batch_put(ctx, update, resp);
if (_server->is_primary()) {
@@ -289,8 +303,7 @@ int pegasus_write_service::batch_remove(int64_t decree,
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after batch_prepare");
- _batch_qps_perfcounters.push_back(_pfc_remove_qps.get());
- _batch_latency_perfcounters.push_back(_pfc_remove_latency.get());
+ ++_remove_batch_size;
int err = _impl->batch_remove(decree, key, resp);
if (_server->is_primary()) {
@@ -322,15 +335,21 @@ void pegasus_write_service::set_default_ttl(uint32_t ttl) { _impl->set_default_t
void pegasus_write_service::clear_up_batch_states()
{
- uint64_t latency = dsn_now_ns() - _batch_start_time;
- for (dsn::perf_counter *pfc : _batch_qps_perfcounters)
- pfc->increment();
- for (dsn::perf_counter *pfc : _batch_latency_perfcounters)
- pfc->set(latency);
-
- _batch_qps_perfcounters.clear();
- _batch_latency_perfcounters.clear();
+#define PROCESS_WRITE_BATCH(op) \
+ do { \
+ METRIC_VAR_INCREMENT_BY(op##_requests, static_cast<int64_t>(_##op##_batch_size)); \
+ METRIC_VAR_SET(op##_latency_ns, static_cast<size_t>(_##op##_batch_size), latency_ns); \
+ _##op##_batch_size = 0; \
+ } while (0)
+
+ auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time);
+
+ PROCESS_WRITE_BATCH(put);
+ PROCESS_WRITE_BATCH(remove);
+
_batch_start_time = 0;
+
+#undef PROCESS_WRITE_BATCH
}
int pegasus_write_service::duplicate(int64_t decree,
@@ -350,14 +369,13 @@ int pegasus_write_service::duplicate(int64_t decree,
return empty_put(decree);
}
- _pfc_duplicate_qps->increment();
- auto cleanup = dsn::defer([this, &request]() {
- uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000;
- if (latency_ms > FLAGS_dup_lagging_write_threshold_ms) {
- _pfc_dup_lagging_writes->increment();
- }
- _pfc_dup_time_lag->set(latency_ms);
- });
+ METRIC_VAR_INCREMENT(dup_requests);
+ METRIC_VAR_AUTO_LATENCY(
+ dup_time_lag_ms, request.timestamp * 1000, [this](uint64_t latency) {
+ if (latency > _dup_lagging_write_threshold_ms) {
+ METRIC_VAR_INCREMENT(dup_lagging_writes);
+ }
+ });
dsn::message_ex *write =
dsn::from_blob_to_received_msg(request.task_code, request.raw_message);
bool is_delete = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h
index 9fb854ffd..9e79f9122 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -25,13 +25,11 @@
#include "common//duplication_common.h"
#include "common/common.h"
-#include "perf_counter/perf_counter_wrapper.h"
#include "replica/replica_base.h"
#include "utils/errors.h"
namespace dsn {
class blob;
-class perf_counter;
namespace apps {
class check_and_mutate_request;
class check_and_mutate_response;
@@ -216,28 +214,29 @@ private:
capacity_unit_calculator *_cu_calculator;
- ::dsn::perf_counter_wrapper _pfc_put_qps;
- ::dsn::perf_counter_wrapper _pfc_multi_put_qps;
- ::dsn::perf_counter_wrapper _pfc_remove_qps;
- ::dsn::perf_counter_wrapper _pfc_multi_remove_qps;
- ::dsn::perf_counter_wrapper _pfc_incr_qps;
- ::dsn::perf_counter_wrapper _pfc_check_and_set_qps;
- ::dsn::perf_counter_wrapper _pfc_check_and_mutate_qps;
- ::dsn::perf_counter_wrapper _pfc_duplicate_qps;
- ::dsn::perf_counter_wrapper _pfc_dup_time_lag;
- ::dsn::perf_counter_wrapper _pfc_dup_lagging_writes;
-
- ::dsn::perf_counter_wrapper _pfc_put_latency;
- ::dsn::perf_counter_wrapper _pfc_multi_put_latency;
- ::dsn::perf_counter_wrapper _pfc_remove_latency;
- ::dsn::perf_counter_wrapper _pfc_multi_remove_latency;
- ::dsn::perf_counter_wrapper _pfc_incr_latency;
- ::dsn::perf_counter_wrapper _pfc_check_and_set_latency;
- ::dsn::perf_counter_wrapper _pfc_check_and_mutate_latency;
-
- // Records all requests.
- std::vector<::dsn::perf_counter *> _batch_qps_perfcounters;
- std::vector<::dsn::perf_counter *> _batch_latency_perfcounters;
+ METRIC_VAR_DECLARE_counter(put_requests);
+ METRIC_VAR_DECLARE_counter(multi_put_requests);
+ METRIC_VAR_DECLARE_counter(remove_requests);
+ METRIC_VAR_DECLARE_counter(multi_remove_requests);
+ METRIC_VAR_DECLARE_counter(incr_requests);
+ METRIC_VAR_DECLARE_counter(check_and_set_requests);
+ METRIC_VAR_DECLARE_counter(check_and_mutate_requests);
+
+ METRIC_VAR_DECLARE_percentile_int64(put_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(remove_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(multi_remove_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(incr_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(check_and_set_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns);
+
+ METRIC_VAR_DECLARE_counter(dup_requests);
+ METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
+ METRIC_VAR_DECLARE_counter(dup_lagging_writes);
+
+ // Record batch size for put and remove requests.
+ uint32_t _put_batch_size;
+ uint32_t _remove_batch_size;
// TODO(wutao1): add perf counters for failed rpc.
};
diff --git a/src/server/test/pegasus_server_write_test.cpp b/src/server/test/pegasus_server_write_test.cpp
index 0e8c57114..b771e67bf 100644
--- a/src/server/test/pegasus_server_write_test.cpp
+++ b/src/server/test/pegasus_server_write_test.cpp
@@ -108,8 +108,8 @@ public:
// make sure everything is cleanup after batch write.
ASSERT_TRUE(_server_write->_put_rpc_batch.empty());
ASSERT_TRUE(_server_write->_remove_rpc_batch.empty());
- ASSERT_TRUE(_server_write->_write_svc->_batch_qps_perfcounters.empty());
- ASSERT_TRUE(_server_write->_write_svc->_batch_latency_perfcounters.empty());
+ ASSERT_EQ(_server_write->_write_svc->_put_batch_size, 0);
+ ASSERT_EQ(_server_write->_write_svc->_remove_batch_size, 0);
ASSERT_EQ(_server_write->_write_svc->_batch_start_time, 0);
ASSERT_EQ(_server_write->_write_svc->_impl->_rocksdb_wrapper->_write_batch->Count(),
0);
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 27c6355f3..2d6da6c0f 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -51,6 +51,7 @@
#include "utils/singleton.h"
#include "utils/string_view.h"
#include "utils/synchronize.h"
+#include "utils/time_utils.h"
namespace boost {
namespace system {
@@ -89,7 +90,8 @@ class error_code;
// Instantiating the metric in whatever class represents it with some initial arguments, if any:
// metric_instance = METRIC_my_gauge_name.instantiate(entity_instance, ...);
-// Convenient macros are provided to define entity types and metric prototypes.
+// The following are convenient macros provided to define entity types and metric prototypes.
+
#define METRIC_DEFINE_entity(name) ::dsn::metric_entity_prototype METRIC_ENTITY_##name(#name)
#define METRIC_DEFINE_gauge_int64(entity_type, name, unit, desc, ...) \
::dsn::gauge_prototype<int64_t> METRIC_##name( \
@@ -97,6 +99,7 @@ class error_code;
#define METRIC_DEFINE_gauge_double(entity_type, name, unit, desc, ...) \
::dsn::gauge_prototype<double> METRIC_##name( \
{#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__})
+
// There are 2 kinds of counters:
// - `counter` is the general type of counter that is implemented by striped_long_adder, which can
// achieve high performance while consuming less memory if it's not updated very frequently.
@@ -141,6 +144,42 @@ class error_code;
#define METRIC_DECLARE_percentile_double(name) \
extern dsn::floating_percentile_prototype<double> METRIC_##name
+// Following METRIC_*VAR* macros are introduced so that:
+// * only need to use prototype name to operate each metric variable;
+// * uniformly name each variable in user class;
+// * differentiate operations on metrics significantly from main logic, improving code readability.
+
+// Declare a metric variable in user class.
+//
+// Since a type tends to be a class template where there might be commas, use variadic arguments
+// instead of a single fixed argument to represent a type.
+#define METRIC_VAR_DECLARE(name, ...) __VA_ARGS__ _##name
+#define METRIC_VAR_DECLARE_gauge_int64(name) METRIC_VAR_DECLARE(name, dsn::gauge_ptr<int64_t>)
+#define METRIC_VAR_DECLARE_counter(name) \
+ METRIC_VAR_DECLARE(name, dsn::counter_ptr<dsn::striped_long_adder, false>)
+#define METRIC_VAR_DECLARE_percentile_int64(name) \
+ METRIC_VAR_DECLARE(name, dsn::percentile_ptr<int64_t>)
+
+// Initialize a metric variable in user class.
+#define METRIC_VAR_INIT(name, entity) _##name(METRIC_##name.instantiate(entity##_metric_entity()))
+#define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica)
+
+// Perform increment-related operations on metrics including gauge and counter.
+#define METRIC_VAR_INCREMENT_BY(name, x) _##name->increment_by(x)
+#define METRIC_VAR_INCREMENT(name) _##name->increment()
+
+// Perform set() operations on metrics including gauge and percentile.
+//
+// There are 2 kinds of invocations of set() for a metric:
+// * set(val): set a single value for a metric, such as gauge, percentile;
+// * set(n, val): set multiple repeated values (the number of duplicates is n) for a metric,
+// such as percentile.
+#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
+
+// Convenient macro that is used to compute latency automatically, which is dedicated to percentile.
+#define METRIC_VAR_AUTO_LATENCY(name, ...) \
+ dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__)
+
namespace dsn {
class metric; // IWYU pragma: keep
class metric_entity_prototype; // IWYU pragma: keep
@@ -552,7 +591,7 @@ ENUM_REG_WITH_CUSTOM_NAME(metric_type::kVolatileCounter, volatile_counter)
ENUM_REG_WITH_CUSTOM_NAME(metric_type::kPercentile, percentile)
ENUM_END(metric_type)
-enum class metric_unit
+enum class metric_unit : size_t
{
kNanoSeconds,
kMicroSeconds,
@@ -562,6 +601,31 @@ enum class metric_unit
kInvalidUnit,
};
+#define METRIC_ASSERT_UNIT_LATENCY(unit, index) \
+ static_assert(static_cast<size_t>(metric_unit::unit) == index, \
+ #unit " should be at index " #index)
+
+METRIC_ASSERT_UNIT_LATENCY(kNanoSeconds, 0);
+METRIC_ASSERT_UNIT_LATENCY(kMicroSeconds, 1);
+METRIC_ASSERT_UNIT_LATENCY(kMilliSeconds, 2);
+METRIC_ASSERT_UNIT_LATENCY(kSeconds, 3);
+
+const std::vector<uint64_t> kMetricLatencyConverterFromNS = {
+ 1, 1000, 1000 * 1000, 1000 * 1000 * 1000};
+
+inline uint64_t convert_metric_latency_from_ns(uint64_t latency_ns, metric_unit target_unit)
+{
+ if (dsn_likely(target_unit == metric_unit::kNanoSeconds)) {
+ // Since nanoseconds are used as the latency unit more frequently, eliminate unnecessary
+ // conversion by branch prediction.
+ return latency_ns;
+ }
+
+ auto index = static_cast<size_t>(target_unit);
+ CHECK_LT(index, kMetricLatencyConverterFromNS.size());
+ return latency_ns / kMetricLatencyConverterFromNS[index];
+}
+
ENUM_BEGIN(metric_unit, metric_unit::kInvalidUnit)
ENUM_REG_WITH_CUSTOM_NAME(metric_unit::kNanoSeconds, nanoseconds)
ENUM_REG_WITH_CUSTOM_NAME(metric_unit::kMicroSeconds, microseconds)
@@ -1066,6 +1130,13 @@ public:
_samples.get()[index & (_sample_size - 1)] = val;
}
+ void set(size_t n, const value_type &val)
+ {
+ for (size_t i = 0; i < n; ++i) {
+ set(val);
+ }
+ }
+
// If `type` is not configured, it will return false with zero value stored in `val`;
// otherwise, it will always return true with the value corresponding to `type`.
bool get(kth_percentile_type type, value_type &val) const
@@ -1177,6 +1248,7 @@ private:
friend class metric_entity;
friend class ref_ptr<percentile<value_type, NthElementFinder>>;
+ friend class MetricVarTest;
virtual void close() override
{
@@ -1199,6 +1271,20 @@ private:
release_ref();
}
+ std::vector<value_type> samples_for_test()
+ {
+ size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size);
+ if (real_sample_size == 0) {
+ return std::vector<value_type>();
+ }
+
+ std::vector<value_type> real_samples(real_sample_size);
+ std::copy(_samples.get(), _samples.get() + real_sample_size, real_samples.begin());
+ return real_samples;
+ }
+
+ void reset_tail_for_test() { _tail.store(0); }
+
value_type value(size_t index) const
{
return _full_nth_elements[index].load(std::memory_order_relaxed);
@@ -1219,7 +1305,7 @@ private:
}
// Find nth elements.
- std::vector<T> array(real_sample_size);
+ std::vector<value_type> array(real_sample_size);
std::copy(_samples.get(), _samples.get() + real_sample_size, array.begin());
_nth_element_finder(array.begin(), array.begin(), array.end());
@@ -1288,4 +1374,47 @@ template <typename T,
using floating_percentile_prototype =
metric_prototype_with<floating_percentile<T, NthElementFinder>>;
+// Compute latency automatically at the end of the scope, which is set to percentile which it has
+// bound to.
+class auto_latency
+{
+public:
+ auto_latency(const percentile_ptr<int64_t> &percentile) : _percentile(percentile) {}
+
+ auto_latency(const percentile_ptr<int64_t> &percentile, std::function<void(uint64_t)> callback)
+ : _percentile(percentile), _callback(std::move(callback))
+ {
+ }
+
+ auto_latency(const percentile_ptr<int64_t> &percentile, uint64_t start_time_ns)
+ : _percentile(percentile), _chrono(start_time_ns)
+ {
+ }
+
+ auto_latency(const percentile_ptr<int64_t> &percentile,
+ uint64_t start_time_ns,
+ std::function<void(uint64_t)> callback)
+ : _percentile(percentile), _chrono(start_time_ns), _callback(std::move(callback))
+ {
+ }
+
+ ~auto_latency()
+ {
+ auto latency =
+ convert_metric_latency_from_ns(_chrono.duration_ns(), _percentile->prototype()->unit());
+ _percentile->set(static_cast<int64_t>(latency));
+
+ if (_callback) {
+ _callback(latency);
+ }
+ }
+
+private:
+ percentile_ptr<int64_t> _percentile;
+ utils::chronograph _chrono;
+ std::function<void(uint64_t)> _callback;
+
+ DISALLOW_COPY_AND_ASSIGN(auto_latency);
+};
+
} // namespace dsn
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index b81a0ac8e..670af36b7 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -83,6 +83,8 @@ METRIC_DEFINE_entity(my_server);
METRIC_DEFINE_entity(my_table);
METRIC_DEFINE_entity(my_replica);
+#define METRIC_VAR_INIT_my_replica(name) METRIC_VAR_INIT(name, my_replica)
+
// Dedicated entity for getting metrics by http service.
METRIC_DEFINE_entity(my_app);
@@ -174,6 +176,26 @@ METRIC_DEFINE_percentile_double(my_server,
dsn::metric_unit::kNanoSeconds,
"a server-level percentile of double type for test");
+METRIC_DEFINE_percentile_int64(my_replica,
+ test_replica_percentile_int64_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "a replica-level percentile of int64 type in nanoseconds for test");
+
+METRIC_DEFINE_percentile_int64(my_replica,
+ test_replica_percentile_int64_us,
+ dsn::metric_unit::kMicroSeconds,
+ "a replica-level percentile of int64 type in microseconds for test");
+
+METRIC_DEFINE_percentile_int64(my_replica,
+ test_replica_percentile_int64_ms,
+ dsn::metric_unit::kMilliSeconds,
+ "a replica-level percentile of int64 type in milliseconds for test");
+
+METRIC_DEFINE_percentile_int64(my_replica,
+ test_replica_percentile_int64_s,
+ dsn::metric_unit::kSeconds,
+ "a replica-level percentile of int64 type in seconds for test");
+
namespace dsn {
TEST(metrics_test, create_entity)
@@ -737,9 +759,7 @@ void run_percentile(const metric_entity_ptr &my_entity,
auto my_metric = prototype.instantiate(my_entity, interval_ms, kth_percentiles, sample_size);
// Preload zero in current thread.
- for (size_t i = 0; i < num_preload; ++i) {
- my_metric->set(0);
- }
+ my_metric->set(num_preload, 0);
// Load other data in each spawned thread evenly.
const size_t num_operations = data.size() / num_threads;
@@ -3056,4 +3076,125 @@ INSTANTIATE_TEST_CASE_P(MetricsTest,
MetricsRetirementTest,
testing::ValuesIn(metrics_retirement_tests));
+class MetricVarTest : public testing::Test
+{
+protected:
+ MetricVarTest();
+
+ void SetUp() override
+ {
+ _test_replica_gauge_int64->set(0);
+ _test_replica_counter->reset();
+ _test_replica_percentile_int64_ns->reset_tail_for_test();
+ _test_replica_percentile_int64_us->reset_tail_for_test();
+ _test_replica_percentile_int64_ms->reset_tail_for_test();
+ _test_replica_percentile_int64_s->reset_tail_for_test();
+ }
+
+ const metric_entity_ptr &my_replica_metric_entity() const { return _my_replica_metric_entity; }
+
+ void test_set_percentile(const std::vector<int64_t> &expected_samples);
+ void test_set_percentile(size_t n, int64_t val);
+
+ const metric_entity_ptr _my_replica_metric_entity;
+ METRIC_VAR_DECLARE_gauge_int64(test_replica_gauge_int64);
+ METRIC_VAR_DECLARE_counter(test_replica_counter);
+ METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_ns);
+ METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_us);
+ METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_ms);
+ METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_s);
+
+ DISALLOW_COPY_AND_ASSIGN(MetricVarTest);
+};
+
+MetricVarTest::MetricVarTest()
+ : _my_replica_metric_entity(METRIC_ENTITY_my_replica.instantiate("replica_var_test")),
+ METRIC_VAR_INIT_my_replica(test_replica_gauge_int64),
+ METRIC_VAR_INIT_my_replica(test_replica_counter),
+ METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_ns),
+ METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_us),
+ METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_ms),
+ METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_s)
+{
+}
+
+#define METRIC_VAR_SAMPLES(name) _##name->samples_for_test()
+
+void MetricVarTest::test_set_percentile(const std::vector<int64_t> &expected_samples)
+{
+ for (const auto &val : expected_samples) {
+ METRIC_VAR_SET(test_replica_percentile_int64_ns, val);
+ }
+ EXPECT_EQ(expected_samples, METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
+}
+
+void MetricVarTest::test_set_percentile(size_t n, int64_t val)
+{
+ METRIC_VAR_SET(test_replica_percentile_int64_ns, n, val);
+ EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
+}
+
+#define METRIC_VAR_VALUE(name) _##name->value()
+
+#define TEST_METRIC_VAR_INCREMENT(name) \
+ do { \
+ ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_INCREMENT(name); \
+ ASSERT_EQ(1, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_INCREMENT(name); \
+ ASSERT_EQ(2, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_INCREMENT_BY(name, 5); \
+ ASSERT_EQ(7, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_INCREMENT_BY(name, 18); \
+ ASSERT_EQ(25, METRIC_VAR_VALUE(name)); \
+ } while (0);
+
+TEST_F(MetricVarTest, IncrementGauge) { TEST_METRIC_VAR_INCREMENT(test_replica_gauge_int64); }
+
+TEST_F(MetricVarTest, IncrementCounter) { TEST_METRIC_VAR_INCREMENT(test_replica_counter); }
+
+TEST_F(MetricVarTest, SetGauge)
+{
+ ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
+
+ METRIC_VAR_SET(test_replica_gauge_int64, 5);
+ ASSERT_EQ(5, METRIC_VAR_VALUE(test_replica_gauge_int64));
+
+ METRIC_VAR_SET(test_replica_gauge_int64, 18);
+ ASSERT_EQ(18, METRIC_VAR_VALUE(test_replica_gauge_int64));
+}
+
+TEST_F(MetricVarTest, SetPercentileIndividually) { test_set_percentile({20, 50, 10, 25, 16}); }
+
+TEST_F(MetricVarTest, SetPercentileRepeatedly) { test_set_percentile(5, 100); }
+
+#define TEST_METRIC_VAR_AUTO_LATENCY(unit_abbr, factor) \
+ do { \
+ auto start_time_ns = dsn_now_ns(); \
+ uint64_t actual_latency_ns = 0; \
+ { \
+ METRIC_VAR_AUTO_LATENCY(test_replica_percentile_int64_##unit_abbr, \
+ start_time_ns, \
+ [&actual_latency_ns](uint64_t latency) mutable { \
+ actual_latency_ns = latency * factor; \
+ }); \
+ } \
+ \
+ uint64_t expected_latency_ns = dsn_now_ns() - start_time_ns; \
+ ASSERT_GE(expected_latency_ns, actual_latency_ns); \
+ EXPECT_LT(expected_latency_ns - actual_latency_ns, 1000 * 1000); \
+ } while (0)
+
+TEST_F(MetricVarTest, AutoLatencyNanoSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ns, 1); }
+
+TEST_F(MetricVarTest, AutoLatencyMicroSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(us, 1000); }
+
+TEST_F(MetricVarTest, AutoLatencyMilliSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ms, 1000 * 1000); }
+
+TEST_F(MetricVarTest, AutoLatencySeconds) { TEST_METRIC_VAR_AUTO_LATENCY(s, 1000 * 1000 * 1000); }
+
} // namespace dsn
diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h
index a47ba594c..dc8d6e1ee 100644
--- a/src/utils/time_utils.h
+++ b/src/utils/time_utils.h
@@ -32,7 +32,10 @@
#include <cstdio>
#include <string>
-#include "string_view.h"
+#include "runtime/api_layer1.h"
+#include "utils/fmt_logging.h"
+#include "utils/ports.h"
+#include "utils/string_view.h"
namespace dsn {
namespace utils {
@@ -131,5 +134,28 @@ inline int64_t hh_mm_today_to_unix_sec(string_view hhmm_of_day)
return get_unix_sec_today_midnight() + sec_of_day;
}
+class chronograph
+{
+public:
+ chronograph() : chronograph(dsn_now_ns()) {}
+ chronograph(uint64_t start_time_ns) : _start_time_ns(start_time_ns) {}
+ ~chronograph() = default;
+
+ inline void reset_start_time() { _start_time_ns = dsn_now_ns(); }
+
+ inline uint64_t duration_ns()
+ {
+ auto now = dsn_now_ns();
+ CHECK_GE(now, _start_time_ns);
+
+ return now - _start_time_ns;
+ }
+
+private:
+ uint64_t _start_time_ns;
+
+ DISALLOW_COPY_AND_ASSIGN(chronograph);
+};
+
} // namespace utils
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org