You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/25 10:55:12 UTC

[incubator-pegasus] 13/28: feat(new_metrics): add table-level metric entity and migrate table-level metrics for server_state of meta (#1431)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit a9a4bbbc030f637e5b844617f070435001a86911
Author: Dan Wang <wa...@apache.org>
AuthorDate: Tue Apr 11 00:08:03 2023 +0800

    feat(new_metrics): add table-level metric entity and migrate table-level metrics for server_state of meta (#1431)
    
    https://github.com/apache/incubator-pegasus/issues/1331
    
    In perf counters, all metrics of server_state are server-level, for example,
    the number of healthy partitions among all tables of a pegasus cluster.
    
    However, sometimes this is not enough. For example, the metric shows
    that there are 4 unwritable partitions: the 4 unwritable partitions might
    belong to different tables; or, they might belong to one table.
    
    Therefore, these server-level metrics could be changed to table-level.
    This will provide us with the status of each table. On the other hand,
    once server-level metrics are needed, just aggregate the table-level ones.
    
    The metrics of server_state that are migrated and changed to table-level
    include: the number of dead, unreadable, unwritable, writable-ill, and
    healthy partitions among all partitions of a table, the number of times
    the configuration has been changed, and the number of times the status
    of a partition has been changed to unwritable or writable for a table.
    
    To implement table-level metrics, a table-level metric entity is also added.
---
 src/meta/server_state.cpp                   |  87 ++++++-------
 src/meta/server_state.h                     |  13 +-
 src/meta/server_state_restore.cpp           |   2 +
 src/meta/table_metrics.cpp                  | 187 ++++++++++++++++++++++++++++
 src/meta/table_metrics.h                    | 136 ++++++++++++++++++++
 src/meta/test/state_sync_test.cpp           |   2 +
 src/meta/test/update_configuration_test.cpp |   2 +
 src/utils/metrics.h                         |  10 +-
 8 files changed, 376 insertions(+), 63 deletions(-)

diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index ecf66a92b..71fe7b1af 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -62,10 +62,10 @@
 #include "meta/meta_service.h"
 #include "meta/meta_state_service.h"
 #include "meta/partition_guardian.h"
+#include "meta/table_metrics.h"
 #include "meta_admin_types.h"
 #include "meta_bulk_load_service.h"
 #include "metadata_types.h"
-#include "perf_counter/perf_counter.h"
 #include "replica_admin_types.h"
 #include "runtime/api_layer1.h"
 #include "runtime/rpc/rpc_address.h"
@@ -85,12 +85,9 @@
 #include "utils/config_api.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
-#include "utils/singleton.h"
 #include "utils/string_conv.h"
 #include "utils/strings.h"
 
-using namespace dsn;
-
 namespace dsn {
 namespace replication {
 DSN_DEFINE_bool(meta_server,
@@ -214,41 +211,6 @@ void server_state::initialize(meta_service *meta_svc, const std::string &apps_ro
     _apps_root = apps_root;
     _add_secondary_enable_flow_control = FLAGS_add_secondary_enable_flow_control;
     _add_secondary_max_count_for_one_node = FLAGS_add_secondary_max_count_for_one_node;
-
-    _dead_partition_count.init_app_counter("eon.server_state",
-                                           "dead_partition_count",
-                                           COUNTER_TYPE_NUMBER,
-                                           "current dead partition count");
-    _unreadable_partition_count.init_app_counter("eon.server_state",
-                                                 "unreadable_partition_count",
-                                                 COUNTER_TYPE_NUMBER,
-                                                 "current unreadable partition count");
-    _unwritable_partition_count.init_app_counter("eon.server_state",
-                                                 "unwritable_partition_count",
-                                                 COUNTER_TYPE_NUMBER,
-                                                 "current unwritable partition count");
-    _writable_ill_partition_count.init_app_counter("eon.server_state",
-                                                   "writable_ill_partition_count",
-                                                   COUNTER_TYPE_NUMBER,
-                                                   "current writable ill partition count");
-    _healthy_partition_count.init_app_counter("eon.server_state",
-                                              "healthy_partition_count",
-                                              COUNTER_TYPE_NUMBER,
-                                              "current healthy partition count");
-    _recent_update_config_count.init_app_counter("eon.server_state",
-                                                 "recent_update_config_count",
-                                                 COUNTER_TYPE_VOLATILE_NUMBER,
-                                                 "update configuration count in the recent period");
-    _recent_partition_change_unwritable_count.init_app_counter(
-        "eon.server_state",
-        "recent_partition_change_unwritable_count",
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "partition change to unwritable count in the recent period");
-    _recent_partition_change_writable_count.init_app_counter(
-        "eon.server_state",
-        "recent_partition_change_writable_count",
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "partition change to writable count in the recent period");
 }
 
 bool server_state::spin_wait_staging(int timeout_seconds)
@@ -529,12 +491,14 @@ error_code server_state::initialize_default_apps()
 error_code server_state::sync_apps_to_remote_storage()
 {
     _exist_apps.clear();
+    _table_metric_entities.clear_entities();
     for (auto &kv_pair : _all_apps) {
         if (kv_pair.second->status == app_status::AS_CREATING) {
             CHECK(_exist_apps.find(kv_pair.second->app_name) == _exist_apps.end(),
                   "invalid app name, name = {}",
                   kv_pair.second->app_name);
             _exist_apps.emplace(kv_pair.second->app_name, kv_pair.second);
+            _table_metric_entities.create_entity(kv_pair.first);
         }
     }
 
@@ -584,6 +548,7 @@ error_code server_state::sync_apps_to_remote_storage()
 
     if (err != ERR_OK) {
         _exist_apps.clear();
+        _table_metric_entities.clear_entities();
         return err;
     }
     for (auto &kv : _all_apps) {
@@ -699,6 +664,7 @@ dsn::error_code server_state::sync_apps_from_remote_storage()
                         if (app->status == app_status::AS_AVAILABLE) {
                             app->status = app_status::AS_CREATING;
                             _exist_apps.emplace(app->app_name, app);
+                            _table_metric_entities.create_entity(app->app_id);
                         } else if (app->status == app_status::AS_DROPPED) {
                             app->status = app_status::AS_DROPPING;
                         } else {
@@ -726,6 +692,7 @@ dsn::error_code server_state::sync_apps_from_remote_storage()
 
     _all_apps.clear();
     _exist_apps.clear();
+    _table_metric_entities.clear_entities();
 
     std::string transaction_state;
     storage
@@ -1197,6 +1164,7 @@ void server_state::create_app(dsn::message_ex *msg)
 
             _all_apps.emplace(app->app_id, app);
             _exist_apps.emplace(request.app_name, app);
+            _table_metric_entities.create_entity(app->app_id);
         }
     }
 
@@ -1214,6 +1182,7 @@ void server_state::do_app_drop(std::shared_ptr<app_state> &app)
         if (ERR_OK == ec) {
             zauto_write_lock l(_lock);
             _exist_apps.erase(app->app_name);
+            _table_metric_entities.remove_entity(app->app_id);
             for (int i = 0; i < app->partition_count; ++i) {
                 drop_partition(app, i);
             }
@@ -1425,6 +1394,7 @@ void server_state::recall_app(dsn::message_ex *msg)
                     target_app->helpers->pending_response = msg;
 
                     _exist_apps.emplace(target_app->app_name, target_app);
+                    _table_metric_entities.create_entity(target_app->app_id);
                 }
             }
         }
@@ -1465,7 +1435,7 @@ void server_state::send_proposal(rpc_address target, const configuration_update_
              proposal.node);
     dsn::message_ex *msg =
         dsn::message_ex::create_request(RPC_CONFIG_PROPOSAL, 0, proposal.config.pid.thread_hash());
-    ::marshall(msg, proposal);
+    dsn::marshall(msg, proposal);
     _meta_svc->send_message(target, msg);
 }
 
@@ -1649,12 +1619,15 @@ void server_state::update_configuration_locally(
         _config_change_subscriber(_all_apps);
     }
 
-    _recent_update_config_count->increment();
+    METRIC_CALL_TABLE_INCREMENT_METHOD(
+        _table_metric_entities, partition_configuration_changes, app.app_id);
     if (old_health_status >= HS_WRITABLE_ILL && new_health_status < HS_WRITABLE_ILL) {
-        _recent_partition_change_unwritable_count->increment();
+        METRIC_CALL_TABLE_INCREMENT_METHOD(
+            _table_metric_entities, unwritable_partition_changes, app.app_id);
     }
     if (old_health_status < HS_WRITABLE_ILL && new_health_status >= HS_WRITABLE_ILL) {
-        _recent_partition_change_writable_count->increment();
+        METRIC_CALL_TABLE_INCREMENT_METHOD(
+            _table_metric_entities, writable_partition_changes, app.app_id);
     }
 }
 
@@ -2446,25 +2419,35 @@ bool server_state::can_run_balancer()
     return true;
 }
 
-void server_state::update_partition_perf_counter()
+void server_state::update_partition_metrics()
 {
-    int counters[HS_MAX_VALUE];
-    ::memset(counters, 0, sizeof(counters));
     auto func = [&](const std::shared_ptr<app_state> &app) {
+        int counters[HS_MAX_VALUE] = {0};
+
         int min_2pc_count =
             _meta_svc->get_options().app_mutation_2pc_min_replica_count(app->max_replica_count);
         for (unsigned int i = 0; i != app->partition_count; ++i) {
             health_status st = partition_health_status(app->partitions[i], min_2pc_count);
             counters[st]++;
         }
+
+        METRIC_CALL_TABLE_SET_METHOD(
+            _table_metric_entities, dead_partitions, app->app_id, counters[HS_DEAD]);
+        METRIC_CALL_TABLE_SET_METHOD(
+            _table_metric_entities, unreadable_partitions, app->app_id, counters[HS_UNREADABLE]);
+        METRIC_CALL_TABLE_SET_METHOD(
+            _table_metric_entities, unwritable_partitions, app->app_id, counters[HS_UNWRITABLE]);
+        METRIC_CALL_TABLE_SET_METHOD(_table_metric_entities,
+                                     writable_ill_partitions,
+                                     app->app_id,
+                                     counters[HS_WRITABLE_ILL]);
+        METRIC_CALL_TABLE_SET_METHOD(
+            _table_metric_entities, healthy_partitions, app->app_id, counters[HS_HEALTHY]);
+
         return true;
     };
+
     for_each_available_app(_all_apps, func);
-    _dead_partition_count->set(counters[HS_DEAD]);
-    _unreadable_partition_count->set(counters[HS_UNREADABLE]);
-    _unwritable_partition_count->set(counters[HS_UNWRITABLE]);
-    _writable_ill_partition_count->set(counters[HS_WRITABLE_ILL]);
-    _healthy_partition_count->set(counters[HS_HEALTHY]);
 }
 
 bool server_state::check_all_partitions()
@@ -2475,7 +2458,7 @@ bool server_state::check_all_partitions()
 
     zauto_write_lock l(_lock);
 
-    update_partition_perf_counter();
+    update_partition_metrics();
 
     // first the cure stage
     if (level <= meta_function_level::fl_freezed) {
diff --git a/src/meta/server_state.h b/src/meta/server_state.h
index a3feb28f9..13a8ef8bd 100644
--- a/src/meta/server_state.h
+++ b/src/meta/server_state.h
@@ -52,9 +52,9 @@
 #include "dsn.layer2_types.h"
 #include "meta/meta_rpc_types.h"
 #include "meta_data.h"
-#include "perf_counter/perf_counter_wrapper.h"
 #include "runtime/task/task.h"
 #include "runtime/task/task_tracker.h"
+#include "table_metrics.h"
 #include "utils/error_code.h"
 #include "utils/zlocks.h"
 
@@ -230,7 +230,7 @@ private:
     bool can_run_balancer();
 
     // user should lock it first
-    void update_partition_perf_counter();
+    void update_partition_metrics();
 
     error_code dump_app_states(const char *local_path,
                                const std::function<app_state *()> &iterator);
@@ -437,14 +437,7 @@ private:
     int32_t _add_secondary_max_count_for_one_node;
     std::vector<std::unique_ptr<command_deregister>> _cmds;
 
-    perf_counter_wrapper _dead_partition_count;
-    perf_counter_wrapper _unreadable_partition_count;
-    perf_counter_wrapper _unwritable_partition_count;
-    perf_counter_wrapper _writable_ill_partition_count;
-    perf_counter_wrapper _healthy_partition_count;
-    perf_counter_wrapper _recent_update_config_count;
-    perf_counter_wrapper _recent_partition_change_unwritable_count;
-    perf_counter_wrapper _recent_partition_change_writable_count;
+    table_metric_entities _table_metric_entities;
 };
 
 } // namespace replication
diff --git a/src/meta/server_state_restore.cpp b/src/meta/server_state_restore.cpp
index 93d790e66..f513b0c4a 100644
--- a/src/meta/server_state_restore.cpp
+++ b/src/meta/server_state_restore.cpp
@@ -37,6 +37,7 @@
 #include "dsn.layer2_types.h"
 #include "meta/meta_data.h"
 #include "meta/meta_rpc_types.h"
+#include "meta/table_metrics.h"
 #include "meta_admin_types.h"
 #include "meta_service.h"
 #include "runtime/rpc/rpc_address.h"
@@ -147,6 +148,7 @@ std::pair<dsn::error_code, std::shared_ptr<app_state>> server_state::restore_app
 
             _all_apps.emplace(app->app_id, app);
             _exist_apps.emplace(info.app_name, app);
+            _table_metric_entities.create_entity(app->app_id);
         }
     }
     // TODO: using one single env to replace
diff --git a/src/meta/table_metrics.cpp b/src/meta/table_metrics.cpp
new file mode 100644
index 000000000..511be89eb
--- /dev/null
+++ b/src/meta/table_metrics.cpp
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "table_metrics.h"
+
+#include <fmt/core.h>
+#include <string>
+
+#include "utils/fmt_logging.h"
+#include "utils/string_view.h"
+
+METRIC_DEFINE_entity(table);
+
+// The number of partitions in each status, see `health_status` and `partition_health_status()`
+// for details.
+
+METRIC_DEFINE_gauge_int64(
+    table,
+    dead_partitions,
+    dsn::metric_unit::kPartitions,
+    "The number of dead partitions, which means primary = 0 && secondary = 0");
+
+METRIC_DEFINE_gauge_int64(table,
+                          unreadable_partitions,
+                          dsn::metric_unit::kPartitions,
+                          "The number of unreadable partitions, which means primary = 0 && "
+                          "secondary > 0");
+
+METRIC_DEFINE_gauge_int64(table,
+                          unwritable_partitions,
+                          dsn::metric_unit::kPartitions,
+                          "The number of unwritable partitions, which means primary = 1 && "
+                          "primary + secondary < mutation_2pc_min_replica_count");
+
+METRIC_DEFINE_gauge_int64(table,
+                          writable_ill_partitions,
+                          dsn::metric_unit::kPartitions,
+                          "The number of writable ill partitions, which means primary = 1 && "
+                          "primary + secondary >= mutation_2pc_min_replica_count && "
+                          "primary + secondary < max_replica_count");
+
+METRIC_DEFINE_gauge_int64(table,
+                          healthy_partitions,
+                          dsn::metric_unit::kPartitions,
+                          "The number of healthy partitions, which means primary = 1 && "
+                          "primary + secondary >= max_replica_count");
+
+METRIC_DEFINE_counter(table,
+                      partition_configuration_changes,
+                      dsn::metric_unit::kChanges,
+                      "The number of times the configuration has been changed");
+
+METRIC_DEFINE_counter(table,
+                      unwritable_partition_changes,
+                      dsn::metric_unit::kChanges,
+                      "The number of times the status of partition has been changed to unwritable");
+
+METRIC_DEFINE_counter(table,
+                      writable_partition_changes,
+                      dsn::metric_unit::kChanges,
+                      "The number of times the status of partition has been changed to writable");
+
+namespace dsn {
+
+namespace {
+
+metric_entity_ptr instantiate_table_metric_entity(int32_t table_id)
+{
+    auto entity_id = fmt::format("table_{}", table_id);
+
+    return METRIC_ENTITY_table.instantiate(entity_id, {{"table_id", std::to_string(table_id)}});
+}
+
+} // anonymous namespace
+
+table_metrics::table_metrics(int32_t table_id)
+    : _table_id(table_id),
+      _table_metric_entity(instantiate_table_metric_entity(table_id)),
+      METRIC_VAR_INIT_table(dead_partitions),
+      METRIC_VAR_INIT_table(unreadable_partitions),
+      METRIC_VAR_INIT_table(unwritable_partitions),
+      METRIC_VAR_INIT_table(writable_ill_partitions),
+      METRIC_VAR_INIT_table(healthy_partitions),
+      METRIC_VAR_INIT_table(partition_configuration_changes),
+      METRIC_VAR_INIT_table(unwritable_partition_changes),
+      METRIC_VAR_INIT_table(writable_partition_changes)
+{
+}
+
+const metric_entity_ptr &table_metrics::table_metric_entity() const
+{
+    CHECK_NOTNULL(_table_metric_entity,
+                  "table metric entity should has been instantiated: "
+                  "uninitialized entity cannot be used to instantiate "
+                  "metric");
+    return _table_metric_entity;
+}
+
+bool operator==(const table_metrics &lhs, const table_metrics &rhs)
+{
+    if (&lhs == &rhs) {
+        return true;
+    }
+
+    if (lhs.table_metric_entity().get() != rhs.table_metric_entity().get()) {
+        CHECK_NE(lhs.table_id(), rhs.table_id());
+        return false;
+    }
+
+    CHECK_EQ(lhs.table_id(), rhs.table_id());
+    return true;
+}
+
+bool operator!=(const table_metrics &lhs, const table_metrics &rhs) { return !(lhs == rhs); }
+
+void table_metric_entities::create_entity(int32_t table_id)
+{
+    utils::auto_write_lock l(_lock);
+
+    entity_map::const_iterator iter = _entities.find(table_id);
+    if (dsn_unlikely(iter != _entities.end())) {
+        return;
+    }
+
+    _entities[table_id] = std::make_unique<table_metrics>(table_id);
+}
+
+void table_metric_entities::remove_entity(int32_t table_id)
+{
+    utils::auto_write_lock l(_lock);
+
+    entity_map::const_iterator iter = _entities.find(table_id);
+    if (dsn_unlikely(iter == _entities.end())) {
+        return;
+    }
+
+    _entities.erase(iter);
+}
+
+void table_metric_entities::clear_entities()
+{
+    utils::auto_write_lock l(_lock);
+    _entities.clear();
+}
+
+bool operator==(const table_metric_entities &lhs, const table_metric_entities &rhs)
+{
+    if (&lhs == &rhs) {
+        return true;
+    }
+
+    utils::auto_read_lock l1(lhs._lock);
+    utils::auto_read_lock l2(rhs._lock);
+
+    if (lhs._entities.size() != rhs._entities.size()) {
+        return false;
+    }
+
+    for (const auto &lhs_entity : lhs._entities) {
+        auto rhs_entity = rhs._entities.find(lhs_entity.first);
+        if (rhs_entity == rhs._entities.end()) {
+            return false;
+        }
+
+        if (*(lhs_entity.second) != *(rhs_entity->second)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+} // namespace dsn
diff --git a/src/meta/table_metrics.h b/src/meta/table_metrics.h
new file mode 100644
index 000000000..de5db3888
--- /dev/null
+++ b/src/meta/table_metrics.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "utils/autoref_ptr.h"
+#include "utils/metrics.h"
+#include "utils/ports.h"
+#include "utils/synchronize.h"
+
+namespace dsn {
+class table_metric_entities;
+
+// Maintain a table-level metric entity of meta, and all metrics attached to it.
+class table_metrics
+{
+public:
+    table_metrics(int32_t table_id);
+    ~table_metrics() = default;
+
+    inline int32_t table_id() const { return _table_id; }
+    const metric_entity_ptr &table_metric_entity() const;
+
+    METRIC_DEFINE_SET_METHOD(dead_partitions, int64_t)
+    METRIC_DEFINE_SET_METHOD(unreadable_partitions, int64_t)
+    METRIC_DEFINE_SET_METHOD(unwritable_partitions, int64_t)
+    METRIC_DEFINE_SET_METHOD(writable_ill_partitions, int64_t)
+    METRIC_DEFINE_SET_METHOD(healthy_partitions, int64_t)
+    METRIC_DEFINE_INCREMENT_METHOD(partition_configuration_changes)
+    METRIC_DEFINE_INCREMENT_METHOD(unwritable_partition_changes)
+    METRIC_DEFINE_INCREMENT_METHOD(writable_partition_changes)
+
+private:
+    const int32_t _table_id;
+
+    const metric_entity_ptr _table_metric_entity;
+    METRIC_VAR_DECLARE_gauge_int64(dead_partitions);
+    METRIC_VAR_DECLARE_gauge_int64(unreadable_partitions);
+    METRIC_VAR_DECLARE_gauge_int64(unwritable_partitions);
+    METRIC_VAR_DECLARE_gauge_int64(writable_ill_partitions);
+    METRIC_VAR_DECLARE_gauge_int64(healthy_partitions);
+    METRIC_VAR_DECLARE_counter(partition_configuration_changes);
+    METRIC_VAR_DECLARE_counter(unwritable_partition_changes);
+    METRIC_VAR_DECLARE_counter(writable_partition_changes);
+
+    DISALLOW_COPY_AND_ASSIGN(table_metrics);
+};
+
+bool operator==(const table_metrics &lhs, const table_metrics &rhs);
+bool operator!=(const table_metrics &lhs, const table_metrics &rhs);
+
+#define METRIC_DEFINE_TABLE_SET_METHOD(name, value_type)                                           \
+    void set_##name(int32_t table_id, value_type value)                                            \
+    {                                                                                              \
+        utils::auto_read_lock l(_lock);                                                            \
+                                                                                                   \
+        entity_map::const_iterator iter = _entities.find(table_id);                                \
+        if (dsn_unlikely(iter == _entities.end())) {                                               \
+            return;                                                                                \
+        }                                                                                          \
+        METRIC_CALL_SET_METHOD(*(iter->second), name, value);                                      \
+    }
+
+#define METRIC_CALL_TABLE_SET_METHOD(obj, name, table_id, value) (obj).set_##name(table_id, value)
+
+#define METRIC_DEFINE_TABLE_INCREMENT_METHOD(name)                                                 \
+    void increment_##name(int32_t table_id)                                                        \
+    {                                                                                              \
+        utils::auto_read_lock l(_lock);                                                            \
+                                                                                                   \
+        entity_map::const_iterator iter = _entities.find(table_id);                                \
+        if (dsn_unlikely(iter == _entities.end())) {                                               \
+            return;                                                                                \
+        }                                                                                          \
+        METRIC_CALL_INCREMENT_METHOD(*(iter->second), name);                                       \
+    }
+
+#define METRIC_CALL_TABLE_INCREMENT_METHOD(obj, name, table_id) (obj).increment_##name(table_id)
+
+// Manage the lifetime of all table-level metric entities of meta.
+//
+// To instantiate a new table-level entity, just call create_entity(). Once the entity instance
+// is no longer needed, just call remove_entity() (it would then be retired after
+// `entity_retirement_delay_ms` milliseconds).
+class table_metric_entities
+{
+public:
+    using entity_map = std::unordered_map<int, std::unique_ptr<table_metrics>>;
+
+    table_metric_entities() = default;
+    ~table_metric_entities() = default;
+
+    void create_entity(int32_t table_id);
+    void remove_entity(int32_t table_id);
+    void clear_entities();
+
+    METRIC_DEFINE_TABLE_SET_METHOD(dead_partitions, int64_t)
+    METRIC_DEFINE_TABLE_SET_METHOD(unreadable_partitions, int64_t)
+    METRIC_DEFINE_TABLE_SET_METHOD(unwritable_partitions, int64_t)
+    METRIC_DEFINE_TABLE_SET_METHOD(writable_ill_partitions, int64_t)
+    METRIC_DEFINE_TABLE_SET_METHOD(healthy_partitions, int64_t)
+    METRIC_DEFINE_TABLE_INCREMENT_METHOD(partition_configuration_changes)
+    METRIC_DEFINE_TABLE_INCREMENT_METHOD(unwritable_partition_changes)
+    METRIC_DEFINE_TABLE_INCREMENT_METHOD(writable_partition_changes)
+
+private:
+    friend bool operator==(const table_metric_entities &, const table_metric_entities &);
+
+    mutable utils::rw_lock_nr _lock;
+    entity_map _entities;
+
+    DISALLOW_COPY_AND_ASSIGN(table_metric_entities);
+};
+
+bool operator==(const table_metric_entities &lhs, const table_metric_entities &rhs);
+
+} // namespace dsn
diff --git a/src/meta/test/state_sync_test.cpp b/src/meta/test/state_sync_test.cpp
index 2e1afacf8..657bd8157 100644
--- a/src/meta/test/state_sync_test.cpp
+++ b/src/meta/test/state_sync_test.cpp
@@ -248,6 +248,7 @@ void meta_service_test_app::state_sync_test()
         for (const auto &iter : ss1->_exist_apps) {
             ASSERT_TRUE(ss2->_exist_apps.find(iter.first) != ss2->_exist_apps.end());
         }
+        ASSERT_EQ(ss1->_table_metric_entities, ss2->_table_metric_entities);
 
         // then we dump the content to local file with binary format
         std::cerr << "test dump to local file from zookeeper's storage" << std::endl;
@@ -268,6 +269,7 @@ void meta_service_test_app::state_sync_test()
         for (const auto &iter : ss1->_exist_apps) {
             ASSERT_TRUE(ss2->_exist_apps.find(iter.first) != ss2->_exist_apps.end());
         }
+        ASSERT_EQ(ss1->_table_metric_entities, ss2->_table_metric_entities);
         ss2->initialize_node_state();
 
         // then let's test the query configuration calls
diff --git a/src/meta/test/update_configuration_test.cpp b/src/meta/test/update_configuration_test.cpp
index 4feba0923..cb30bff2b 100644
--- a/src/meta/test/update_configuration_test.cpp
+++ b/src/meta/test/update_configuration_test.cpp
@@ -52,6 +52,7 @@
 #include "meta/meta_service.h"
 #include "meta/partition_guardian.h"
 #include "meta/server_state.h"
+#include "meta/table_metrics.h"
 #include "meta/test/misc/misc.h"
 #include "meta_admin_types.h"
 #include "meta_service_test_app.h"
@@ -495,6 +496,7 @@ void meta_service_test_app::cannot_run_balancer_test()
     std::shared_ptr<app_state> the_app = app_state::create(info);
     svc->_state->_all_apps.emplace(info.app_id, the_app);
     svc->_state->_exist_apps.emplace(info.app_name, the_app);
+    svc->_state->_table_metric_entities.create_entity(info.app_id);
 
     dsn::partition_configuration &pc = the_app->partitions[0];
     pc.primary = nodes[0];
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index a230aa1f9..ba85c3fd4 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -166,6 +166,7 @@ class error_code;
 #define METRIC_VAR_INIT_replica(name, ...) METRIC_VAR_INIT(name, replica, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_server(name, ...) METRIC_VAR_INIT(name, server, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_disk(name, ...) METRIC_VAR_INIT(name, disk, ##__VA_ARGS__)
+#define METRIC_VAR_INIT_table(name, ...) METRIC_VAR_INIT(name, table, ##__VA_ARGS__)
 
 // Perform increment-related operations on metrics including gauge and counter.
 #define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
@@ -198,7 +199,12 @@ class error_code;
 #define METRIC_DEFINE_SET_METHOD(name, value_type)                                                 \
     void set_##name(value_type value) { METRIC_VAR_SET(name, value); }
 
-#define METRIC_CALL_SET_METHOD(obj, name, value) obj.set_##name(value)
+#define METRIC_CALL_SET_METHOD(obj, name, value) (obj).set_##name(value)
+
+#define METRIC_DEFINE_INCREMENT_METHOD(name)                                                       \
+    void increment_##name() { METRIC_VAR_INCREMENT(name); }
+
+#define METRIC_CALL_INCREMENT_METHOD(obj, name) (obj).increment_##name()
 
 namespace dsn {
 class metric;                  // IWYU pragma: keep
@@ -621,6 +627,7 @@ enum class metric_unit : size_t
     kMegaBytes,
     kCapacityUnits,
     kPercent,
+    kPartitions,
     kRequests,
     kSeeks,
     kPointLookups,
@@ -631,6 +638,7 @@ enum class metric_unit : size_t
     kFlushes,
     kCompactions,
     kWrites,
+    kChanges,
     kInvalidUnit,
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org