You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/12 03:56:06 UTC

[incubator-pegasus] branch migrate-metrics-dev updated: feat(new_metrics): migrate metrics for replica_stub (part 6) (#1474)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/migrate-metrics-dev by this push:
     new 74f37909a feat(new_metrics): migrate metrics for replica_stub (part 6) (#1474)
74f37909a is described below

commit 74f37909a11f77888c32ea455418bf90a9a20de1
Author: Dan Wang <wa...@apache.org>
AuthorDate: Fri May 12 11:55:59 2023 +0800

    feat(new_metrics): migrate metrics for replica_stub (part 6) (#1474)
    
    https://github.com/apache/incubator-pegasus/issues/1454
    
    This is the 6th part of migrating the metrics of replica_stub to the new
    framework; all of the metrics migrated in this part are bulk-load-related.
    
    During this migration, 7 metrics are changed from server-level to
    replica-level: the number of downloading/ingesting bulk loads, the number
    of successful/failed bulk loads, the number of files that have been
    downloaded successfully or have failed to be downloaded for bulk loads, and
    the size of files that have been downloaded successfully for bulk loads.
    
    The other 3 metrics are kept at server level: the number of currently
    running bulk loads, the max duration of ingestions for bulk loads, and the
    max duration of whole bulk loads.
---
 src/replica/bulk_load/replica_bulk_loader.cpp | 62 +++++++++++++++++++----
 src/replica/bulk_load/replica_bulk_loader.h   |  9 ++++
 src/replica/replica_stub.cpp                  | 72 ++++++++-------------------
 src/replica/replica_stub.h                    | 14 ++----
 src/utils/metrics.h                           |  1 +
 5 files changed, 87 insertions(+), 71 deletions(-)

diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp
index 60f3e5a04..ac4ab60bf 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -30,8 +30,6 @@
 #include "common/replication_common.h"
 #include "common/replication_enums.h"
 #include "dsn.layer2_types.h"
-#include "perf_counter/perf_counter.h"
-#include "perf_counter/perf_counter_wrapper.h"
 #include "replica/disk_cleaner.h"
 #include "replica/mutation.h"
 #include "replica/replica_context.h"
@@ -50,6 +48,41 @@
 #include "utils/string_view.h"
 #include "utils/thread_access_checker.h"
 
+METRIC_DEFINE_counter(replica,
+                      bulk_load_downloading_count,
+                      dsn::metric_unit::kBulkLoads,
+                      "The number of downloading bulk loads");
+
+METRIC_DEFINE_counter(replica,
+                      bulk_load_ingesting_count,
+                      dsn::metric_unit::kBulkLoads,
+                      "The number of ingesting bulk loads");
+
+METRIC_DEFINE_counter(replica,
+                      bulk_load_successful_count,
+                      dsn::metric_unit::kBulkLoads,
+                      "The number of successful bulk loads");
+
+METRIC_DEFINE_counter(replica,
+                      bulk_load_failed_count,
+                      dsn::metric_unit::kBulkLoads,
+                      "The number of failed bulk loads");
+
+METRIC_DEFINE_counter(replica,
+                      bulk_load_download_file_successful_count,
+                      dsn::metric_unit::kFiles,
+                      "The number of files that have been downloaded successfully for bulk loads");
+
+METRIC_DEFINE_counter(replica,
+                      bulk_load_download_file_failed_count,
+                      dsn::metric_unit::kFiles,
+                      "The number of files that have failed to be downloaded for bulk loads");
+
+METRIC_DEFINE_counter(replica,
+                      bulk_load_download_file_bytes,
+                      dsn::metric_unit::kBytes,
+                      "The size of files that have been downloaded successfully for bulk loads");
+
 namespace dsn {
 namespace dist {
 namespace block_service {
@@ -60,7 +93,16 @@ class block_filesystem;
 namespace replication {
 
 replica_bulk_loader::replica_bulk_loader(replica *r)
-    : replica_base(r), _replica(r), _stub(r->get_replica_stub())
+    : replica_base(r),
+      _replica(r),
+      _stub(r->get_replica_stub()),
+      METRIC_VAR_INIT_replica(bulk_load_downloading_count),
+      METRIC_VAR_INIT_replica(bulk_load_ingesting_count),
+      METRIC_VAR_INIT_replica(bulk_load_successful_count),
+      METRIC_VAR_INIT_replica(bulk_load_failed_count),
+      METRIC_VAR_INIT_replica(bulk_load_download_file_successful_count),
+      METRIC_VAR_INIT_replica(bulk_load_download_file_failed_count),
+      METRIC_VAR_INIT_replica(bulk_load_download_file_bytes)
 {
 }
 
@@ -320,7 +362,7 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,
         break;
     case bulk_load_status::BLS_FAILED:
         handle_bulk_load_finish(bulk_load_status::BLS_FAILED);
-        _stub->_counter_bulk_load_failed_count->increment();
+        METRIC_VAR_INCREMENT(bulk_load_failed_count);
         break;
     default:
         break;
@@ -408,7 +450,7 @@ error_code replica_bulk_loader::start_download(const std::string &remote_dir,
                     _stub->_primary_address_str,
                     _stub->_bulk_load_downloading_count.load());
     _bulk_load_start_time_ms = dsn_now_ms();
-    _stub->_counter_bulk_load_downloading_count->increment();
+    METRIC_VAR_INCREMENT(bulk_load_downloading_count);
 
     // create local bulk load dir
     if (!utils::filesystem::directory_exists(_replica->_dir)) {
@@ -531,13 +573,13 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
             _download_status.store(ec);
         }
         LOG_ERROR_PREFIX("failed to download file({}), error = {}", f_meta.name, ec.to_string());
-        _stub->_counter_bulk_load_download_file_fail_count->increment();
+        METRIC_VAR_INCREMENT(bulk_load_download_file_failed_count);
         return;
     }
     // download file succeed, update progress
     update_bulk_load_download_progress(f_size, f_meta.name);
-    _stub->_counter_bulk_load_download_file_succ_count->increment();
-    _stub->_counter_bulk_load_download_file_size->add(f_size);
+    METRIC_VAR_INCREMENT(bulk_load_download_file_successful_count);
+    METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size);
 
     // download next file
     if (file_index + 1 < _metadata.files.size()) {
@@ -646,7 +688,7 @@ void replica_bulk_loader::check_download_finish()
 void replica_bulk_loader::start_ingestion()
 {
     _status = bulk_load_status::BLS_INGESTING;
-    _stub->_counter_bulk_load_ingestion_count->increment();
+    METRIC_VAR_INCREMENT(bulk_load_ingesting_count);
     if (status() == partition_status::PS_PRIMARY) {
         _replica->_primary_states.ingestion_is_empty_prepare_sent = false;
     }
@@ -679,7 +721,7 @@ void replica_bulk_loader::handle_bulk_load_succeed()
 
     _replica->_app->set_ingestion_status(ingestion_status::IS_INVALID);
     _status = bulk_load_status::BLS_SUCCEED;
-    _stub->_counter_bulk_load_succeed_count->increment();
+    METRIC_VAR_INCREMENT(bulk_load_successful_count);
 
     // send an empty prepare again to gurantee that learner should learn from checkpoint
     if (status() == partition_status::PS_PRIMARY) {
diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h
index d4e4974ac..fcfcca589 100644
--- a/src/replica/bulk_load/replica_bulk_loader.h
+++ b/src/replica/bulk_load/replica_bulk_loader.h
@@ -31,6 +31,7 @@
 #include "runtime/api_layer1.h"
 #include "runtime/task/task.h"
 #include "utils/error_code.h"
+#include "utils/metrics.h"
 #include "utils/zlocks.h"
 
 namespace dsn {
@@ -194,6 +195,14 @@ private:
     task_ptr _download_task;
     // Used for perf-counter
     uint64_t _bulk_load_start_time_ms{0};
+
+    METRIC_VAR_DECLARE_counter(bulk_load_downloading_count);
+    METRIC_VAR_DECLARE_counter(bulk_load_ingesting_count);
+    METRIC_VAR_DECLARE_counter(bulk_load_successful_count);
+    METRIC_VAR_DECLARE_counter(bulk_load_failed_count);
+    METRIC_VAR_DECLARE_counter(bulk_load_download_file_successful_count);
+    METRIC_VAR_DECLARE_counter(bulk_load_download_file_failed_count);
+    METRIC_VAR_DECLARE_counter(bulk_load_download_file_bytes);
 };
 
 } // namespace replication
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 2fab2b47b..262ba2b73 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -191,6 +191,21 @@ METRIC_DEFINE_counter(server,
                       dsn::metric_unit::kRequests,
                       "The number of busy write requests");
 
+METRIC_DEFINE_gauge_int64(server,
+                          bulk_load_running_count,
+                          dsn::metric_unit::kBulkLoads,
+                          "The number of current running bulk loads");
+
+METRIC_DEFINE_gauge_int64(server,
+                          bulk_load_ingestion_max_duration_ms,
+                          dsn::metric_unit::kMilliSeconds,
+                          "The max duration of ingestions for bulk loads");
+
+METRIC_DEFINE_gauge_int64(server,
+                          bulk_load_max_duration_ms,
+                          dsn::metric_unit::kMilliSeconds,
+                          "The max duration of bulk loads");
+
 namespace dsn {
 namespace replication {
 
@@ -313,7 +328,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
       METRIC_VAR_INIT_server(read_failed_requests),
       METRIC_VAR_INIT_server(write_failed_requests),
       METRIC_VAR_INIT_server(read_busy_requests),
-      METRIC_VAR_INIT_server(write_busy_requests)
+      METRIC_VAR_INIT_server(write_busy_requests),
+      METRIC_VAR_INIT_server(bulk_load_running_count),
+      METRIC_VAR_INIT_server(bulk_load_ingestion_max_duration_ms),
+      METRIC_VAR_INIT_server(bulk_load_max_duration_ms)
 {
 #ifdef DSN_ENABLE_GPERF
     _is_releasing_memory = false;
@@ -331,52 +349,6 @@ replica_stub::~replica_stub(void) { close(); }
 
 void replica_stub::install_perf_counters()
 {
-    // <- Bulk Load Metrics ->
-
-    _counter_bulk_load_running_count.init_app_counter("eon.replica_stub",
-                                                      "bulk.load.running.count",
-                                                      COUNTER_TYPE_VOLATILE_NUMBER,
-                                                      "current bulk load running count");
-    _counter_bulk_load_downloading_count.init_app_counter("eon.replica_stub",
-                                                          "bulk.load.downloading.count",
-                                                          COUNTER_TYPE_VOLATILE_NUMBER,
-                                                          "current bulk load downloading count");
-    _counter_bulk_load_ingestion_count.init_app_counter("eon.replica_stub",
-                                                        "bulk.load.ingestion.count",
-                                                        COUNTER_TYPE_VOLATILE_NUMBER,
-                                                        "current bulk load ingestion count");
-    _counter_bulk_load_succeed_count.init_app_counter("eon.replica_stub",
-                                                      "bulk.load.succeed.count",
-                                                      COUNTER_TYPE_VOLATILE_NUMBER,
-                                                      "current bulk load succeed count");
-    _counter_bulk_load_failed_count.init_app_counter("eon.replica_stub",
-                                                     "bulk.load.failed.count",
-                                                     COUNTER_TYPE_VOLATILE_NUMBER,
-                                                     "current bulk load failed count");
-    _counter_bulk_load_download_file_succ_count.init_app_counter(
-        "eon.replica_stub",
-        "bulk.load.download.file.success.count",
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "bulk load recent download file success count");
-    _counter_bulk_load_download_file_fail_count.init_app_counter(
-        "eon.replica_stub",
-        "bulk.load.download.file.fail.count",
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "bulk load recent download file failed count");
-    _counter_bulk_load_download_file_size.init_app_counter("eon.replica_stub",
-                                                           "bulk.load.download.file.size",
-                                                           COUNTER_TYPE_VOLATILE_NUMBER,
-                                                           "bulk load recent download file size");
-    _counter_bulk_load_max_ingestion_time_ms.init_app_counter(
-        "eon.replica_stub",
-        "bulk.load.max.ingestion.duration.time.ms",
-        COUNTER_TYPE_NUMBER,
-        "bulk load max ingestion duration time(ms)");
-    _counter_bulk_load_max_duration_time_ms.init_app_counter("eon.replica_stub",
-                                                             "bulk.load.max.duration.time.ms",
-                                                             COUNTER_TYPE_NUMBER,
-                                                             "bulk load max duration time(ms)");
-
     // <- Partition split Metrics ->
 
     _counter_replicas_splitting_count.init_app_counter("eon.replica_stub",
@@ -1852,9 +1824,9 @@ void replica_stub::on_gc()
     METRIC_VAR_SET(learning_replicas, learning_count);
     METRIC_VAR_SET(learning_replicas_max_duration_ms, learning_max_duration_time_ms);
     METRIC_VAR_SET(learning_replicas_max_copy_file_bytes, learning_max_copy_file_size);
-    _counter_bulk_load_running_count->set(bulk_load_running_count);
-    _counter_bulk_load_max_ingestion_time_ms->set(bulk_load_max_ingestion_time_ms);
-    _counter_bulk_load_max_duration_time_ms->set(bulk_load_max_duration_time_ms);
+    METRIC_VAR_SET(bulk_load_running_count, bulk_load_running_count);
+    METRIC_VAR_SET(bulk_load_ingestion_max_duration_ms, bulk_load_max_ingestion_time_ms);
+    METRIC_VAR_SET(bulk_load_max_duration_ms, bulk_load_max_duration_time_ms);
     _counter_replicas_splitting_count->set(splitting_count);
     _counter_replicas_splitting_max_duration_time_ms->set(splitting_max_duration_time_ms);
     _counter_replicas_splitting_max_async_learn_time_ms->set(splitting_max_async_learn_time_ms);
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index dac2f0572..fa79b0175 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -530,17 +530,9 @@ private:
     METRIC_VAR_DECLARE_counter(read_busy_requests);
     METRIC_VAR_DECLARE_counter(write_busy_requests);
 
-    // <- Bulk load Metrics ->
-    perf_counter_wrapper _counter_bulk_load_running_count;
-    perf_counter_wrapper _counter_bulk_load_downloading_count;
-    perf_counter_wrapper _counter_bulk_load_ingestion_count;
-    perf_counter_wrapper _counter_bulk_load_succeed_count;
-    perf_counter_wrapper _counter_bulk_load_failed_count;
-    perf_counter_wrapper _counter_bulk_load_download_file_succ_count;
-    perf_counter_wrapper _counter_bulk_load_download_file_fail_count;
-    perf_counter_wrapper _counter_bulk_load_download_file_size;
-    perf_counter_wrapper _counter_bulk_load_max_ingestion_time_ms;
-    perf_counter_wrapper _counter_bulk_load_max_duration_time_ms;
+    METRIC_VAR_DECLARE_gauge_int64(bulk_load_running_count);
+    METRIC_VAR_DECLARE_gauge_int64(bulk_load_ingestion_max_duration_ms);
+    METRIC_VAR_DECLARE_gauge_int64(bulk_load_max_duration_ms);
 
     // <- Partition split Metrics ->
     perf_counter_wrapper _counter_replicas_splitting_count;
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 78c067ea3..566db835d 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -686,6 +686,7 @@ enum class metric_unit : size_t
     kResets,
     kBackups,
     kFileUploads,
+    kBulkLoads,
     kInvalidUnit,
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org