You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/06/05 04:34:57 UTC
[incubator-pegasus] 19/32: feat(new_metrics): migrate metrics for replica_stub (part 1) (#1455)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 93083ce68ac76b5b4ed153ec466bf561adca5510
Author: Dan Wang <wa...@apache.org>
AuthorDate: Wed Apr 19 22:57:26 2023 +0800
feat(new_metrics): migrate metrics for replica_stub (part 1) (#1455)
https://github.com/apache/incubator-pegasus/issues/1454
This is the first part of migrating the metrics of `replica_stub` to the new framework.
After migrating to the new framework, three metrics remain server-level: the total number
of replicas, the number of opening replicas, and the number of closing replicas.
Another metric, the number of committed requests, becomes replica-level.
The previous naming of metric variables (`_<name>`) could collide with existing class members
(such as `_opening_replicas` in the `replica_stub` class). Therefore, a macro
`METRIC_VAR_NAME` is introduced to manage the new naming: metric variables are now prefixed with
`_metric_` to avoid such collisions. Likewise, the names of generated metric functions are
managed by related macros (`METRIC_FUNC_NAME_INCREMENT_BY`, `METRIC_FUNC_NAME_INCREMENT` and
`METRIC_FUNC_NAME_SET`).
---
src/meta/table_metrics.h | 10 +++---
src/replica/replica.cpp | 10 ------
src/replica/replica.h | 5 ---
src/replica/replica_stub.cpp | 60 +++++++++++++++++++-----------------
src/replica/replica_stub.h | 8 ++---
src/replica/replication_app_base.cpp | 10 ++++--
src/replica/replication_app_base.h | 4 +++
src/utils/metrics.h | 40 ++++++++++++++----------
src/utils/test/metrics_test.cpp | 14 ++++-----
9 files changed, 84 insertions(+), 77 deletions(-)
diff --git a/src/meta/table_metrics.h b/src/meta/table_metrics.h
index 7dc5e83a3..cec35e64b 100644
--- a/src/meta/table_metrics.h
+++ b/src/meta/table_metrics.h
@@ -93,7 +93,7 @@ public:
METRIC_DEFINE_SET(healthy_partitions, int64_t)
#define __METRIC_DEFINE_INCREMENT_BY(name) \
- void increment_##name##_by(int32_t partition_id, int64_t x) \
+ void METRIC_FUNC_NAME_INCREMENT_BY(name)(int32_t partition_id, int64_t x) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_INCREMENT_BY(*(_partition_metrics[partition_id]), name, x); \
@@ -106,7 +106,7 @@ public:
#undef __METRIC_DEFINE_INCREMENT_BY
#define __METRIC_DEFINE_INCREMENT(name) \
- void increment_##name(int32_t partition_id) \
+ void METRIC_FUNC_NAME_INCREMENT(name)(int32_t partition_id) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_INCREMENT(*(_partition_metrics[partition_id]), name); \
@@ -120,7 +120,7 @@ public:
#undef __METRIC_DEFINE_INCREMENT
#define __METRIC_DEFINE_SET(name, value_type) \
- void set_##name(int32_t partition_id, value_type value) \
+ void METRIC_FUNC_NAME_SET(name)(int32_t partition_id, value_type value) \
{ \
CHECK_LT(partition_id, _partition_metrics.size()); \
METRIC_SET(*(_partition_metrics[partition_id]), name, value); \
@@ -167,7 +167,7 @@ public:
using partition_map = std::unordered_map<gpid, partition_stats>;
#define __METRIC_DEFINE_INCREMENT(name) \
- void increment_##name(const gpid &id, bool balance_checker) \
+ void METRIC_FUNC_NAME_INCREMENT(name)(const gpid &id, bool balance_checker) \
{ \
auto &partition = _partition_map[id]; \
++(partition.greedy_recent_balance_operations); \
@@ -210,7 +210,7 @@ public:
void clear_entities();
#define __METRIC_DEFINE_INCREMENT(name) \
- void increment_##name(const gpid &id) \
+ void METRIC_FUNC_NAME_INCREMENT(name)(const gpid &id) \
{ \
utils::auto_read_lock l(_lock); \
\
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index f0851c949..121098a1f 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -48,7 +48,6 @@
#include "mutation.h"
#include "mutation_log.h"
#include "perf_counter/perf_counter.h"
-#include "perf_counter/perf_counter_wrapper.h"
#include "perf_counter/perf_counters.h"
#include "replica/prepare_list.h"
#include "replica/replica_context.h"
@@ -229,15 +228,6 @@ void replica::update_last_checkpoint_generate_time()
_last_checkpoint_generate_time_ms + rand::next_u64(max_interval_ms / 2, max_interval_ms);
}
-// //
-// Statistics //
-// //
-
-void replica::update_commit_qps(int count)
-{
- _stub->_counter_replicas_commit_qps->add((uint64_t)count);
-}
-
void replica::init_state()
{
_inactive_is_transient = false;
diff --git a/src/replica/replica.h b/src/replica/replica.h
index d986b4e2a..040d1cda4 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -288,11 +288,6 @@ public:
replica_follower *get_replica_follower() const { return _replica_follower.get(); };
- //
- // Statistics
- //
- void update_commit_qps(int count);
-
// routine for get extra envs from replica
const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }
const dir_node *get_dir_node() const { return _dir_node; }
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 54b2879e9..601fcd9b1 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -97,6 +97,21 @@
#include "remote_cmd/remote_command.h"
#include "utils/fail_point.h"
+METRIC_DEFINE_gauge_int64(server,
+ total_replicas,
+ dsn::metric_unit::kReplicas,
+ "The total number of replicas");
+
+METRIC_DEFINE_gauge_int64(server,
+ opening_replicas,
+ dsn::metric_unit::kReplicas,
+ "The number of opening replicas");
+
+METRIC_DEFINE_gauge_int64(server,
+ closing_replicas,
+ dsn::metric_unit::kReplicas,
+ "The number of closing replicas");
+
namespace dsn {
namespace replication {
DSN_DEFINE_bool(replication,
@@ -194,7 +209,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_learn_app_concurrent_count(0),
_bulk_load_downloading_count(0),
_manual_emergency_checkpointing_count(0),
- _is_running(false)
+ _is_running(false),
+ METRIC_VAR_INIT_server(total_replicas),
+ METRIC_VAR_INIT_server(opening_replicas),
+ METRIC_VAR_INIT_server(closing_replicas)
{
#ifdef DSN_ENABLE_GPERF
_is_releasing_memory = false;
@@ -212,20 +230,6 @@ replica_stub::~replica_stub(void) { close(); }
void replica_stub::install_perf_counters()
{
- _counter_replicas_count.init_app_counter(
- "eon.replica_stub", "replica(Count)", COUNTER_TYPE_NUMBER, "# in replica_stub._replicas");
- _counter_replicas_opening_count.init_app_counter("eon.replica_stub",
- "opening.replica(Count)",
- COUNTER_TYPE_NUMBER,
- "# in replica_stub._opening_replicas");
- _counter_replicas_closing_count.init_app_counter("eon.replica_stub",
- "closing.replica(Count)",
- COUNTER_TYPE_NUMBER,
- "# in replica_stub._closing_replicas");
- _counter_replicas_commit_qps.init_app_counter("eon.replica_stub",
- "replicas.commit.qps",
- COUNTER_TYPE_RATE,
- "server-level commit throughput");
_counter_replicas_learning_count.init_app_counter("eon.replica_stub",
"replicas.learning.count",
COUNTER_TYPE_NUMBER,
@@ -800,7 +804,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
// attach rps
_replicas = std::move(rps);
- _counter_replicas_count->add((uint64_t)_replicas.size());
+ METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size());
for (const auto &kv : _replicas) {
_fs_manager.add_replica(kv.first, kv.second->dir());
}
@@ -2024,10 +2028,10 @@ task_ptr replica_stub::begin_open_replica(
if (rep->status() == partition_status::PS_INACTIVE && tsk->cancel(false)) {
// reopen it
_closing_replicas.erase(it);
- _counter_replicas_closing_count->decrement();
+ METRIC_VAR_DECREMENT(closing_replicas);
_replicas.emplace(id, rep);
- _counter_replicas_count->increment();
+ METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(id);
@@ -2053,7 +2057,7 @@ task_ptr replica_stub::begin_open_replica(
std::bind(&replica_stub::open_replica, this, app, id, group_check, configuration_update));
_opening_replicas[id] = task;
- _counter_replicas_opening_count->increment();
+ METRIC_VAR_INCREMENT(opening_replicas);
_closed_replicas.erase(id);
_replicas_lock.unlock_write();
@@ -2166,7 +2170,7 @@ void replica_stub::open_replica(
0,
"replica {} is not in _opening_replicas",
id.to_string());
- _counter_replicas_opening_count->decrement();
+ METRIC_VAR_DECREMENT(opening_replicas);
return;
}
@@ -2176,13 +2180,13 @@ void replica_stub::open_replica(
0,
"replica {} is not in _opening_replicas",
id.to_string());
- _counter_replicas_opening_count->decrement();
+ METRIC_VAR_DECREMENT(opening_replicas);
CHECK(_replicas.find(id) == _replicas.end(),
"replica {} is already in _replicas",
id.to_string());
_replicas.insert(replicas::value_type(rep->get_gpid(), rep));
- _counter_replicas_count->increment();
+ METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(id);
}
@@ -2349,7 +2353,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
return nullptr;
}
- _counter_replicas_count->decrement();
+ METRIC_VAR_DECREMENT(total_replicas);
int delay_ms = 0;
if (r->status() == partition_status::PS_INACTIVE) {
@@ -2368,7 +2372,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
0,
std::chrono::milliseconds(delay_ms));
_closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
- _counter_replicas_closing_count->increment();
+ METRIC_VAR_INCREMENT(closing_replicas);
return task;
}
@@ -2388,7 +2392,7 @@ void replica_stub::close_replica(replica_ptr r)
_closed_replicas.emplace(
id, std::make_pair(std::get<2>(find->second), std::get<3>(find->second)));
_closing_replicas.erase(find);
- _counter_replicas_closing_count->decrement();
+ METRIC_VAR_DECREMENT(closing_replicas);
}
if (r->is_data_corrupted()) {
@@ -2852,7 +2856,7 @@ void replica_stub::close()
task->cancel(true);
- _counter_replicas_opening_count->decrement();
+ METRIC_VAR_DECREMENT(opening_replicas);
_replicas_lock.lock_write();
_opening_replicas.erase(_opening_replicas.begin());
}
@@ -2860,7 +2864,7 @@ void replica_stub::close()
while (!_replicas.empty()) {
_replicas.begin()->second->close();
- _counter_replicas_count->decrement();
+ METRIC_VAR_DECREMENT(total_replicas);
_replicas.erase(_replicas.begin());
}
}
@@ -2984,7 +2988,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
if (rep != nullptr) {
auto pr = _replicas.insert(replicas::value_type(child_pid, rep));
CHECK(pr.second, "child replica {} has been existed", rep->name());
- _counter_replicas_count->increment();
+ METRIC_VAR_INCREMENT(total_replicas);
_closed_replicas.erase(child_pid);
}
return rep;
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index c6aa984f9..3e6901cba 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -71,6 +71,7 @@
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/flags.h"
+#include "utils/metrics.h"
#include "utils/zlocks.h"
namespace dsn {
@@ -494,10 +495,9 @@ private:
#endif
// performance counters
- perf_counter_wrapper _counter_replicas_count;
- perf_counter_wrapper _counter_replicas_opening_count;
- perf_counter_wrapper _counter_replicas_closing_count;
- perf_counter_wrapper _counter_replicas_commit_qps;
+ METRIC_VAR_DECLARE_gauge_int64(total_replicas);
+ METRIC_VAR_DECLARE_gauge_int64(opening_replicas);
+ METRIC_VAR_DECLARE_gauge_int64(closing_replicas);
perf_counter_wrapper _counter_replicas_learning_count;
perf_counter_wrapper _counter_replicas_learning_max_duration_time_ms;
diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp
index 5159fd757..33fa450e1 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -66,6 +66,11 @@
#include "utils/threadpool_code.h"
#include "utils/utils.h"
+METRIC_DEFINE_counter(replica,
+ committed_requests,
+ dsn::metric_unit::kRequests,
+ "The number of committed requests");
+
namespace dsn {
class disk_file;
@@ -254,7 +259,8 @@ replication_app_base *replication_app_base::new_storage_instance(const std::stri
return utils::factory_store<replication_app_base>::create(name.c_str(), PROVIDER_TYPE_MAIN, r);
}
-replication_app_base::replication_app_base(replica *replica) : replica_base(replica)
+replication_app_base::replication_app_base(replica *replica)
+ : replica_base(replica), METRIC_VAR_INIT_replica(committed_requests)
{
_dir_data = utils::filesystem::path_combine(replica->dir(), "data");
_dir_learn = utils::filesystem::path_combine(replica->dir(), "learn");
@@ -493,7 +499,7 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
"mutation {} committed on {}, batched_count = {}", mu->name(), str, batched_count);
}
- _replica->update_commit_qps(batched_count);
+ METRIC_VAR_INCREMENT_BY(committed_requests, batched_count);
return ERR_OK;
}
diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h
index e48ef8a5d..e7ae8eff4 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -39,6 +39,7 @@
#include "replica/replica_base.h"
#include "replica_admin_types.h"
#include "utils/error_code.h"
+#include "utils/metrics.h"
#include "utils/ports.h"
namespace dsn {
@@ -312,6 +313,9 @@ protected:
replica_init_info _info;
explicit replication_app_base(replication::replica *replica);
+
+private:
+ METRIC_VAR_DECLARE_counter(committed_requests);
};
} // namespace replication
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 8b9d396a9..f9ab9c2dc 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -153,7 +153,8 @@ class error_code;
//
// Since a type tends to be a class template where there might be commas, use variadic arguments
// instead of a single fixed argument to represent a type.
-#define METRIC_VAR_DECLARE(name, ...) __VA_ARGS__ _##name
+#define METRIC_VAR_NAME(name) _metric_##name
+#define METRIC_VAR_DECLARE(name, ...) __VA_ARGS__ METRIC_VAR_NAME(name)
#define METRIC_VAR_DECLARE_gauge_int64(name) METRIC_VAR_DECLARE(name, dsn::gauge_ptr<int64_t>)
#define METRIC_VAR_DECLARE_counter(name) \
METRIC_VAR_DECLARE(name, dsn::counter_ptr<dsn::striped_long_adder, false>)
@@ -162,7 +163,7 @@ class error_code;
// Initialize a metric variable in user class.
#define METRIC_VAR_INIT(name, entity, ...) \
- _##name(METRIC_##name.instantiate(entity##_metric_entity(), ##__VA_ARGS__))
+ METRIC_VAR_NAME(name)(METRIC_##name.instantiate(entity##_metric_entity(), ##__VA_ARGS__))
#define METRIC_VAR_INIT_replica(name, ...) METRIC_VAR_INIT(name, replica, ##__VA_ARGS__)
#define METRIC_VAR_INIT_server(name, ...) METRIC_VAR_INIT(name, server, ##__VA_ARGS__)
#define METRIC_VAR_INIT_disk(name, ...) METRIC_VAR_INIT(name, disk, ##__VA_ARGS__)
@@ -175,15 +176,15 @@ class error_code;
do { \
const auto v = (x); \
if (v != 0) { \
- _##name->increment_by(v); \
+ METRIC_VAR_NAME(name)->increment_by(v); \
} \
} while (0)
// Perform increment() operations on gauges and counters.
-#define METRIC_VAR_INCREMENT(name) _##name->increment()
+#define METRIC_VAR_INCREMENT(name) METRIC_VAR_NAME(name)->increment()
// Perform decrement() operations on gauges.
-#define METRIC_VAR_DECREMENT(name) _##name->decrement()
+#define METRIC_VAR_DECREMENT(name) METRIC_VAR_NAME(name)->decrement()
// Perform set() operations on gauges and percentiles.
//
@@ -191,38 +192,44 @@ class error_code;
// * set(val): set a single value for a metric, such as gauge, percentile;
// * set(n, val): set multiple repeated values (the number of duplicates is n) for a metric,
// such as percentile.
-#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
+#define METRIC_VAR_SET(name, ...) METRIC_VAR_NAME(name)->set(__VA_ARGS__)
// Read the current measurement of gauges and counters.
-#define METRIC_VAR_VALUE(name) _##name->value()
+#define METRIC_VAR_VALUE(name) METRIC_VAR_NAME(name)->value()
// Convenient macro that is used to compute latency automatically, which is dedicated to percentile.
#define METRIC_VAR_AUTO_LATENCY(name, ...) \
- dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__)
+ dsn::auto_latency __##name##_auto_latency(METRIC_VAR_NAME(name), ##__VA_ARGS__)
#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns()
// Convenient macro that is used to increment/decrement gauge automatically in current scope.
#define METRIC_VAR_AUTO_COUNT(name, ...) \
- dsn::auto_count __##name##_auto_count(_##name, ##__VA_ARGS__)
+ dsn::auto_count __##name##_auto_count(METRIC_VAR_NAME(name), ##__VA_ARGS__)
+
+#define METRIC_FUNC_NAME_INCREMENT_BY(name) increment_##name##_by
#define METRIC_DEFINE_INCREMENT_BY(name) \
- void increment_##name##_by(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); }
+ void METRIC_FUNC_NAME_INCREMENT_BY(name)(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); }
// To be adaptive to self-defined `increment_by` methods, arguments are declared as variadic.
-#define METRIC_INCREMENT_BY(obj, name, ...) (obj).increment_##name##_by(__VA_ARGS__)
+#define METRIC_INCREMENT_BY(obj, name, ...) (obj).METRIC_FUNC_NAME_INCREMENT_BY(name)(__VA_ARGS__)
+
+#define METRIC_FUNC_NAME_INCREMENT(name) increment_##name
#define METRIC_DEFINE_INCREMENT(name) \
- void increment_##name() { METRIC_VAR_INCREMENT(name); }
+ void METRIC_FUNC_NAME_INCREMENT(name)() { METRIC_VAR_INCREMENT(name); }
// To be adaptive to self-defined `increment` methods, arguments are declared as variadic.
-#define METRIC_INCREMENT(obj, name, ...) (obj).increment_##name(__VA_ARGS__)
+#define METRIC_INCREMENT(obj, name, ...) (obj).METRIC_FUNC_NAME_INCREMENT(name)(__VA_ARGS__)
+
+#define METRIC_FUNC_NAME_SET(name) set_##name
#define METRIC_DEFINE_SET(name, value_type) \
- void set_##name(value_type value) { METRIC_VAR_SET(name, value); }
+ void METRIC_FUNC_NAME_SET(name)(value_type value) { METRIC_VAR_SET(name, value); }
// To be adaptive to self-defined `set` methods, arguments are declared as variadic.
-#define METRIC_SET(obj, name, ...) (obj).set_##name(__VA_ARGS__)
+#define METRIC_SET(obj, name, ...) (obj).METRIC_FUNC_NAME_SET(name)(__VA_ARGS__)
namespace dsn {
class metric; // IWYU pragma: keep
@@ -646,6 +653,8 @@ enum class metric_unit : size_t
kCapacityUnits,
kPercent,
kPartitions,
+ kReplicas,
+ kServers,
kRequests,
kSeeks,
kPointLookups,
@@ -660,7 +669,6 @@ enum class metric_unit : size_t
kOperations,
kTasks,
kDisconnections,
- kServers,
kInvalidUnit,
};
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 5197ec4b6..24b5d43e3 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -3083,12 +3083,12 @@ protected:
void SetUp() override
{
- _test_replica_gauge_int64->set(0);
- _test_replica_counter->reset();
- _test_replica_percentile_int64_ns->reset_tail_for_test();
- _test_replica_percentile_int64_us->reset_tail_for_test();
- _test_replica_percentile_int64_ms->reset_tail_for_test();
- _test_replica_percentile_int64_s->reset_tail_for_test();
+ METRIC_VAR_SET(test_replica_gauge_int64, 0);
+ METRIC_VAR_NAME(test_replica_counter)->reset();
+ METRIC_VAR_NAME(test_replica_percentile_int64_ns)->reset_tail_for_test();
+ METRIC_VAR_NAME(test_replica_percentile_int64_us)->reset_tail_for_test();
+ METRIC_VAR_NAME(test_replica_percentile_int64_ms)->reset_tail_for_test();
+ METRIC_VAR_NAME(test_replica_percentile_int64_s)->reset_tail_for_test();
}
const metric_entity_ptr &my_replica_metric_entity() const { return _my_replica_metric_entity; }
@@ -3120,7 +3120,7 @@ MetricVarTest::MetricVarTest()
{
}
-#define METRIC_VAR_SAMPLES(name) _##name->samples_for_test()
+#define METRIC_VAR_SAMPLES(name) METRIC_VAR_NAME(name)->samples_for_test()
void MetricVarTest::test_set_percentile(const std::vector<int64_t> &expected_samples)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org