You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/17 08:48:12 UTC
[incubator-pegasus] 14/26: feat(new_metrics): add partition-level metric entity and migrate partition-level metrics for greedy_load_balancer of meta (#1435)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit a89eeee33d36e7c557f5b28a78a0efba4c9afe74
Author: Dan Wang <wa...@apache.org>
AuthorDate: Fri Apr 14 13:37:13 2023 +0800
feat(new_metrics): add partition-level metric entity and migrate partition-level metrics for greedy_load_balancer of meta (#1435)
https://github.com/apache/incubator-pegasus/issues/1331
In perf counters, all metrics of greedy_load_balancer are server-level, for
example, the number of each kind of operation performed by the greedy balancer,
including moving primaries, copying primaries and copying secondaries.
The new metrics are expected to be fine-grained, since sometimes we want to
know which primaries are moved. Also, it is convenient to calculate table-level
or server-level metrics by simply aggregating the partition-level ones.
The metrics of greedy_load_balancer that are changed to partition-level and
migrated to the new framework include: the number of balance operations by the
greedy balancer that recently needed to be executed, and the numbers of
operations that move primaries, copy primaries, and copy secondaries.
In addition to the metrics of greedy_load_balancer, we also change some
metrics of server_state, which were migrated to table-level in
https://github.com/apache/incubator-pegasus/pull/1431, to partition-level,
because partition-level is considered more appropriate for them than
table-level. The metrics changed to partition-level include the number of
times the configuration has been changed and the number of times the status
of a partition has been changed to unwritable or writable.
To implement these partition-level metrics, a partition-level metric entity is also added.
---
src/common/fs_manager.cpp | 4 +-
src/common/fs_manager.h | 4 +-
src/meta/greedy_load_balancer.cpp | 48 ++----
src/meta/greedy_load_balancer.h | 7 -
src/meta/meta_split_service.cpp | 5 +
src/meta/server_state.cpp | 38 ++---
src/meta/server_state.h | 2 +
src/meta/server_state_restore.cpp | 2 +-
src/meta/table_metrics.cpp | 241 +++++++++++++++++++++++++---
src/meta/table_metrics.h | 200 ++++++++++++++++++-----
src/meta/test/backup_test.cpp | 1 +
src/meta/test/meta_split_service_test.cpp | 4 +
src/meta/test/update_configuration_test.cpp | 2 +-
src/replica/test/replica_test.cpp | 1 +
src/utils/metrics.h | 20 ++-
15 files changed, 443 insertions(+), 136 deletions(-)
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 61ca99411..0541abca9 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -148,8 +148,8 @@ bool dir_node::update_disk_stat(const bool update_disk_status)
disk_available_ratio = static_cast<int>(
disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb));
- METRIC_CALL_SET_METHOD(disk_capacity, total_disk_capacity_mb, disk_capacity_mb);
- METRIC_CALL_SET_METHOD(disk_capacity, avail_disk_capacity_mb, disk_available_mb);
+ METRIC_SET(disk_capacity, total_disk_capacity_mb, disk_capacity_mb);
+ METRIC_SET(disk_capacity, avail_disk_capacity_mb, disk_available_mb);
if (!update_disk_status) {
LOG_INFO("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 6efe1b1bd..92fa1b048 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -51,8 +51,8 @@ public:
const metric_entity_ptr &disk_metric_entity() const;
- METRIC_DEFINE_SET_METHOD(total_disk_capacity_mb, int64_t)
- METRIC_DEFINE_SET_METHOD(avail_disk_capacity_mb, int64_t)
+ METRIC_DEFINE_SET(total_disk_capacity_mb, int64_t)
+ METRIC_DEFINE_SET(avail_disk_capacity_mb, int64_t)
private:
const metric_entity_ptr _disk_metric_entity;
diff --git a/src/meta/greedy_load_balancer.cpp b/src/meta/greedy_load_balancer.cpp
index f8206b820..dc8ef4f9d 100644
--- a/src/meta/greedy_load_balancer.cpp
+++ b/src/meta/greedy_load_balancer.cpp
@@ -36,21 +36,23 @@
#include "cluster_balance_policy.h"
#include "greedy_load_balancer.h"
#include "meta/load_balance_policy.h"
+#include "meta/meta_service.h"
#include "meta/server_load_balancer.h"
+#include "meta/server_state.h"
+#include "meta/table_metrics.h"
#include "meta_admin_types.h"
#include "meta_data.h"
-#include "perf_counter/perf_counter.h"
#include "runtime/rpc/rpc_address.h"
#include "utils/command_manager.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/math.h"
+#include "utils/metrics.h"
namespace dsn {
class gpid;
namespace replication {
-class meta_service;
DSN_DEFINE_bool(meta_server, balance_cluster, false, "whether to enable cluster balancer");
DSN_TAG_VARIABLE(balance_cluster, FT_MUTABLE);
@@ -63,27 +65,6 @@ greedy_load_balancer::greedy_load_balancer(meta_service *_svc) : server_load_bal
_cluster_balance_policy = std::make_unique<cluster_balance_policy>(_svc);
::memset(t_operation_counters, 0, sizeof(t_operation_counters));
-
- // init perf counters
- _balance_operation_count.init_app_counter("eon.greedy_balancer",
- "balance_operation_count",
- COUNTER_TYPE_NUMBER,
- "balance operation count to be done");
- _recent_balance_move_primary_count.init_app_counter(
- "eon.greedy_balancer",
- "recent_balance_move_primary_count",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "move primary count by balancer in the recent period");
- _recent_balance_copy_primary_count.init_app_counter(
- "eon.greedy_balancer",
- "recent_balance_copy_primary_count",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "copy primary count by balancer in the recent period");
- _recent_balance_copy_secondary_count.init_app_counter(
- "eon.greedy_balancer",
- "recent_balance_copy_secondary_count",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "copy secondary count by balancer in the recent period");
}
greedy_load_balancer::~greedy_load_balancer() {}
@@ -228,34 +209,37 @@ bool greedy_load_balancer::check(meta_view view, migration_list &list)
void greedy_load_balancer::report(const dsn::replication::migration_list &list,
bool balance_checker)
{
- int counters[MAX_COUNT];
- ::memset(counters, 0, sizeof(counters));
+#define __METRIC_INCREMENT(name) \
+ METRIC_INCREMENT(balance_stats, name, action.first, balance_checker)
+
+ int counters[MAX_COUNT] = {0};
+ greedy_balance_stats balance_stats;
counters[ALL_COUNT] = list.size();
for (const auto &action : list) {
switch (action.second.get()->balance_type) {
case balancer_request_type::move_primary:
counters[MOVE_PRI_COUNT]++;
+ __METRIC_INCREMENT(greedy_move_primary_operations);
break;
case balancer_request_type::copy_primary:
counters[COPY_PRI_COUNT]++;
+ __METRIC_INCREMENT(greedy_copy_primary_operations);
break;
case balancer_request_type::copy_secondary:
counters[COPY_SEC_COUNT]++;
+ __METRIC_INCREMENT(greedy_copy_secondary_operations);
break;
default:
CHECK(false, "");
}
}
+
::memcpy(t_operation_counters, counters, sizeof(counters));
+ METRIC_SET_GREEDY_BALANCE_STATS(_svc->get_server_state()->get_table_metric_entities(),
+ balance_stats);
- // update perf counters
- _balance_operation_count->set(list.size());
- if (!balance_checker) {
- _recent_balance_move_primary_count->add(counters[MOVE_PRI_COUNT]);
- _recent_balance_copy_primary_count->add(counters[COPY_PRI_COUNT]);
- _recent_balance_copy_secondary_count->add(counters[COPY_SEC_COUNT]);
- }
+#undef __METRIC_INCREMENT
}
} // namespace replication
} // namespace dsn
diff --git a/src/meta/greedy_load_balancer.h b/src/meta/greedy_load_balancer.h
index 862a29c19..2284b7a61 100644
--- a/src/meta/greedy_load_balancer.h
+++ b/src/meta/greedy_load_balancer.h
@@ -40,7 +40,6 @@
#include "meta/meta_data.h"
#include "meta_admin_types.h"
-#include "perf_counter/perf_counter_wrapper.h"
#include "server_load_balancer.h"
namespace dsn {
@@ -86,12 +85,6 @@ private:
std::unique_ptr<command_deregister> _get_balance_operation_count;
- // perf counters
- perf_counter_wrapper _balance_operation_count;
- perf_counter_wrapper _recent_balance_move_primary_count;
- perf_counter_wrapper _recent_balance_copy_primary_count;
- perf_counter_wrapper _recent_balance_copy_secondary_count;
-
private:
void greedy_balancer(bool balance_checker);
bool all_replica_infos_collected(const node_state &ns);
diff --git a/src/meta/meta_split_service.cpp b/src/meta/meta_split_service.cpp
index c95cf7a6e..33610fc6a 100644
--- a/src/meta/meta_split_service.cpp
+++ b/src/meta/meta_split_service.cpp
@@ -35,6 +35,7 @@
#include "meta/meta_service.h"
#include "meta/meta_state_service.h"
#include "meta/server_state.h"
+#include "meta/table_metrics.h"
#include "meta_admin_types.h"
#include "meta_split_service.h"
#include "meta_state_service_utils.h"
@@ -118,6 +119,7 @@ void meta_split_service::do_start_partition_split(std::shared_ptr<app_state> app
app->partition_count *= 2;
app->helpers->contexts.resize(app->partition_count);
app->partitions.resize(app->partition_count);
+ _state->get_table_metric_entities().resize_partitions(app->app_id, app->partition_count);
app->envs[replica_envs::SPLIT_VALIDATE_PARTITION_HASH] = "true";
for (int i = 0; i < app->partition_count; ++i) {
@@ -553,10 +555,13 @@ void meta_split_service::do_cancel_partition_split(std::shared_ptr<app_state> ap
LOG_INFO("app({}) update partition count on remote storage, new partition count is {}",
app->app_name,
app->partition_count / 2);
+
zauto_write_lock l(app_lock());
+
app->partition_count /= 2;
app->helpers->contexts.resize(app->partition_count);
app->partitions.resize(app->partition_count);
+ _state->get_table_metric_entities().resize_partitions(app->app_id, app->partition_count);
};
auto copy = *app;
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 71fe7b1af..3edb04580 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -85,6 +85,7 @@
#include "utils/config_api.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
+#include "utils/metrics.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
@@ -498,7 +499,7 @@ error_code server_state::sync_apps_to_remote_storage()
"invalid app name, name = {}",
kv_pair.second->app_name);
_exist_apps.emplace(kv_pair.second->app_name, kv_pair.second);
- _table_metric_entities.create_entity(kv_pair.first);
+ _table_metric_entities.create_entity(kv_pair.first, kv_pair.second->partition_count);
}
}
@@ -664,7 +665,7 @@ dsn::error_code server_state::sync_apps_from_remote_storage()
if (app->status == app_status::AS_AVAILABLE) {
app->status = app_status::AS_CREATING;
_exist_apps.emplace(app->app_name, app);
- _table_metric_entities.create_entity(app->app_id);
+ _table_metric_entities.create_entity(app->app_id, app->partition_count);
} else if (app->status == app_status::AS_DROPPED) {
app->status = app_status::AS_DROPPING;
} else {
@@ -1164,7 +1165,7 @@ void server_state::create_app(dsn::message_ex *msg)
_all_apps.emplace(app->app_id, app);
_exist_apps.emplace(request.app_name, app);
- _table_metric_entities.create_entity(app->app_id);
+ _table_metric_entities.create_entity(app->app_id, app->partition_count);
}
}
@@ -1394,7 +1395,8 @@ void server_state::recall_app(dsn::message_ex *msg)
target_app->helpers->pending_response = msg;
_exist_apps.emplace(target_app->app_name, target_app);
- _table_metric_entities.create_entity(target_app->app_id);
+ _table_metric_entities.create_entity(target_app->app_id,
+ target_app->partition_count);
}
}
}
@@ -1619,15 +1621,12 @@ void server_state::update_configuration_locally(
_config_change_subscriber(_all_apps);
}
- METRIC_CALL_TABLE_INCREMENT_METHOD(
- _table_metric_entities, partition_configuration_changes, app.app_id);
+ METRIC_INCREMENT(_table_metric_entities, partition_configuration_changes, gpid);
if (old_health_status >= HS_WRITABLE_ILL && new_health_status < HS_WRITABLE_ILL) {
- METRIC_CALL_TABLE_INCREMENT_METHOD(
- _table_metric_entities, unwritable_partition_changes, app.app_id);
+ METRIC_INCREMENT(_table_metric_entities, unwritable_partition_changes, gpid);
}
if (old_health_status < HS_WRITABLE_ILL && new_health_status >= HS_WRITABLE_ILL) {
- METRIC_CALL_TABLE_INCREMENT_METHOD(
- _table_metric_entities, writable_partition_changes, app.app_id);
+ METRIC_INCREMENT(_table_metric_entities, writable_partition_changes, gpid);
}
}
@@ -2431,18 +2430,13 @@ void server_state::update_partition_metrics()
counters[st]++;
}
- METRIC_CALL_TABLE_SET_METHOD(
- _table_metric_entities, dead_partitions, app->app_id, counters[HS_DEAD]);
- METRIC_CALL_TABLE_SET_METHOD(
- _table_metric_entities, unreadable_partitions, app->app_id, counters[HS_UNREADABLE]);
- METRIC_CALL_TABLE_SET_METHOD(
- _table_metric_entities, unwritable_partitions, app->app_id, counters[HS_UNWRITABLE]);
- METRIC_CALL_TABLE_SET_METHOD(_table_metric_entities,
- writable_ill_partitions,
- app->app_id,
- counters[HS_WRITABLE_ILL]);
- METRIC_CALL_TABLE_SET_METHOD(
- _table_metric_entities, healthy_partitions, app->app_id, counters[HS_HEALTHY]);
+ METRIC_SET_TABLE_HEALTH_STATS(_table_metric_entities,
+ app->app_id,
+ counters[HS_DEAD],
+ counters[HS_UNREADABLE],
+ counters[HS_UNWRITABLE],
+ counters[HS_WRITABLE_ILL],
+ counters[HS_HEALTHY]);
return true;
};
diff --git a/src/meta/server_state.h b/src/meta/server_state.h
index 13a8ef8bd..730ac8a52 100644
--- a/src/meta/server_state.h
+++ b/src/meta/server_state.h
@@ -222,6 +222,8 @@ public:
task_tracker *tracker() { return &_tracker; }
void wait_all_task() { _tracker.wait_outstanding_tasks(); }
+ table_metric_entities &get_table_metric_entities() { return _table_metric_entities; }
+
private:
FRIEND_TEST(backup_service_test, test_invalid_backup_request);
diff --git a/src/meta/server_state_restore.cpp b/src/meta/server_state_restore.cpp
index f513b0c4a..b2dada4a2 100644
--- a/src/meta/server_state_restore.cpp
+++ b/src/meta/server_state_restore.cpp
@@ -148,7 +148,7 @@ std::pair<dsn::error_code, std::shared_ptr<app_state>> server_state::restore_app
_all_apps.emplace(app->app_id, app);
_exist_apps.emplace(info.app_name, app);
- _table_metric_entities.create_entity(app->app_id);
+ _table_metric_entities.create_entity(app->app_id, app->partition_count);
}
}
// TODO: using one single env to replace
diff --git a/src/meta/table_metrics.cpp b/src/meta/table_metrics.cpp
index 511be89eb..73b992062 100644
--- a/src/meta/table_metrics.cpp
+++ b/src/meta/table_metrics.cpp
@@ -17,12 +17,54 @@
#include "table_metrics.h"
+// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
+#include <fmt/ostream.h>
+#include <stddef.h>
+#include <iosfwd>
#include <string>
#include "utils/fmt_logging.h"
#include "utils/string_view.h"
+METRIC_DEFINE_entity(partition);
+
+METRIC_DEFINE_counter(partition,
+ partition_configuration_changes,
+ dsn::metric_unit::kChanges,
+ "The number of times the configuration has been changed");
+
+METRIC_DEFINE_counter(partition,
+ unwritable_partition_changes,
+ dsn::metric_unit::kChanges,
+ "The number of times the status of partition has been changed to unwritable");
+
+METRIC_DEFINE_counter(partition,
+ writable_partition_changes,
+ dsn::metric_unit::kChanges,
+ "The number of times the status of partition has been changed to writable");
+
+METRIC_DEFINE_gauge_int64(
+ partition,
+ greedy_recent_balance_operations,
+ dsn::metric_unit::kOperations,
+ "The number of balance operations by greedy balancer that are recently needed to be executed");
+
+METRIC_DEFINE_counter(partition,
+ greedy_move_primary_operations,
+ dsn::metric_unit::kOperations,
+ "The number of balance operations by greedy balancer that move primaries");
+
+METRIC_DEFINE_counter(partition,
+ greedy_copy_primary_operations,
+ dsn::metric_unit::kOperations,
+ "The number of balance operations by greedy balancer that copy primaries");
+
+METRIC_DEFINE_counter(partition,
+ greedy_copy_secondary_operations,
+ dsn::metric_unit::kOperations,
+ "The number of balance operations by greedy balancer that copy secondaries");
+
METRIC_DEFINE_entity(table);
// The number of partitions in each status, see `health_status` and `partition_health_status()`
@@ -59,25 +101,19 @@ METRIC_DEFINE_gauge_int64(table,
"The number of healthy partitions, which means primary = 1 && "
"primary + secondary >= max_replica_count");
-METRIC_DEFINE_counter(table,
- partition_configuration_changes,
- dsn::metric_unit::kChanges,
- "The number of times the configuration has been changed");
-
-METRIC_DEFINE_counter(table,
- unwritable_partition_changes,
- dsn::metric_unit::kChanges,
- "The number of times the status of partition has been changed to unwritable");
-
-METRIC_DEFINE_counter(table,
- writable_partition_changes,
- dsn::metric_unit::kChanges,
- "The number of times the status of partition has been changed to writable");
-
namespace dsn {
namespace {
+metric_entity_ptr instantiate_partition_metric_entity(int32_t table_id, int32_t partition_id)
+{
+ auto entity_id = fmt::format("partition_{}", gpid(table_id, partition_id));
+
+ return METRIC_ENTITY_partition.instantiate(
+ entity_id,
+ {{"table_id", std::to_string(table_id)}, {"partition_id", std::to_string(partition_id)}});
+}
+
metric_entity_ptr instantiate_table_metric_entity(int32_t table_id)
{
auto entity_id = fmt::format("table_{}", table_id);
@@ -87,18 +123,63 @@ metric_entity_ptr instantiate_table_metric_entity(int32_t table_id)
} // anonymous namespace
-table_metrics::table_metrics(int32_t table_id)
+partition_metrics::partition_metrics(int32_t table_id, int32_t partition_id)
+ : _table_id(table_id),
+ _partition_id(partition_id),
+ _partition_metric_entity(instantiate_partition_metric_entity(table_id, partition_id)),
+ METRIC_VAR_INIT_partition(partition_configuration_changes),
+ METRIC_VAR_INIT_partition(unwritable_partition_changes),
+ METRIC_VAR_INIT_partition(writable_partition_changes),
+ METRIC_VAR_INIT_partition(greedy_recent_balance_operations),
+ METRIC_VAR_INIT_partition(greedy_move_primary_operations),
+ METRIC_VAR_INIT_partition(greedy_copy_primary_operations),
+ METRIC_VAR_INIT_partition(greedy_copy_secondary_operations)
+{
+}
+
+const metric_entity_ptr &partition_metrics::partition_metric_entity() const
+{
+ CHECK_NOTNULL(_partition_metric_entity,
+ "partition metric entity should has been instantiated: "
+ "uninitialized entity cannot be used to instantiate "
+ "metric");
+ return _partition_metric_entity;
+}
+
+bool operator==(const partition_metrics &lhs, const partition_metrics &rhs)
+{
+ if (&lhs == &rhs) {
+ return true;
+ }
+
+ if (lhs.partition_metric_entity().get() != rhs.partition_metric_entity().get()) {
+ CHECK_TRUE(lhs.table_id() != rhs.table_id() || lhs.partition_id() != rhs.partition_id());
+ return false;
+ }
+
+ CHECK_EQ(lhs.table_id(), rhs.table_id());
+ CHECK_EQ(lhs.partition_id(), rhs.partition_id());
+ return true;
+}
+
+bool operator!=(const partition_metrics &lhs, const partition_metrics &rhs)
+{
+ return !(lhs == rhs);
+}
+
+table_metrics::table_metrics(int32_t table_id, int32_t partition_count)
: _table_id(table_id),
_table_metric_entity(instantiate_table_metric_entity(table_id)),
METRIC_VAR_INIT_table(dead_partitions),
METRIC_VAR_INIT_table(unreadable_partitions),
METRIC_VAR_INIT_table(unwritable_partitions),
METRIC_VAR_INIT_table(writable_ill_partitions),
- METRIC_VAR_INIT_table(healthy_partitions),
- METRIC_VAR_INIT_table(partition_configuration_changes),
- METRIC_VAR_INIT_table(unwritable_partition_changes),
- METRIC_VAR_INIT_table(writable_partition_changes)
+ METRIC_VAR_INIT_table(healthy_partitions)
{
+ _partition_metrics.reserve(partition_count);
+ for (int32_t i = 0; i < partition_count; ++i) {
+ _partition_metrics.push_back(std::make_unique<partition_metrics>(table_id, i));
+ }
}
const metric_entity_ptr &table_metrics::table_metric_entity() const
@@ -110,6 +191,28 @@ const metric_entity_ptr &table_metrics::table_metric_entity() const
return _table_metric_entity;
}
+void table_metrics::resize_partitions(int32_t partition_count)
+{
+ LOG_INFO("resize partitions for table_metrics(table_id={}): old_partition_count={}, "
+ "new_partition_count={}",
+ _table_id,
+ _partition_metrics.size(),
+ partition_count);
+
+ if (_partition_metrics.size() == partition_count) {
+ return;
+ }
+
+ if (_partition_metrics.size() > partition_count) {
+ _partition_metrics.resize(partition_count);
+ return;
+ }
+
+ for (int32_t i = _partition_metrics.size(); i < partition_count; ++i) {
+ _partition_metrics.push_back(std::make_unique<partition_metrics>(_table_id, i));
+ }
+}
+
bool operator==(const table_metrics &lhs, const table_metrics &rhs)
{
if (&lhs == &rhs) {
@@ -122,33 +225,74 @@ bool operator==(const table_metrics &lhs, const table_metrics &rhs)
}
CHECK_EQ(lhs.table_id(), rhs.table_id());
+
+ if (lhs._partition_metrics.size() != rhs._partition_metrics.size()) {
+ return false;
+ }
+
+ for (size_t i = 0; i < lhs._partition_metrics.size(); ++i) {
+ if (*(lhs._partition_metrics[i]) != *(rhs._partition_metrics[i])) {
+ return false;
+ }
+ }
+
return true;
}
bool operator!=(const table_metrics &lhs, const table_metrics &rhs) { return !(lhs == rhs); }
-void table_metric_entities::create_entity(int32_t table_id)
+void table_metric_entities::create_entity(int32_t table_id, int32_t partition_count)
{
+ LOG_INFO("try to create entity for table_metric_entities(table_id={}): partition_count={}",
+ table_id,
+ partition_count);
+
utils::auto_write_lock l(_lock);
entity_map::const_iterator iter = _entities.find(table_id);
if (dsn_unlikely(iter != _entities.end())) {
+ LOG_WARNING("entity has existed for table_metric_entities(table_id={})", table_id);
return;
}
- _entities[table_id] = std::make_unique<table_metrics>(table_id);
+ _entities[table_id] = std::make_unique<table_metrics>(table_id, partition_count);
+ LOG_INFO("entity has been created for table_metric_entities(table_id={}): partition_count={}",
+ table_id,
+ partition_count);
+}
+
+void table_metric_entities::resize_partitions(int32_t table_id, int32_t partition_count)
+{
+ LOG_INFO(
+ "try to resize partitions for table_metric_entities(table_id={}): new_partition_count={}",
+ table_id,
+ partition_count);
+
+ utils::auto_write_lock l(_lock);
+
+ auto iter = _entities.find(table_id);
+ if (dsn_unlikely(iter == _entities.end())) {
+ LOG_WARNING("entity does not exist for table_metric_entities(table_id={})", table_id);
+ return;
+ }
+
+ iter->second->resize_partitions(partition_count);
}
void table_metric_entities::remove_entity(int32_t table_id)
{
+ LOG_INFO("try to remove entity for table_metric_entities(table_id={})", table_id);
+
utils::auto_write_lock l(_lock);
entity_map::const_iterator iter = _entities.find(table_id);
if (dsn_unlikely(iter == _entities.end())) {
+ LOG_WARNING("entity does not exist for table_metric_entities(table_id={})", table_id);
return;
}
_entities.erase(iter);
+ LOG_INFO("entity has been removed for table_metric_entities(table_id={})", table_id);
}
void table_metric_entities::clear_entities()
@@ -157,6 +301,59 @@ void table_metric_entities::clear_entities()
_entities.clear();
}
+void table_metric_entities::set_health_stats(int32_t table_id,
+ int dead_partitions,
+ int unreadable_partitions,
+ int unwritable_partitions,
+ int writable_ill_partitions,
+ int healthy_partitions)
+{
+ utils::auto_read_lock l(_lock);
+
+ auto iter = _entities.find(table_id);
+ if (dsn_unlikely(iter == _entities.end())) {
+ return;
+ }
+
+#define __METRIC_SET(name) METRIC_SET(*(iter->second), name, name)
+
+ __METRIC_SET(dead_partitions);
+ __METRIC_SET(unreadable_partitions);
+ __METRIC_SET(unwritable_partitions);
+ __METRIC_SET(writable_ill_partitions);
+ __METRIC_SET(healthy_partitions);
+
+#undef __METRIC_SET
+}
+
+void table_metric_entities::set_greedy_balance_stats(const greedy_balance_stats &balance_stats)
+{
+ utils::auto_read_lock l(_lock);
+
+ const auto &stats = balance_stats.stats();
+ for (const auto &partition : stats) {
+ auto iter = _entities.find(partition.first.get_app_id());
+ if (dsn_unlikely(iter == _entities.end())) {
+ continue;
+ }
+
+ METRIC_SET(*(iter->second),
+ greedy_recent_balance_operations,
+ partition.first.get_partition_index(),
+ partition.second.greedy_recent_balance_operations);
+
+#define __METRIC_INCREMENT_BY(name) \
+ METRIC_INCREMENT_BY( \
+ *(iter->second), name, partition.first.get_partition_index(), partition.second.name)
+
+ __METRIC_INCREMENT_BY(greedy_move_primary_operations);
+ __METRIC_INCREMENT_BY(greedy_copy_primary_operations);
+ __METRIC_INCREMENT_BY(greedy_copy_secondary_operations);
+
+#undef __METRIC_INCREMENT_BY
+ }
+}
+
bool operator==(const table_metric_entities &lhs, const table_metric_entities &rhs)
{
if (&lhs == &rhs) {
diff --git a/src/meta/table_metrics.h b/src/meta/table_metrics.h
index de5db3888..39fec611d 100644
--- a/src/meta/table_metrics.h
+++ b/src/meta/table_metrics.h
@@ -21,35 +21,114 @@
#include <memory>
#include <unordered_map>
#include <utility>
+#include <vector>
+#include "common/gpid.h"
#include "utils/autoref_ptr.h"
+#include "utils/fmt_logging.h"
#include "utils/metrics.h"
#include "utils/ports.h"
#include "utils/synchronize.h"
namespace dsn {
-class table_metric_entities;
+
+// Maintain a partition-level metric entity of meta, and all metrics attached to it.
+class partition_metrics
+{
+public:
+ partition_metrics(int32_t table_id, int32_t partition_id);
+ ~partition_metrics() = default;
+
+ inline int32_t table_id() const { return _table_id; }
+ inline int32_t partition_id() const { return _partition_id; }
+ const metric_entity_ptr &partition_metric_entity() const;
+
+ METRIC_DEFINE_INCREMENT(partition_configuration_changes)
+ METRIC_DEFINE_INCREMENT(unwritable_partition_changes)
+ METRIC_DEFINE_INCREMENT(writable_partition_changes)
+
+ METRIC_DEFINE_SET(greedy_recent_balance_operations, int64_t)
+ METRIC_DEFINE_INCREMENT_BY(greedy_move_primary_operations)
+ METRIC_DEFINE_INCREMENT_BY(greedy_copy_primary_operations)
+ METRIC_DEFINE_INCREMENT_BY(greedy_copy_secondary_operations)
+
+private:
+ const int32_t _table_id;
+ const int32_t _partition_id;
+
+ const metric_entity_ptr _partition_metric_entity;
+ METRIC_VAR_DECLARE_counter(partition_configuration_changes);
+ METRIC_VAR_DECLARE_counter(unwritable_partition_changes);
+ METRIC_VAR_DECLARE_counter(writable_partition_changes);
+ METRIC_VAR_DECLARE_gauge_int64(greedy_recent_balance_operations);
+ METRIC_VAR_DECLARE_counter(greedy_move_primary_operations);
+ METRIC_VAR_DECLARE_counter(greedy_copy_primary_operations);
+ METRIC_VAR_DECLARE_counter(greedy_copy_secondary_operations);
+
+ DISALLOW_COPY_AND_ASSIGN(partition_metrics);
+};
+
+bool operator==(const partition_metrics &lhs, const partition_metrics &rhs);
+bool operator!=(const partition_metrics &lhs, const partition_metrics &rhs);
// Maintain a table-level metric entity of meta, and all metrics attached to it.
class table_metrics
{
public:
- table_metrics(int32_t table_id);
+ table_metrics(int32_t table_id, int32_t partition_count);
~table_metrics() = default;
inline int32_t table_id() const { return _table_id; }
const metric_entity_ptr &table_metric_entity() const;
- METRIC_DEFINE_SET_METHOD(dead_partitions, int64_t)
- METRIC_DEFINE_SET_METHOD(unreadable_partitions, int64_t)
- METRIC_DEFINE_SET_METHOD(unwritable_partitions, int64_t)
- METRIC_DEFINE_SET_METHOD(writable_ill_partitions, int64_t)
- METRIC_DEFINE_SET_METHOD(healthy_partitions, int64_t)
- METRIC_DEFINE_INCREMENT_METHOD(partition_configuration_changes)
- METRIC_DEFINE_INCREMENT_METHOD(unwritable_partition_changes)
- METRIC_DEFINE_INCREMENT_METHOD(writable_partition_changes)
+ void resize_partitions(int32_t partition_count);
+
+ METRIC_DEFINE_SET(dead_partitions, int64_t)
+ METRIC_DEFINE_SET(unreadable_partitions, int64_t)
+ METRIC_DEFINE_SET(unwritable_partitions, int64_t)
+ METRIC_DEFINE_SET(writable_ill_partitions, int64_t)
+ METRIC_DEFINE_SET(healthy_partitions, int64_t)
+
+#define __METRIC_DEFINE_INCREMENT_BY(name) \
+ void increment_##name##_by(int32_t partition_id, int64_t x) \
+ { \
+ CHECK_LT(partition_id, _partition_metrics.size()); \
+ METRIC_INCREMENT_BY(*(_partition_metrics[partition_id]), name, x); \
+ }
+
+ __METRIC_DEFINE_INCREMENT_BY(greedy_move_primary_operations)
+ __METRIC_DEFINE_INCREMENT_BY(greedy_copy_primary_operations)
+ __METRIC_DEFINE_INCREMENT_BY(greedy_copy_secondary_operations)
+
+#undef __METRIC_DEFINE_INCREMENT_BY
+
+#define __METRIC_DEFINE_INCREMENT(name) \
+ void increment_##name(int32_t partition_id) \
+ { \
+ CHECK_LT(partition_id, _partition_metrics.size()); \
+ METRIC_INCREMENT(*(_partition_metrics[partition_id]), name); \
+ }
+
+ __METRIC_DEFINE_INCREMENT(partition_configuration_changes)
+ __METRIC_DEFINE_INCREMENT(unwritable_partition_changes)
+ __METRIC_DEFINE_INCREMENT(writable_partition_changes)
+
+#undef __METRIC_DEFINE_INCREMENT
+
+#define __METRIC_DEFINE_SET(name, value_type) \
+ void set_##name(int32_t partition_id, value_type value) \
+ { \
+ CHECK_LT(partition_id, _partition_metrics.size()); \
+ METRIC_SET(*(_partition_metrics[partition_id]), name, value); \
+ }
+
+ __METRIC_DEFINE_SET(greedy_recent_balance_operations, int64_t)
+
+#undef __METRIC_DEFINE_SET
private:
+ friend bool operator==(const table_metrics &, const table_metrics &);
+
const int32_t _table_id;
const metric_entity_ptr _table_metric_entity;
@@ -58,9 +137,8 @@ private:
METRIC_VAR_DECLARE_gauge_int64(unwritable_partitions);
METRIC_VAR_DECLARE_gauge_int64(writable_ill_partitions);
METRIC_VAR_DECLARE_gauge_int64(healthy_partitions);
- METRIC_VAR_DECLARE_counter(partition_configuration_changes);
- METRIC_VAR_DECLARE_counter(unwritable_partition_changes);
- METRIC_VAR_DECLARE_counter(writable_partition_changes);
+
+ std::vector<std::unique_ptr<partition_metrics>> _partition_metrics;
DISALLOW_COPY_AND_ASSIGN(table_metrics);
};
@@ -68,33 +146,46 @@ private:
bool operator==(const table_metrics &lhs, const table_metrics &rhs);
bool operator!=(const table_metrics &lhs, const table_metrics &rhs);
-#define METRIC_DEFINE_TABLE_SET_METHOD(name, value_type) \
- void set_##name(int32_t table_id, value_type value) \
- { \
- utils::auto_read_lock l(_lock); \
- \
- entity_map::const_iterator iter = _entities.find(table_id); \
- if (dsn_unlikely(iter == _entities.end())) { \
- return; \
- } \
- METRIC_CALL_SET_METHOD(*(iter->second), name, value); \
- }
+class greedy_balance_stats
+{
+public:
+ greedy_balance_stats() = default;
+ ~greedy_balance_stats() = default;
+
+ struct partition_stats
+ {
+ int greedy_recent_balance_operations = 0;
+ int greedy_move_primary_operations = 0;
+ int greedy_copy_primary_operations = 0;
+ int greedy_copy_secondary_operations = 0;
+ };
-#define METRIC_CALL_TABLE_SET_METHOD(obj, name, table_id, value) (obj).set_##name(table_id, value)
+ using partition_map = std::unordered_map<gpid, partition_stats>;
-#define METRIC_DEFINE_TABLE_INCREMENT_METHOD(name) \
- void increment_##name(int32_t table_id) \
+#define __METRIC_DEFINE_INCREMENT(name) \
+ void increment_##name(const gpid &id, bool balance_checker) \
{ \
- utils::auto_read_lock l(_lock); \
- \
- entity_map::const_iterator iter = _entities.find(table_id); \
- if (dsn_unlikely(iter == _entities.end())) { \
+ auto &partition = _partition_map[id]; \
+ ++(partition.greedy_recent_balance_operations); \
+ if (balance_checker) { \
return; \
} \
- METRIC_CALL_INCREMENT_METHOD(*(iter->second), name); \
+ ++(partition.name); \
}
-#define METRIC_CALL_TABLE_INCREMENT_METHOD(obj, name, table_id) (obj).increment_##name(table_id)
+ __METRIC_DEFINE_INCREMENT(greedy_move_primary_operations)
+ __METRIC_DEFINE_INCREMENT(greedy_copy_primary_operations)
+ __METRIC_DEFINE_INCREMENT(greedy_copy_secondary_operations)
+
+#undef __METRIC_DEFINE_INCREMENT
+
+ const partition_map &stats() const { return _partition_map; }
+
+private:
+ partition_map _partition_map;
+
+ DISALLOW_COPY_AND_ASSIGN(greedy_balance_stats);
+};
// Manage the lifetime of all table-level metric entities of meta.
//
@@ -109,18 +200,38 @@ public:
table_metric_entities() = default;
~table_metric_entities() = default;
- void create_entity(int32_t table_id);
+ void create_entity(int32_t table_id, int32_t partition_count);
+ void resize_partitions(int32_t table_id, int32_t partition_count);
void remove_entity(int32_t table_id);
void clear_entities();
- METRIC_DEFINE_TABLE_SET_METHOD(dead_partitions, int64_t)
- METRIC_DEFINE_TABLE_SET_METHOD(unreadable_partitions, int64_t)
- METRIC_DEFINE_TABLE_SET_METHOD(unwritable_partitions, int64_t)
- METRIC_DEFINE_TABLE_SET_METHOD(writable_ill_partitions, int64_t)
- METRIC_DEFINE_TABLE_SET_METHOD(healthy_partitions, int64_t)
- METRIC_DEFINE_TABLE_INCREMENT_METHOD(partition_configuration_changes)
- METRIC_DEFINE_TABLE_INCREMENT_METHOD(unwritable_partition_changes)
- METRIC_DEFINE_TABLE_INCREMENT_METHOD(writable_partition_changes)
+#define __METRIC_DEFINE_INCREMENT(name) \
+ void increment_##name(const gpid &id) \
+ { \
+ utils::auto_read_lock l(_lock); \
+ \
+ auto iter = _entities.find(id.get_app_id()); \
+ if (dsn_unlikely(iter == _entities.end())) { \
+ return; \
+ } \
+ \
+ METRIC_INCREMENT(*(iter->second), name, id.get_partition_index()); \
+ }
+
+ __METRIC_DEFINE_INCREMENT(partition_configuration_changes)
+ __METRIC_DEFINE_INCREMENT(unwritable_partition_changes)
+ __METRIC_DEFINE_INCREMENT(writable_partition_changes)
+
+#undef __METRIC_DEFINE_INCREMENT
+
+ void set_greedy_balance_stats(const greedy_balance_stats &balance_stats);
+
+ void set_health_stats(int32_t table_id,
+ int dead_partitions,
+ int unreadable_partitions,
+ int unwritable_partitions,
+ int writable_ill_partitions,
+ int healthy_partitions);
private:
friend bool operator==(const table_metric_entities &, const table_metric_entities &);
@@ -133,4 +244,9 @@ private:
bool operator==(const table_metric_entities &lhs, const table_metric_entities &rhs);
+#define METRIC_SET_GREEDY_BALANCE_STATS(obj, ...) (obj).set_greedy_balance_stats(__VA_ARGS__)
+
+#define METRIC_SET_TABLE_HEALTH_STATS(obj, table_id, ...) \
+ (obj).set_health_stats(table_id, ##__VA_ARGS__)
+
} // namespace dsn
diff --git a/src/meta/test/backup_test.cpp b/src/meta/test/backup_test.cpp
index 70062ba69..35b88bb97 100644
--- a/src/meta/test/backup_test.cpp
+++ b/src/meta/test/backup_test.cpp
@@ -29,6 +29,7 @@
#include <set>
#include <string>
#include <thread>
+#include <type_traits>
#include <utility>
#include <vector>
diff --git a/src/meta/test/meta_split_service_test.cpp b/src/meta/test/meta_split_service_test.cpp
index ea611c633..762987d78 100644
--- a/src/meta/test/meta_split_service_test.cpp
+++ b/src/meta/test/meta_split_service_test.cpp
@@ -55,6 +55,7 @@
#include "meta/meta_split_service.h"
#include "meta/meta_state_service_utils.h"
#include "meta/server_state.h"
+#include "meta/table_metrics.h"
#include "meta_admin_types.h"
#include "meta_service_test_app.h"
#include "meta_test_base.h"
@@ -208,6 +209,7 @@ public:
{
app->partition_count = NEW_PARTITION_COUNT;
app->partitions.resize(app->partition_count);
+ _ss->get_table_metric_entities().resize_partitions(app->app_id, app->partition_count);
app->helpers->contexts.resize(app->partition_count);
app->helpers->split_states.splitting_count = app->partition_count / 2;
for (int i = 0; i < app->partition_count; ++i) {
@@ -227,6 +229,7 @@ public:
{
app->partition_count = PARTITION_COUNT;
app->partitions.resize(app->partition_count);
+ _ss->get_table_metric_entities().resize_partitions(app->app_id, app->partition_count);
app->helpers->contexts.resize(app->partition_count);
app->helpers->split_states.splitting_count = 0;
app->helpers->split_states.status.clear();
@@ -236,6 +239,7 @@ public:
{
app->partition_count = NEW_PARTITION_COUNT;
app->partitions.resize(app->partition_count);
+ _ss->get_table_metric_entities().resize_partitions(app->app_id, app->partition_count);
app->helpers->contexts.resize(app->partition_count);
for (int i = 0; i < app->partition_count; ++i) {
app->helpers->contexts[i].config_owner = &app->partitions[i];
diff --git a/src/meta/test/update_configuration_test.cpp b/src/meta/test/update_configuration_test.cpp
index cb30bff2b..4e8f34f64 100644
--- a/src/meta/test/update_configuration_test.cpp
+++ b/src/meta/test/update_configuration_test.cpp
@@ -496,7 +496,7 @@ void meta_service_test_app::cannot_run_balancer_test()
std::shared_ptr<app_state> the_app = app_state::create(info);
svc->_state->_all_apps.emplace(info.app_id, the_app);
svc->_state->_exist_apps.emplace(info.app_name, the_app);
- svc->_state->_table_metric_entities.create_entity(info.app_id);
+ svc->_state->_table_metric_entities.create_entity(info.app_id, info.partition_count);
dsn::partition_configuration &pc = the_app->partitions[0];
pc.primary = nodes[0];
diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp
index fbb872715..695417c86 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -18,6 +18,7 @@
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
+#include <stddef.h>
#include <stdint.h>
#include <iostream>
#include <map>
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index ba85c3fd4..cb6b7cb48 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -167,6 +167,7 @@ class error_code;
#define METRIC_VAR_INIT_server(name, ...) METRIC_VAR_INIT(name, server, ##__VA_ARGS__)
#define METRIC_VAR_INIT_disk(name, ...) METRIC_VAR_INIT(name, disk, ##__VA_ARGS__)
#define METRIC_VAR_INIT_table(name, ...) METRIC_VAR_INIT(name, table, ##__VA_ARGS__)
+#define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, ##__VA_ARGS__)
// Perform increment-related operations on metrics including gauge and counter.
#define METRIC_VAR_INCREMENT_BY(name, x) \
@@ -196,15 +197,23 @@ class error_code;
#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns()
-#define METRIC_DEFINE_SET_METHOD(name, value_type) \
- void set_##name(value_type value) { METRIC_VAR_SET(name, value); }
+#define METRIC_DEFINE_INCREMENT_BY(name) \
+ void increment_##name##_by(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); }
-#define METRIC_CALL_SET_METHOD(obj, name, value) (obj).set_##name(value)
+// Arguments are variadic so that self-defined `increment_by` methods with any parameter list can be called.
+#define METRIC_INCREMENT_BY(obj, name, ...) (obj).increment_##name##_by(__VA_ARGS__)
-#define METRIC_DEFINE_INCREMENT_METHOD(name) \
+#define METRIC_DEFINE_INCREMENT(name) \
void increment_##name() { METRIC_VAR_INCREMENT(name); }
-#define METRIC_CALL_INCREMENT_METHOD(obj, name) (obj).increment_##name()
+// Arguments are variadic so that self-defined `increment` methods with any parameter list can be called.
+#define METRIC_INCREMENT(obj, name, ...) (obj).increment_##name(__VA_ARGS__)
+
+#define METRIC_DEFINE_SET(name, value_type) \
+ void set_##name(value_type value) { METRIC_VAR_SET(name, value); }
+
+// Arguments are variadic so that self-defined `set` methods with any parameter list can be called.
+#define METRIC_SET(obj, name, ...) (obj).set_##name(__VA_ARGS__)
namespace dsn {
class metric; // IWYU pragma: keep
@@ -639,6 +648,7 @@ enum class metric_unit : size_t
kCompactions,
kWrites,
kChanges,
+ kOperations,
kInvalidUnit,
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org