You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/06/05 04:34:45 UTC
[incubator-pegasus] 07/32: feat(new_metrics): migrate replica-level metrics for pegasus_event_listener (#1407)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 01a7f70ea5a091b45b75c9959af9f02ca477b914
Author: Dan Wang <wa...@apache.org>
AuthorDate: Thu Mar 23 18:06:01 2023 +0800
feat(new_metrics): migrate replica-level metrics for pegasus_event_listener (#1407)
https://github.com/apache/incubator-pegasus/issues/1343
Migrate the replica-level metrics in the pegasus_event_listener class to the new framework. All
of them are rocksdb-related, including the number of completed flushes/compactions, the size of
flush output in bytes, and the size of compaction input/output in bytes.
Note that among the old perf counters only 2 metrics of pegasus_event_listener were replica-level,
while all of the others were server-level. After migration to the new framework, all of the metrics
have become replica-level; if server-level metrics are needed, just aggregate the replica-level ones.
---
src/server/pegasus_event_listener.cpp | 123 ++++++++++++++++------------------
src/server/pegasus_event_listener.h | 26 ++++---
src/utils/metrics.h | 3 +
3 files changed, 72 insertions(+), 80 deletions(-)
diff --git a/src/server/pegasus_event_listener.cpp b/src/server/pegasus_event_listener.cpp
index e5414d8b7..330f93af9 100644
--- a/src/server/pegasus_event_listener.cpp
+++ b/src/server/pegasus_event_listener.cpp
@@ -19,100 +19,91 @@
#include "pegasus_event_listener.h"
-#include <fmt/core.h>
-#include <fmt/ostream.h>
#include <rocksdb/compaction_job_stats.h>
#include <rocksdb/table_properties.h>
-#include <iosfwd>
-#include <string>
-#include "common/gpid.h"
-#include "perf_counter/perf_counter.h"
+#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"
+#include "utils/string_view.h"
namespace rocksdb {
class DB;
} // namespace rocksdb
+METRIC_DEFINE_counter(replica,
+ rdb_flush_completed_count,
+ dsn::metric_unit::kFlushes,
+ "The number of completed rocksdb flushes");
+
+METRIC_DEFINE_counter(replica,
+ rdb_flush_output_bytes,
+ dsn::metric_unit::kBytes,
+ "The size of rocksdb flush output in bytes");
+
+METRIC_DEFINE_counter(replica,
+ rdb_compaction_completed_count,
+ dsn::metric_unit::kCompactions,
+ "The number of completed rocksdb compactions");
+
+METRIC_DEFINE_counter(replica,
+ rdb_compaction_input_bytes,
+ dsn::metric_unit::kBytes,
+ "The size of rocksdb compaction input in bytes");
+
+METRIC_DEFINE_counter(replica,
+ rdb_compaction_output_bytes,
+ dsn::metric_unit::kBytes,
+ "The size of rocksdb compaction output in bytes");
+
+METRIC_DEFINE_counter(
+ replica,
+ rdb_changed_delayed_writes,
+ dsn::metric_unit::kWrites,
+ "The number of rocksdb delayed writes changed from another write stall condition");
+
+METRIC_DEFINE_counter(
+ replica,
+ rdb_changed_stopped_writes,
+ dsn::metric_unit::kWrites,
+ "The number of rocksdb stopped writes changed from another write stall condition");
+
namespace pegasus {
namespace server {
-pegasus_event_listener::pegasus_event_listener(replica_base *r) : replica_base(r)
+pegasus_event_listener::pegasus_event_listener(replica_base *r)
+ : replica_base(r),
+ METRIC_VAR_INIT_replica(rdb_flush_completed_count),
+ METRIC_VAR_INIT_replica(rdb_flush_output_bytes),
+ METRIC_VAR_INIT_replica(rdb_compaction_completed_count),
+ METRIC_VAR_INIT_replica(rdb_compaction_input_bytes),
+ METRIC_VAR_INIT_replica(rdb_compaction_output_bytes),
+ METRIC_VAR_INIT_replica(rdb_changed_delayed_writes),
+ METRIC_VAR_INIT_replica(rdb_changed_stopped_writes)
{
- _pfc_recent_flush_completed_count.init_app_counter("app.pegasus",
- "recent.flush.completed.count",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent flush completed count");
- _pfc_recent_flush_output_bytes.init_app_counter("app.pegasus",
- "recent.flush.output.bytes",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent flush output bytes");
- _pfc_recent_compaction_completed_count.init_app_counter(
- "app.pegasus",
- "recent.compaction.completed.count",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent compaction completed count");
- _pfc_recent_compaction_input_bytes.init_app_counter("app.pegasus",
- "recent.compaction.input.bytes",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent compaction input bytes");
- _pfc_recent_compaction_output_bytes.init_app_counter("app.pegasus",
- "recent.compaction.output.bytes",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent compaction output bytes");
- _pfc_recent_write_change_delayed_count.init_app_counter(
- "app.pegasus",
- "recent.write.change.delayed.count",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent write change delayed count");
- _pfc_recent_write_change_stopped_count.init_app_counter(
- "app.pegasus",
- "recent.write.change.stopped.count",
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent write change stopped count");
-
- // replica-level perfcounter
- std::string counter_str = fmt::format("recent_rdb_compaction_input_bytes@{}", r->get_gpid());
- _pfc_recent_rdb_compaction_input_bytes.init_app_counter(
- "app.pegasus",
- counter_str.c_str(),
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent compaction input bytes");
-
- counter_str = fmt::format("recent_rdb_compaction_output_bytes@{}", r->get_gpid());
- _pfc_recent_rdb_compaction_output_bytes.init_app_counter(
- "app.pegasus",
- counter_str.c_str(),
- COUNTER_TYPE_VOLATILE_NUMBER,
- "rocksdb recent compaction output bytes");
}
-void pegasus_event_listener::OnFlushCompleted(rocksdb::DB *db,
- const rocksdb::FlushJobInfo &flush_job_info)
+void pegasus_event_listener::OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &info)
{
- _pfc_recent_flush_completed_count->increment();
- _pfc_recent_flush_output_bytes->add(flush_job_info.table_properties.data_size);
+ METRIC_VAR_INCREMENT(rdb_flush_completed_count);
+ METRIC_VAR_INCREMENT_BY(rdb_flush_output_bytes, info.table_properties.data_size);
}
void pegasus_event_listener::OnCompactionCompleted(rocksdb::DB *db,
- const rocksdb::CompactionJobInfo &ci)
+ const rocksdb::CompactionJobInfo &info)
{
- _pfc_recent_compaction_completed_count->increment();
- _pfc_recent_compaction_input_bytes->add(ci.stats.total_input_bytes);
- _pfc_recent_compaction_output_bytes->add(ci.stats.total_output_bytes);
-
- _pfc_recent_rdb_compaction_input_bytes->add(ci.stats.total_input_bytes);
- _pfc_recent_rdb_compaction_output_bytes->add(ci.stats.total_output_bytes);
+ METRIC_VAR_INCREMENT(rdb_compaction_completed_count);
+ METRIC_VAR_INCREMENT_BY(rdb_compaction_input_bytes, info.stats.total_input_bytes);
+ METRIC_VAR_INCREMENT_BY(rdb_compaction_output_bytes, info.stats.total_output_bytes);
}
void pegasus_event_listener::OnStallConditionsChanged(const rocksdb::WriteStallInfo &info)
{
if (info.condition.cur == rocksdb::WriteStallCondition::kDelayed) {
LOG_ERROR_PREFIX("rocksdb write delayed");
- _pfc_recent_write_change_delayed_count->increment();
+ METRIC_VAR_INCREMENT(rdb_changed_delayed_writes);
} else if (info.condition.cur == rocksdb::WriteStallCondition::kStopped) {
LOG_ERROR_PREFIX("rocksdb write stopped");
- _pfc_recent_write_change_stopped_count->increment();
+ METRIC_VAR_INCREMENT(rdb_changed_stopped_writes);
}
}
diff --git a/src/server/pegasus_event_listener.h b/src/server/pegasus_event_listener.h
index 5e6ab7e1a..900fabdc8 100644
--- a/src/server/pegasus_event_listener.h
+++ b/src/server/pegasus_event_listener.h
@@ -21,8 +21,8 @@
#include <rocksdb/listener.h>
-#include "perf_counter/perf_counter_wrapper.h"
#include "replica/replica_base.h"
+#include "utils/metrics.h"
namespace rocksdb {
class DB;
@@ -37,24 +37,22 @@ public:
explicit pegasus_event_listener(replica_base *r);
~pegasus_event_listener() override = default;
- void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &flush_job_info) override;
+ void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &info) override;
- void OnCompactionCompleted(rocksdb::DB *db, const rocksdb::CompactionJobInfo &ci) override;
+ void OnCompactionCompleted(rocksdb::DB *db, const rocksdb::CompactionJobInfo &info) override;
void OnStallConditionsChanged(const rocksdb::WriteStallInfo &info) override;
private:
- ::dsn::perf_counter_wrapper _pfc_recent_flush_completed_count;
- ::dsn::perf_counter_wrapper _pfc_recent_flush_output_bytes;
- ::dsn::perf_counter_wrapper _pfc_recent_compaction_completed_count;
- ::dsn::perf_counter_wrapper _pfc_recent_compaction_input_bytes;
- ::dsn::perf_counter_wrapper _pfc_recent_compaction_output_bytes;
- ::dsn::perf_counter_wrapper _pfc_recent_write_change_delayed_count;
- ::dsn::perf_counter_wrapper _pfc_recent_write_change_stopped_count;
-
- // replica-level perfcounter
- ::dsn::perf_counter_wrapper _pfc_recent_rdb_compaction_input_bytes;
- ::dsn::perf_counter_wrapper _pfc_recent_rdb_compaction_output_bytes;
+ METRIC_VAR_DECLARE_counter(rdb_flush_completed_count);
+ METRIC_VAR_DECLARE_counter(rdb_flush_output_bytes);
+
+ METRIC_VAR_DECLARE_counter(rdb_compaction_completed_count);
+ METRIC_VAR_DECLARE_counter(rdb_compaction_input_bytes);
+ METRIC_VAR_DECLARE_counter(rdb_compaction_output_bytes);
+
+ METRIC_VAR_DECLARE_counter(rdb_changed_delayed_writes);
+ METRIC_VAR_DECLARE_counter(rdb_changed_stopped_writes);
};
} // namespace server
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 333534703..9087b78e5 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -620,6 +620,9 @@ enum class metric_unit : size_t
kKeys,
kFiles,
kAmplification,
+ kFlushes,
+ kCompactions,
+ kWrites,
kInvalidUnit,
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org