You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/05 07:28:02 UTC

[incubator-pegasus] 07/23: feat(new_metrics): migrate replica-level metrics for pegasus_event_listener (#1407)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit da8bf31ed2cf668b97dae227b2dc220c253dc975
Author: Dan Wang <wa...@apache.org>
AuthorDate: Thu Mar 23 18:06:01 2023 +0800

    feat(new_metrics): migrate replica-level metrics for pegasus_event_listener (#1407)
    
    https://github.com/apache/incubator-pegasus/issues/1343
    
    Migrate the replica-level metrics in the pegasus_event_listener class to the new framework.
    All of them are rocksdb-related: the number of completed flushes/compactions, the size of
    flush output in bytes, the size of compaction input/output in bytes, and the number of
    write-stall condition changes into the delayed/stopped states.
    
    Note that among the old perf counters, only 2 of the pegasus_event_listener metrics were
    replica-level, while all the others were server-level. After the migration to the new
    framework, all of these metrics are replica-level; if server-level metrics are needed, they
    can be obtained by aggregating the replica-level ones.
---
 src/server/pegasus_event_listener.cpp | 123 ++++++++++++++++------------------
 src/server/pegasus_event_listener.h   |  26 ++++---
 src/utils/metrics.h                   |   3 +
 3 files changed, 72 insertions(+), 80 deletions(-)

diff --git a/src/server/pegasus_event_listener.cpp b/src/server/pegasus_event_listener.cpp
index e5414d8b7..330f93af9 100644
--- a/src/server/pegasus_event_listener.cpp
+++ b/src/server/pegasus_event_listener.cpp
@@ -19,100 +19,91 @@
 
 #include "pegasus_event_listener.h"
 
-#include <fmt/core.h>
-#include <fmt/ostream.h>
 #include <rocksdb/compaction_job_stats.h>
 #include <rocksdb/table_properties.h>
-#include <iosfwd>
-#include <string>
 
-#include "common/gpid.h"
-#include "perf_counter/perf_counter.h"
+#include "utils/autoref_ptr.h"
 #include "utils/fmt_logging.h"
+#include "utils/string_view.h"
 
 namespace rocksdb {
 class DB;
 } // namespace rocksdb
 
+METRIC_DEFINE_counter(replica,
+                      rdb_flush_completed_count,
+                      dsn::metric_unit::kFlushes,
+                      "The number of completed rocksdb flushes");
+
+METRIC_DEFINE_counter(replica,
+                      rdb_flush_output_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The size of rocksdb flush output in bytes");
+
+METRIC_DEFINE_counter(replica,
+                      rdb_compaction_completed_count,
+                      dsn::metric_unit::kCompactions,
+                      "The number of completed rocksdb compactions");
+
+METRIC_DEFINE_counter(replica,
+                      rdb_compaction_input_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The size of rocksdb compaction input in bytes");
+
+METRIC_DEFINE_counter(replica,
+                      rdb_compaction_output_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The size of rocksdb compaction output in bytes");
+
+METRIC_DEFINE_counter(
+    replica,
+    rdb_changed_delayed_writes,
+    dsn::metric_unit::kWrites,
+    "The number of rocksdb delayed writes changed from another write stall condition");
+
+METRIC_DEFINE_counter(
+    replica,
+    rdb_changed_stopped_writes,
+    dsn::metric_unit::kWrites,
+    "The number of rocksdb stopped writes changed from another write stall condition");
+
 namespace pegasus {
 namespace server {
 
-pegasus_event_listener::pegasus_event_listener(replica_base *r) : replica_base(r)
+pegasus_event_listener::pegasus_event_listener(replica_base *r)
+    : replica_base(r),
+      METRIC_VAR_INIT_replica(rdb_flush_completed_count),
+      METRIC_VAR_INIT_replica(rdb_flush_output_bytes),
+      METRIC_VAR_INIT_replica(rdb_compaction_completed_count),
+      METRIC_VAR_INIT_replica(rdb_compaction_input_bytes),
+      METRIC_VAR_INIT_replica(rdb_compaction_output_bytes),
+      METRIC_VAR_INIT_replica(rdb_changed_delayed_writes),
+      METRIC_VAR_INIT_replica(rdb_changed_stopped_writes)
 {
-    _pfc_recent_flush_completed_count.init_app_counter("app.pegasus",
-                                                       "recent.flush.completed.count",
-                                                       COUNTER_TYPE_VOLATILE_NUMBER,
-                                                       "rocksdb recent flush completed count");
-    _pfc_recent_flush_output_bytes.init_app_counter("app.pegasus",
-                                                    "recent.flush.output.bytes",
-                                                    COUNTER_TYPE_VOLATILE_NUMBER,
-                                                    "rocksdb recent flush output bytes");
-    _pfc_recent_compaction_completed_count.init_app_counter(
-        "app.pegasus",
-        "recent.compaction.completed.count",
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "rocksdb recent compaction completed count");
-    _pfc_recent_compaction_input_bytes.init_app_counter("app.pegasus",
-                                                        "recent.compaction.input.bytes",
-                                                        COUNTER_TYPE_VOLATILE_NUMBER,
-                                                        "rocksdb recent compaction input bytes");
-    _pfc_recent_compaction_output_bytes.init_app_counter("app.pegasus",
-                                                         "recent.compaction.output.bytes",
-                                                         COUNTER_TYPE_VOLATILE_NUMBER,
-                                                         "rocksdb recent compaction output bytes");
-    _pfc_recent_write_change_delayed_count.init_app_counter(
-        "app.pegasus",
-        "recent.write.change.delayed.count",
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "rocksdb recent write change delayed count");
-    _pfc_recent_write_change_stopped_count.init_app_counter(
-        "app.pegasus",
-        "recent.write.change.stopped.count",
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "rocksdb recent write change stopped count");
-
-    // replica-level perfcounter
-    std::string counter_str = fmt::format("recent_rdb_compaction_input_bytes@{}", r->get_gpid());
-    _pfc_recent_rdb_compaction_input_bytes.init_app_counter(
-        "app.pegasus",
-        counter_str.c_str(),
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "rocksdb recent compaction input bytes");
-
-    counter_str = fmt::format("recent_rdb_compaction_output_bytes@{}", r->get_gpid());
-    _pfc_recent_rdb_compaction_output_bytes.init_app_counter(
-        "app.pegasus",
-        counter_str.c_str(),
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "rocksdb recent compaction output bytes");
 }
 
-void pegasus_event_listener::OnFlushCompleted(rocksdb::DB *db,
-                                              const rocksdb::FlushJobInfo &flush_job_info)
+void pegasus_event_listener::OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &info)
 {
-    _pfc_recent_flush_completed_count->increment();
-    _pfc_recent_flush_output_bytes->add(flush_job_info.table_properties.data_size);
+    METRIC_VAR_INCREMENT(rdb_flush_completed_count);
+    METRIC_VAR_INCREMENT_BY(rdb_flush_output_bytes, info.table_properties.data_size);
 }
 
 void pegasus_event_listener::OnCompactionCompleted(rocksdb::DB *db,
-                                                   const rocksdb::CompactionJobInfo &ci)
+                                                   const rocksdb::CompactionJobInfo &info)
 {
-    _pfc_recent_compaction_completed_count->increment();
-    _pfc_recent_compaction_input_bytes->add(ci.stats.total_input_bytes);
-    _pfc_recent_compaction_output_bytes->add(ci.stats.total_output_bytes);
-
-    _pfc_recent_rdb_compaction_input_bytes->add(ci.stats.total_input_bytes);
-    _pfc_recent_rdb_compaction_output_bytes->add(ci.stats.total_output_bytes);
+    METRIC_VAR_INCREMENT(rdb_compaction_completed_count);
+    METRIC_VAR_INCREMENT_BY(rdb_compaction_input_bytes, info.stats.total_input_bytes);
+    METRIC_VAR_INCREMENT_BY(rdb_compaction_output_bytes, info.stats.total_output_bytes);
 }
 
 void pegasus_event_listener::OnStallConditionsChanged(const rocksdb::WriteStallInfo &info)
 {
     if (info.condition.cur == rocksdb::WriteStallCondition::kDelayed) {
         LOG_ERROR_PREFIX("rocksdb write delayed");
-        _pfc_recent_write_change_delayed_count->increment();
+        METRIC_VAR_INCREMENT(rdb_changed_delayed_writes);
     } else if (info.condition.cur == rocksdb::WriteStallCondition::kStopped) {
         LOG_ERROR_PREFIX("rocksdb write stopped");
-        _pfc_recent_write_change_stopped_count->increment();
+        METRIC_VAR_INCREMENT(rdb_changed_stopped_writes);
     }
 }
 
diff --git a/src/server/pegasus_event_listener.h b/src/server/pegasus_event_listener.h
index 5e6ab7e1a..900fabdc8 100644
--- a/src/server/pegasus_event_listener.h
+++ b/src/server/pegasus_event_listener.h
@@ -21,8 +21,8 @@
 
 #include <rocksdb/listener.h>
 
-#include "perf_counter/perf_counter_wrapper.h"
 #include "replica/replica_base.h"
+#include "utils/metrics.h"
 
 namespace rocksdb {
 class DB;
@@ -37,24 +37,22 @@ public:
     explicit pegasus_event_listener(replica_base *r);
     ~pegasus_event_listener() override = default;
 
-    void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &flush_job_info) override;
+    void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &info) override;
 
-    void OnCompactionCompleted(rocksdb::DB *db, const rocksdb::CompactionJobInfo &ci) override;
+    void OnCompactionCompleted(rocksdb::DB *db, const rocksdb::CompactionJobInfo &info) override;
 
     void OnStallConditionsChanged(const rocksdb::WriteStallInfo &info) override;
 
 private:
-    ::dsn::perf_counter_wrapper _pfc_recent_flush_completed_count;
-    ::dsn::perf_counter_wrapper _pfc_recent_flush_output_bytes;
-    ::dsn::perf_counter_wrapper _pfc_recent_compaction_completed_count;
-    ::dsn::perf_counter_wrapper _pfc_recent_compaction_input_bytes;
-    ::dsn::perf_counter_wrapper _pfc_recent_compaction_output_bytes;
-    ::dsn::perf_counter_wrapper _pfc_recent_write_change_delayed_count;
-    ::dsn::perf_counter_wrapper _pfc_recent_write_change_stopped_count;
-
-    // replica-level perfcounter
-    ::dsn::perf_counter_wrapper _pfc_recent_rdb_compaction_input_bytes;
-    ::dsn::perf_counter_wrapper _pfc_recent_rdb_compaction_output_bytes;
+    METRIC_VAR_DECLARE_counter(rdb_flush_completed_count);
+    METRIC_VAR_DECLARE_counter(rdb_flush_output_bytes);
+
+    METRIC_VAR_DECLARE_counter(rdb_compaction_completed_count);
+    METRIC_VAR_DECLARE_counter(rdb_compaction_input_bytes);
+    METRIC_VAR_DECLARE_counter(rdb_compaction_output_bytes);
+
+    METRIC_VAR_DECLARE_counter(rdb_changed_delayed_writes);
+    METRIC_VAR_DECLARE_counter(rdb_changed_stopped_writes);
 };
 
 } // namespace server
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 333534703..9087b78e5 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -620,6 +620,9 @@ enum class metric_unit : size_t
     kKeys,
     kFiles,
     kAmplification,
+    kFlushes,
+    kCompactions,
+    kWrites,
     kInvalidUnit,
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org