You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/05 07:28:07 UTC

[incubator-pegasus] 12/23: feat(new_metrics): add disk-level metric entity and migrate disk-level metrics for fs_manager (#1427)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 8a8dbc24f6d5bd797bf669258ae6e2ec6e895088
Author: Dan Wang <wa...@apache.org>
AuthorDate: Thu Apr 6 11:41:34 2023 +0800

    feat(new_metrics): add disk-level metric entity and migrate disk-level metrics for fs_manager (#1427)
    
    https://github.com/apache/incubator-pegasus/issues/1425
    
    In perf counters, all metrics of `fs_manager` are server-level: for example,
    the total capacity and the available capacity of all disks that hold
    Pegasus data.
    
    However, sometimes the capacity and the available capacity of each disk
    seem more important: no space left on the disk will lead to serious problems.
    Therefore, after being migrated to new framework, the server-level metrics
    of perf counters become disk-level, including the capacity and the available
    capacity of a disk. Another disk-level metric -- the available percentage of
    each disk used by a replica server -- can be derived by dividing the available
    capacity by the total capacity.
    
    Once server-level metrics are needed, just aggregate on the disk-level ones.
    To compute another 2 server-level metrics -- the minimal/maximal available
    percentage among all disks used by a replica server in a node, for example,
    just use min/max operators over disk-level ones for Prometheus.
    
    To implement disk-level metrics, a disk-level metric entity is also added.
---
 src/common/fs_manager.cpp      | 102 +++++++++++++++++++++++++----------------
 src/common/fs_manager.h        |  58 +++++++++++++----------
 src/common/test/CMakeLists.txt |   1 +
 src/meta/test/misc/misc.cpp    |   2 +-
 src/replica/replica_stub.cpp   |   1 -
 src/utils/metrics.h            |   7 +++
 6 files changed, 105 insertions(+), 66 deletions(-)

diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 5a460a896..61ca99411 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -41,7 +41,7 @@
 
 #include "common/gpid.h"
 #include "common/replication_enums.h"
-#include "perf_counter/perf_counter.h"
+#include "fmt/core.h"
 #include "runtime/api_layer1.h"
 #include "runtime/rpc/rpc_address.h"
 #include "utils/fail_point.h"
@@ -49,6 +49,18 @@
 #include "utils/fmt_logging.h"
 #include "utils/string_view.h"
 
+METRIC_DEFINE_entity(disk);
+
+METRIC_DEFINE_gauge_int64(disk,
+                          total_disk_capacity_mb,
+                          dsn::metric_unit::kMegaBytes,
+                          "The total disk capacity");
+
+METRIC_DEFINE_gauge_int64(disk,
+                          avail_disk_capacity_mb,
+                          dsn::metric_unit::kMegaBytes,
+                          "The available disk capacity");
+
 namespace dsn {
 namespace replication {
 
@@ -61,6 +73,34 @@ DSN_DEFINE_int32(replication,
                  "space insufficient");
 DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);
 
+namespace {
+
+metric_entity_ptr instantiate_disk_metric_entity(const std::string &tag,
+                                                 const std::string &data_dir)
+{
+    auto entity_id = fmt::format("disk_{}", tag);
+
+    return METRIC_ENTITY_disk.instantiate(entity_id, {{"tag", tag}, {"data_dir", data_dir}});
+}
+
+} // anonymous namespace
+
+disk_capacity_metrics::disk_capacity_metrics(const std::string &tag, const std::string &data_dir)
+    : _disk_metric_entity(instantiate_disk_metric_entity(tag, data_dir)),
+      METRIC_VAR_INIT_disk(total_disk_capacity_mb),
+      METRIC_VAR_INIT_disk(avail_disk_capacity_mb)
+{
+}
+
+const metric_entity_ptr &disk_capacity_metrics::disk_metric_entity() const
+{
+    CHECK_NOTNULL(_disk_metric_entity,
+                  "disk metric entity should has been instantiated: "
+                  "uninitialized entity cannot be used to instantiate "
+                  "metric");
+    return _disk_metric_entity;
+}
+
 unsigned dir_node::replicas_count() const
 {
     unsigned sum = 0;
@@ -108,6 +148,9 @@ bool dir_node::update_disk_stat(const bool update_disk_status)
     disk_available_ratio = static_cast<int>(
         disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb));
 
+    METRIC_CALL_SET_METHOD(disk_capacity, total_disk_capacity_mb, disk_capacity_mb);
+    METRIC_CALL_SET_METHOD(disk_capacity, avail_disk_capacity_mb, disk_available_mb);
+
     if (!update_disk_status) {
         LOG_INFO("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
                  "available_ratio = {}%",
@@ -134,32 +177,6 @@ bool dir_node::update_disk_stat(const bool update_disk_status)
     return (old_status != new_status);
 }
 
-fs_manager::fs_manager(bool for_test)
-{
-    if (!for_test) {
-        _counter_total_capacity_mb.init_app_counter("eon.replica_stub",
-                                                    "disk.capacity.total(MB)",
-                                                    COUNTER_TYPE_NUMBER,
-                                                    "total disk capacity in MB");
-        _counter_total_available_mb.init_app_counter("eon.replica_stub",
-                                                     "disk.available.total(MB)",
-                                                     COUNTER_TYPE_NUMBER,
-                                                     "total disk available in MB");
-        _counter_total_available_ratio.init_app_counter("eon.replica_stub",
-                                                        "disk.available.total.ratio",
-                                                        COUNTER_TYPE_NUMBER,
-                                                        "total disk available ratio");
-        _counter_min_available_ratio.init_app_counter("eon.replica_stub",
-                                                      "disk.available.min.ratio",
-                                                      COUNTER_TYPE_NUMBER,
-                                                      "minimal disk available ratio in all disks");
-        _counter_max_available_ratio.init_app_counter("eon.replica_stub",
-                                                      "disk.available.max.ratio",
-                                                      COUNTER_TYPE_NUMBER,
-                                                      "maximal disk available ratio in all disks");
-    }
-}
-
 dir_node *fs_manager::get_dir_node(const std::string &subdir) const
 {
     zauto_read_lock l(_lock);
@@ -298,17 +315,29 @@ bool fs_manager::for_each_dir_node(const std::function<bool(const dir_node &)> &
 
 void fs_manager::update_disk_stat(bool check_status_changed)
 {
-    reset_disk_stat();
+    _total_capacity_mb = 0;
+    _total_available_mb = 0;
+    int total_available_ratio = 0;
+    int min_available_ratio = 100;
+    int max_available_ratio = 0;
+
+    // _status_updated_dir_nodes is accessed sequentially in update_disk_stat() and
+    // replica_stub::update_disks_status() during replica_stub::on_disk_stat(), thus
+    // no need to protect it by lock.
+    _status_updated_dir_nodes.clear();
+
+    zauto_read_lock l(_lock);
+
     for (auto &dir_node : _dir_nodes) {
         if (dir_node->update_disk_stat(check_status_changed)) {
             _status_updated_dir_nodes.emplace_back(dir_node);
         }
         _total_capacity_mb += dir_node->disk_capacity_mb;
         _total_available_mb += dir_node->disk_available_mb;
-        _min_available_ratio = std::min(dir_node->disk_available_ratio, _min_available_ratio);
-        _max_available_ratio = std::max(dir_node->disk_available_ratio, _max_available_ratio);
+        min_available_ratio = std::min(dir_node->disk_available_ratio, min_available_ratio);
+        max_available_ratio = std::max(dir_node->disk_available_ratio, max_available_ratio);
     }
-    _total_available_ratio = static_cast<int>(
+    total_available_ratio = static_cast<int>(
         _total_capacity_mb == 0 ? 0 : std::round(_total_available_mb * 100.0 / _total_capacity_mb));
 
     LOG_INFO("update disk space succeed: disk_count = {}, total_capacity_mb = {}, "
@@ -317,14 +346,9 @@ void fs_manager::update_disk_stat(bool check_status_changed)
              _dir_nodes.size(),
              _total_capacity_mb,
              _total_available_mb,
-             _total_available_ratio,
-             _min_available_ratio,
-             _max_available_ratio);
-    _counter_total_capacity_mb->set(_total_capacity_mb);
-    _counter_total_available_mb->set(_total_available_mb);
-    _counter_total_available_ratio->set(_total_available_ratio);
-    _counter_min_available_ratio->set(_min_available_ratio);
-    _counter_max_available_ratio->set(_max_available_ratio);
+             total_available_ratio,
+             min_available_ratio,
+             max_available_ratio);
 }
 
 void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag)
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 75427cc89..6efe1b1bd 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -28,9 +28,11 @@
 
 #include "common/replication_other_types.h"
 #include "metadata_types.h"
-#include "perf_counter/perf_counter_wrapper.h"
+#include "utils/autoref_ptr.h"
 #include "utils/error_code.h"
 #include "utils/flags.h"
+#include "utils/metrics.h"
+#include "utils/ports.h"
 #include "utils/zlocks.h"
 
 namespace dsn {
@@ -41,6 +43,25 @@ class replication_options;
 
 DSN_DECLARE_int32(disk_min_available_space_ratio);
 
+class disk_capacity_metrics
+{
+public:
+    disk_capacity_metrics(const std::string &tag, const std::string &data_dir);
+    ~disk_capacity_metrics() = default;
+
+    const metric_entity_ptr &disk_metric_entity() const;
+
+    METRIC_DEFINE_SET_METHOD(total_disk_capacity_mb, int64_t)
+    METRIC_DEFINE_SET_METHOD(avail_disk_capacity_mb, int64_t)
+
+private:
+    const metric_entity_ptr _disk_metric_entity;
+    METRIC_VAR_DECLARE_gauge_int64(total_disk_capacity_mb);
+    METRIC_VAR_DECLARE_gauge_int64(avail_disk_capacity_mb);
+
+    DISALLOW_COPY_AND_ASSIGN(disk_capacity_metrics);
+};
+
 struct dir_node
 {
 public:
@@ -54,6 +75,9 @@ public:
     std::map<app_id, std::set<gpid>> holding_primary_replicas;
     std::map<app_id, std::set<gpid>> holding_secondary_replicas;
 
+private:
+    disk_capacity_metrics disk_capacity;
+
 public:
     dir_node(const std::string &tag_,
              const std::string &dir_,
@@ -66,7 +90,8 @@ public:
           disk_capacity_mb(disk_capacity_mb_),
           disk_available_mb(disk_available_mb_),
           disk_available_ratio(disk_available_ratio_),
-          status(status_)
+          status(status_),
+          disk_capacity(tag_, dir_)
     {
     }
     unsigned replicas_count(app_id id) const;
@@ -79,8 +104,8 @@ public:
 class fs_manager
 {
 public:
-    fs_manager(bool for_test);
-    ~fs_manager() {}
+    fs_manager() = default;
+    ~fs_manager() = default;
 
     // this should be called before open/load any replicas
     dsn::error_code initialize(const replication_options &opts);
@@ -106,27 +131,16 @@ public:
     }
 
 private:
-    void reset_disk_stat()
-    {
-        _total_capacity_mb = 0;
-        _total_available_mb = 0;
-        _total_available_ratio = 0;
-        _min_available_ratio = 100;
-        _max_available_ratio = 0;
-        _status_updated_dir_nodes.clear();
-    }
-
     dir_node *get_dir_node(const std::string &subdir) const;
 
-    // when visit the tag/storage of the _dir_nodes map, there's no need to protect by the lock.
-    // but when visit the holding_replicas, you must take care.
+    // TODO(wangdan): _dir_nodes should be protected by lock since add_new_disk are supported:
+    // it might be updated arbitrarily at any time.
+    //
+    // Especially when visiting the holding_replicas, you must take care.
     mutable zrwlock_nr _lock;
 
     int64_t _total_capacity_mb = 0;
     int64_t _total_available_mb = 0;
-    int _total_available_ratio = 0;
-    int _min_available_ratio = 100;
-    int _max_available_ratio = 0;
 
     std::vector<std::shared_ptr<dir_node>> _dir_nodes;
     std::vector<std::string> _available_data_dirs;
@@ -136,12 +150,6 @@ private:
     // in this round
     std::vector<std::shared_ptr<dir_node>> _status_updated_dir_nodes;
 
-    perf_counter_wrapper _counter_total_capacity_mb;
-    perf_counter_wrapper _counter_total_available_mb;
-    perf_counter_wrapper _counter_total_available_ratio;
-    perf_counter_wrapper _counter_min_available_ratio;
-    perf_counter_wrapper _counter_max_available_ratio;
-
     friend class replica_test;
     friend class replica_stub;
     friend class mock_replica_stub;
diff --git a/src/common/test/CMakeLists.txt b/src/common/test/CMakeLists.txt
index 78d94000c..74a9cdf3e 100644
--- a/src/common/test/CMakeLists.txt
+++ b/src/common/test/CMakeLists.txt
@@ -27,6 +27,7 @@ set(MY_PROJ_NAME dsn_replication_common_test)
 set(MY_SRC_SEARCH_MODE "GLOB")
 
 set(MY_PROJ_LIBS
+        dsn_http
         dsn_replication_common
         dsn_runtime
         gtest
diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp
index 5733dd651..36a20806d 100644
--- a/src/meta/test/misc/misc.cpp
+++ b/src/meta/test/misc/misc.cpp
@@ -203,7 +203,7 @@ void generate_node_fs_manager(const app_mapper &apps,
     for (const auto &kv : nodes) {
         const node_state &ns = kv.second;
         if (nfm.find(ns.addr()) == nfm.end()) {
-            nfm.emplace(ns.addr(), std::make_shared<fs_manager>(true));
+            nfm.emplace(ns.addr(), std::make_shared<fs_manager>());
         }
         fs_manager &manager = *(nfm.find(ns.addr())->second);
         manager.initialize(data_dirs, tags, true);
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index d5cf1f0bc..41b7e46a8 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -198,7 +198,6 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
       _mem_release_max_reserved_mem_percentage(10),
       _max_concurrent_bulk_load_downloading_count(5),
       _learn_app_concurrent_count(0),
-      _fs_manager(false),
       _bulk_load_downloading_count(0),
       _manual_emergency_checkpointing_count(0),
       _is_running(false)
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index e69268006..a230aa1f9 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -165,6 +165,7 @@ class error_code;
     _##name(METRIC_##name.instantiate(entity##_metric_entity(), ##__VA_ARGS__))
 #define METRIC_VAR_INIT_replica(name, ...) METRIC_VAR_INIT(name, replica, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_server(name, ...) METRIC_VAR_INIT(name, server, ##__VA_ARGS__)
+#define METRIC_VAR_INIT_disk(name, ...) METRIC_VAR_INIT(name, disk, ##__VA_ARGS__)
 
 // Perform increment-related operations on metrics including gauge and counter.
 #define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
@@ -194,6 +195,11 @@ class error_code;
 
 #define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns()
 
+#define METRIC_DEFINE_SET_METHOD(name, value_type)                                                 \
+    void set_##name(value_type value) { METRIC_VAR_SET(name, value); }
+
+#define METRIC_CALL_SET_METHOD(obj, name, value) obj.set_##name(value)
+
 namespace dsn {
 class metric;                  // IWYU pragma: keep
 class metric_entity_prototype; // IWYU pragma: keep
@@ -614,6 +620,7 @@ enum class metric_unit : size_t
     kBytes,
     kMegaBytes,
     kCapacityUnits,
+    kPercent,
     kRequests,
     kSeeks,
     kPointLookups,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org