You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/06/12 03:24:48 UTC

[incubator-pegasus] branch migrate-metrics-dev updated: feat(new_metrics): migrate metrics for profiler (#1524)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/migrate-metrics-dev by this push:
     new 49fb8830e feat(new_metrics): migrate metrics for profiler (#1524)
49fb8830e is described below

commit 49fb8830e658900aff02d0a34d9942838d547929
Author: Dan Wang <wa...@apache.org>
AuthorDate: Mon Jun 12 11:24:43 2023 +0800

    feat(new_metrics): migrate metrics for profiler (#1524)
    
    https://github.com/apache/incubator-pegasus/issues/1523
    
    A profiler-level metric entity is introduced and 12 profiler-related metrics are
    migrated to the new framework, including the number of tasks in all queues,
    the number of tasks that have been executed, the number of cancelled tasks,
    the latency it takes for each task to wait in each queue before beginning to be
    executed, the latency it takes for each task to be executed, the latency from
    enqueue point to reply point on the server side for each RPC task, the non-timeout
    latency from call point to enqueue point on the client side for each RPC task,
    the body length of request received or response replied on the server side for
    each RPC task, the accumulative number of dropped RPC tasks on the server
    side, and the accumulative number of timeout RPC tasks on the client side.
    
    All these metrics are configurable and thus might be NULL. `METRIC_DEFINE_*_NOTNULL`
    macros are introduced to help define the member functions that check whether the
    metric variables are NULL pointers. Some macros are refactored to make it more
    convenient to define the member functions that increment/decrement/set/get the
    value of the metric variables.
---
 src/common/fs_manager.h       |   3 +-
 src/runtime/profiler.cpp      | 478 +++++++++++++++++++++---------------------
 src/runtime/profiler_header.h |  68 +++---
 src/utils/metrics.h           |  84 ++++++--
 4 files changed, 352 insertions(+), 281 deletions(-)

diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 1467a821c..446264969 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -29,11 +29,10 @@
 #include "common/replication_other_types.h"
 #include "metadata_types.h"
 #include "utils/autoref_ptr.h"
-#include "utils/error_code.h"
 #include "utils/flags.h"
-#include "utils/string_view.h"
 #include "utils/metrics.h"
 #include "utils/ports.h"
+#include "utils/string_view.h"
 #include "utils/zlocks.h"
 
 namespace dsn {
diff --git a/src/runtime/profiler.cpp b/src/runtime/profiler.cpp
index 4ea1694c2..b506d11ea 100644
--- a/src/runtime/profiler.cpp
+++ b/src/runtime/profiler.cpp
@@ -49,16 +49,16 @@ START<== queue(server) == ENQUEUE <===== net(reply) ======= REPLY <=============
 */
 #include "runtime/profiler.h"
 
-#include <stddef.h>
 #include <algorithm>
 #include <atomic>
 #include <cstdint>
 #include <memory>
+#include <set>
 #include <string>
+#include <vector>
 
 #include "aio/aio_task.h"
-#include "perf_counter/perf_counter.h"
-#include "perf_counter/perf_counter_wrapper.h"
+#include "fmt/core.h"
 #include "profiler_header.h"
 #include "runtime/api_layer1.h"
 #include "runtime/rpc/rpc_message.h"
@@ -67,9 +67,80 @@ START<== queue(server) == ENQUEUE <===== net(reply) ======= REPLY <=============
 #include "runtime/task/task_spec.h"
 #include "utils/config_api.h"
 #include "utils/extensible_object.h"
-#include "utils/fmt_logging.h"
 #include "utils/flags.h"
+#include "utils/fmt_logging.h"
 #include "utils/join_point.h"
+#include "utils/metrics.h"
+#include "utils/string_view.h"
+
+METRIC_DEFINE_entity(profiler);
+
+METRIC_DEFINE_gauge_int64(profiler,
+                          profiler_queued_tasks,
+                          dsn::metric_unit::kTasks,
+                          "The number of tasks in all queues");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_queue_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency it takes for each task to wait in each queue "
+                               "before beginning to be executed");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_execute_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency it takes for each task to be executed");
+
+METRIC_DEFINE_counter(profiler,
+                      profiler_executed_tasks,
+                      dsn::metric_unit::kTasks,
+                      "The number of tasks that have been executed");
+
+METRIC_DEFINE_counter(profiler,
+                      profiler_cancelled_tasks,
+                      dsn::metric_unit::kTasks,
+                      "The number of cancelled tasks");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_server_rpc_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency from enqueue point to reply point on the server side "
+                               "for each RPC task");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_server_rpc_request_bytes,
+                               dsn::metric_unit::kBytes,
+                               "The body length of request received on the server side for each "
+                               "RPC task");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_server_rpc_response_bytes,
+                               dsn::metric_unit::kBytes,
+                               "The body length of response replied on the server side for each "
+                               "RPC task");
+
+METRIC_DEFINE_counter(profiler,
+                      profiler_dropped_timeout_rpcs,
+                      dsn::metric_unit::kTasks,
+                      "The accumulative number of dropped RPC tasks on the server side "
+                      "due to timeout");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_client_rpc_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The non-timeout latency from call point to enqueue point on "
+                               "the client side for each RPC task");
+
+METRIC_DEFINE_counter(profiler,
+                      profiler_client_timeout_rpcs,
+                      dsn::metric_unit::kTasks,
+                      "The accumulative number of timeout RPC tasks on the client side");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_aio_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The duration of the whole AIO operation (begin to aio -> "
+                               "executing -> finished -> callback is put into queue)");
 
 namespace dsn {
 struct service_spec;
@@ -86,7 +157,7 @@ DSN_DEFINE_bool(task..default,
 typedef uint64_extension_helper<task_spec_profiler, task> task_ext_for_profiler;
 typedef uint64_extension_helper<task_spec_profiler, message_ex> message_ext_for_profiler;
 
-std::unique_ptr<task_spec_profiler[]> s_spec_profilers;
+std::vector<task_spec_profiler> s_spec_profilers;
 
 int s_task_code_max = 0;
 
@@ -113,9 +184,7 @@ static void profiler_on_task_enqueue(task *caller, task *callee)
 
     task_ext_for_profiler::get(callee) = dsn_now_ns();
     if (callee->delay_milliseconds() == 0) {
-        auto ptr = s_spec_profilers[callee_code].ptr[TASK_IN_QUEUE].get();
-        if (ptr != nullptr)
-            ptr->increment();
+        METRIC_INCREMENT(s_spec_profilers[callee_code], profiler_queued_tasks);
     }
 }
 
@@ -127,14 +196,10 @@ static void profiler_on_task_begin(task *this_)
 
     uint64_t &qts = task_ext_for_profiler::get(this_);
     uint64_t now = dsn_now_ns();
-    auto ptr = s_spec_profilers[code].ptr[TASK_QUEUEING_TIME_NS].get();
-    if (ptr != nullptr)
-        ptr->set(now - qts);
+    METRIC_SET(s_spec_profilers[code], profiler_queue_latency_ns, now - qts);
     qts = now;
 
-    ptr = s_spec_profilers[code].ptr[TASK_IN_QUEUE].get();
-    if (ptr != nullptr)
-        ptr->decrement();
+    METRIC_DECREMENT(s_spec_profilers[code], profiler_queued_tasks);
 }
 
 static void profiler_on_task_end(task *this_)
@@ -144,13 +209,9 @@ static void profiler_on_task_end(task *this_)
 
     uint64_t qts = task_ext_for_profiler::get(this_);
     uint64_t now = dsn_now_ns();
-    auto ptr = s_spec_profilers[code].ptr[TASK_EXEC_TIME_NS].get();
-    if (ptr != nullptr)
-        ptr->set(now - qts);
+    METRIC_SET(s_spec_profilers[code], profiler_execute_latency_ns, now - qts);
 
-    ptr = s_spec_profilers[code].ptr[TASK_THROUGHPUT].get();
-    if (ptr != nullptr)
-        ptr->increment();
+    METRIC_INCREMENT(s_spec_profilers[code], profiler_executed_tasks);
 }
 
 static void profiler_on_task_cancelled(task *this_)
@@ -158,9 +219,7 @@ static void profiler_on_task_cancelled(task *this_)
     auto code = this_->spec().code;
     CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
 
-    auto ptr = s_spec_profilers[code].ptr[TASK_CANCELLED].get();
-    if (ptr != nullptr)
-        ptr->increment();
+    METRIC_INCREMENT(s_spec_profilers[code], profiler_cancelled_tasks);
 }
 
 static void profiler_on_task_wait_pre(task *caller, task *callee, uint32_t timeout_ms) {}
@@ -198,14 +257,10 @@ static void profiler_on_aio_enqueue(aio_task *this_)
     uint64_t &ats = task_ext_for_profiler::get(this_);
     uint64_t now = dsn_now_ns();
 
-    auto ptr = s_spec_profilers[code].ptr[AIO_LATENCY_NS].get();
-    if (ptr != nullptr)
-        ptr->set(now - ats);
+    METRIC_SET(s_spec_profilers[code], profiler_aio_latency_ns, now - ats);
     ats = now;
 
-    ptr = s_spec_profilers[code].ptr[TASK_IN_QUEUE].get();
-    if (ptr != nullptr)
-        ptr->increment();
+    METRIC_INCREMENT(s_spec_profilers[code], profiler_queued_tasks);
 }
 
 // return true means continue, otherwise early terminate with task::set_error_code
@@ -239,23 +294,17 @@ static void profiler_on_rpc_request_enqueue(rpc_request_task *callee)
     task_ext_for_profiler::get(callee) = now;
     message_ext_for_profiler::get(callee->get_request()) = now;
 
-    auto ptr = s_spec_profilers[callee_code].ptr[TASK_IN_QUEUE].get();
-    if (ptr != nullptr) {
-        ptr->increment();
-    }
-    ptr = s_spec_profilers[callee_code].ptr[RPC_SERVER_SIZE_PER_REQUEST_IN_BYTES].get();
-    if (ptr != nullptr) {
-        ptr->set(callee->get_request()->header->body_length);
-    }
+    METRIC_INCREMENT(s_spec_profilers[callee_code], profiler_queued_tasks);
+
+    METRIC_SET(s_spec_profilers[callee_code],
+               profiler_server_rpc_request_bytes,
+               callee->get_request()->header->body_length);
 }
 
 static void profile_on_rpc_task_dropped(rpc_request_task *callee)
 {
     auto code = callee->spec().code;
-    auto ptr = s_spec_profilers[code].ptr[RPC_DROPPED_IF_TIMEOUT].get();
-    if (ptr != nullptr) {
-        ptr->increment();
-    }
+    METRIC_INCREMENT(s_spec_profilers[code], profiler_dropped_timeout_rpcs);
 }
 
 static void profiler_on_rpc_create_response(message_ex *req, message_ex *resp)
@@ -283,14 +332,11 @@ static void profiler_on_rpc_reply(task *caller, message_ex *msg)
     CHECK_NOTNULL(spec, "task_spec cannot be null, code = {}", msg->local_rpc_code.code());
     auto code = spec->rpc_paired_code;
     CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
-    auto ptr = s_spec_profilers[code].ptr[RPC_SERVER_LATENCY_NS].get();
-    if (ptr != nullptr) {
-        ptr->set(now - qts);
-    }
-    ptr = s_spec_profilers[code].ptr[RPC_SERVER_SIZE_PER_RESPONSE_IN_BYTES].get();
-    if (ptr != nullptr) {
-        ptr->set(msg->header->body_length);
-    }
+
+    METRIC_SET(s_spec_profilers[code], profiler_server_rpc_latency_ns, now - qts);
+
+    METRIC_SET(
+        s_spec_profilers[code], profiler_server_rpc_response_bytes, msg->header->body_length);
 }
 
 static void profiler_on_rpc_response_enqueue(rpc_response_task *resp)
@@ -302,220 +348,176 @@ static void profiler_on_rpc_response_enqueue(rpc_response_task *resp)
     uint64_t now = dsn_now_ns();
 
     if (resp->get_response() != nullptr) {
-        auto ptr = s_spec_profilers[resp_code].ptr[RPC_CLIENT_NON_TIMEOUT_LATENCY_NS].get();
-        if (ptr != nullptr)
-            ptr->set(now - cts);
+        METRIC_SET(s_spec_profilers[resp_code], profiler_client_rpc_latency_ns, now - cts);
     } else {
-        auto ptr = s_spec_profilers[resp_code].ptr[RPC_CLIENT_TIMEOUT_THROUGHPUT].get();
-        if (ptr != nullptr)
-            ptr->increment();
+        METRIC_INCREMENT(s_spec_profilers[resp_code], profiler_client_timeout_rpcs);
     }
     cts = now;
 
-    auto ptr = s_spec_profilers[resp_code].ptr[TASK_IN_QUEUE].get();
-    if (ptr != nullptr)
-        ptr->increment();
+    METRIC_INCREMENT(s_spec_profilers[resp_code], profiler_queued_tasks);
 }
 
-void profiler::install(service_spec &)
+namespace {
+
+metric_entity_ptr instantiate_profiler_metric_entity(const std::string &task_name)
 {
-    s_task_code_max = dsn::task_code::max();
-    s_spec_profilers.reset(new task_spec_profiler[s_task_code_max + 1]);
-    task_ext_for_profiler::register_ext();
-    message_ext_for_profiler::register_ext();
+    auto entity_id = fmt::format("task_{}", task_name);
 
-    for (int i = 0; i <= s_task_code_max; i++) {
-        if (i == TASK_CODE_INVALID)
-            continue;
+    return METRIC_ENTITY_profiler.instantiate(entity_id, {{"task_name", task_name}});
+}
+
+} // anonymous namespace
+
+task_spec_profiler::task_spec_profiler(int code)
+    : collect_call_count(false),
+      is_profile(false),
+      call_counts(new std::atomic<int64_t>[ s_task_code_max + 1 ]),
+      _task_name(dsn::task_code(code).to_string()),
+      _profiler_metric_entity(instantiate_profiler_metric_entity(_task_name))
+{
+    const auto &section_name = fmt::format("task.{}", _task_name);
+    auto spec = task_spec::get(code);
+    CHECK_NOTNULL(spec, "spec should be non-null: task_code={}, task_name={}", code, _task_name);
+
+    collect_call_count = dsn_config_get_value_bool(
+        section_name.c_str(),
+        "collect_call_count",
+        FLAGS_collect_call_count,
+        "whether to collect how many time this kind of tasks invoke each of other kinds tasks");
+
+    for (int i = 0; i <= s_task_code_max; ++i) {
+        call_counts[i].store(0);
+    }
+
+    is_profile = dsn_config_get_value_bool(section_name.c_str(),
+                                           "is_profile",
+                                           FLAGS_is_profile,
+                                           "whether to profile this kind of task");
 
-        std::string name(dsn::task_code(i).to_string());
-        std::string section_name = std::string("task.") + name;
-        task_spec *spec = task_spec::get(i);
-        CHECK_NOTNULL(spec, "");
+    if (!is_profile) {
+        return;
+    }
 
-        s_spec_profilers[i].collect_call_count = dsn_config_get_value_bool(
+    if (dsn_config_get_value_bool(
             section_name.c_str(),
-            "collect_call_count",
-            FLAGS_collect_call_count,
-            "whether to collect how many time this kind of tasks invoke each of other kinds tasks");
-        s_spec_profilers[i].call_counts = new std::atomic<int64_t>[ s_task_code_max + 1 ];
-        std::fill(s_spec_profilers[i].call_counts,
-                  s_spec_profilers[i].call_counts + s_task_code_max + 1,
-                  0);
-
-        s_spec_profilers[i].is_profile =
-            dsn_config_get_value_bool(section_name.c_str(),
-                                      "is_profile",
-                                      FLAGS_is_profile,
-                                      "whether to profile this kind of task");
-
-        if (!s_spec_profilers[i].is_profile)
-            continue;
+            "profiler::inqueue",
+            true,
+            "whether to profile the number of this kind of tasks in all queues")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_queued_tasks);
+    }
 
-        if (dsn_config_get_value_bool(
-                section_name.c_str(),
-                "profiler::inqueue",
-                true,
-                "whether to profile the number of this kind of tasks in all queues"))
-            s_spec_profilers[i].ptr[TASK_IN_QUEUE].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".inqueue")).c_str(),
-                COUNTER_TYPE_NUMBER,
-                "task number in all queues");
+    if (dsn_config_get_value_bool(section_name.c_str(),
+                                  "profiler::queue",
+                                  true,
+                                  "whether to profile the queuing time of a task")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_queue_latency_ns);
+    }
+
+    if (dsn_config_get_value_bool(section_name.c_str(),
+                                  "profiler::exec",
+                                  true,
+                                  "whether to profile the executing time of a task")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_execute_latency_ns);
+    }
 
+    if (dsn_config_get_value_bool(
+            section_name.c_str(), "profiler::qps", true, "whether to profile the qps of a task")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_executed_tasks);
+    }
+
+    if (dsn_config_get_value_bool(section_name.c_str(),
+                                  "profiler::cancelled",
+                                  true,
+                                  "whether to profile the cancelled times of a task")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_cancelled_tasks);
+    }
+
+    if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_REQUEST) {
         if (dsn_config_get_value_bool(section_name.c_str(),
-                                      "profiler::queue",
+                                      "profiler::latency.server",
                                       true,
-                                      "whether to profile the queuing time of a task"))
-            s_spec_profilers[i].ptr[TASK_QUEUEING_TIME_NS].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".queue(ns)")).c_str(),
-                COUNTER_TYPE_NUMBER_PERCENTILES,
-                "latency due to waiting in the queue");
-
+                                      "whether to profile the server latency of a task")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_server_rpc_latency_ns);
+        }
+        if (dsn_config_get_value_bool(section_name.c_str(),
+                                      "profiler::size.request.server",
+                                      false,
+                                      "whether to profile the size per request")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_server_rpc_request_bytes);
+        }
         if (dsn_config_get_value_bool(section_name.c_str(),
-                                      "profiler::exec",
+                                      "profiler::size.response.server",
+                                      false,
+                                      "whether to profile the size per response")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_server_rpc_response_bytes);
+        }
+        if (dsn_config_get_value_bool(section_name.c_str(),
+                                      "rpc_request_dropped_before_execution_when_timeout",
+                                      false,
+                                      "whether to profile the number of rpc dropped for timeout")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_dropped_timeout_rpcs);
+        }
+    } else if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_RESPONSE) {
+        if (dsn_config_get_value_bool(section_name.c_str(),
+                                      "profiler::latency.client",
                                       true,
-                                      "whether to profile the executing time of a task"))
-            s_spec_profilers[i].ptr[TASK_EXEC_TIME_NS].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".exec(ns)")).c_str(),
-                COUNTER_TYPE_NUMBER_PERCENTILES,
-                "latency due to executing tasks");
-
+                                      "whether to profile the client latency of a task")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_client_rpc_latency_ns);
+        }
         if (dsn_config_get_value_bool(section_name.c_str(),
-                                      "profiler::qps",
+                                      "profiler::timeout.qps",
                                       true,
-                                      "whether to profile the qps of a task"))
-            s_spec_profilers[i].ptr[TASK_THROUGHPUT].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".qps")).c_str(),
-                COUNTER_TYPE_RATE,
-                "task numbers per second");
-
+                                      "whether to profile the timeout qps of a task")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_client_timeout_rpcs);
+        }
+    } else if (spec->type == dsn_task_type_t::TASK_TYPE_AIO) {
         if (dsn_config_get_value_bool(section_name.c_str(),
-                                      "profiler::cancelled",
+                                      "profiler::latency",
                                       true,
-                                      "whether to profile the cancelled times of a task"))
-            s_spec_profilers[i].ptr[TASK_CANCELLED].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".cancelled")).c_str(),
-                COUNTER_TYPE_NUMBER,
-                "cancelled times of a specific task type");
-
-        if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_REQUEST) {
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::latency.server",
-                                          true,
-                                          "whether to profile the server latency of a task")) {
-                s_spec_profilers[i].ptr[RPC_SERVER_LATENCY_NS].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".latency.server")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "latency from enqueue point to reply point on the server side for RPC "
-                    "tasks");
-            }
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::size.request.server",
-                                          false,
-                                          "whether to profile the size per request")) {
-                s_spec_profilers[i].ptr[RPC_SERVER_SIZE_PER_REQUEST_IN_BYTES].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".size.request.server")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "");
-            }
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::size.response.server",
-                                          false,
-                                          "whether to profile the size per response")) {
-                s_spec_profilers[i].ptr[RPC_SERVER_SIZE_PER_RESPONSE_IN_BYTES].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".size.response.server")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "");
-            }
-            if (dsn_config_get_value_bool(
-                    section_name.c_str(),
-                    "rpc_request_dropped_before_execution_when_timeout",
-                    false,
-                    "whether to profile the number of rpc dropped for timeout"))
-                s_spec_profilers[i].ptr[RPC_DROPPED_IF_TIMEOUT].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".rpc.dropped")).c_str(),
-                    COUNTER_TYPE_VOLATILE_NUMBER,
-                    "rpc dropped if queue time exceed client timeout");
-        } else if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_RESPONSE) {
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::latency.client",
-                                          true,
-                                          "whether to profile the client latency of a task"))
-                s_spec_profilers[i].ptr[RPC_CLIENT_NON_TIMEOUT_LATENCY_NS].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".latency.client(ns)")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "latency from call point to enqueue point on the client side for RPC "
-                    "tasks");
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::timeout.qps",
-                                          true,
-                                          "whether to profile the timeout qps of a task"))
-                s_spec_profilers[i].ptr[RPC_CLIENT_TIMEOUT_THROUGHPUT].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".timeout.qps")).c_str(),
-                    COUNTER_TYPE_RATE,
-                    "time-out task numbers per second for RPC tasks");
-        } else if (spec->type == dsn_task_type_t::TASK_TYPE_AIO) {
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::latency",
-                                          true,
-                                          "whether to profile the latency of an AIO task"))
-                s_spec_profilers[i].ptr[AIO_LATENCY_NS].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".latency(ns)")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "latency from call point to enqueue point for AIO tasks");
+                                      "whether to profile the latency of an AIO task")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_aio_latency_ns);
         }
+    }
+
+    spec->on_task_create.put_back(profiler_on_task_create, "profiler");
+    spec->on_task_enqueue.put_back(profiler_on_task_enqueue, "profiler");
+    spec->on_task_begin.put_back(profiler_on_task_begin, "profiler");
+    spec->on_task_end.put_back(profiler_on_task_end, "profiler");
+    spec->on_task_cancelled.put_back(profiler_on_task_cancelled, "profiler");
+    spec->on_task_wait_pre.put_back(profiler_on_task_wait_pre, "profiler");
+    spec->on_task_wait_post.put_back(profiler_on_task_wait_post, "profiler");
+    spec->on_task_cancel_post.put_back(profiler_on_task_cancel_post, "profiler");
+    spec->on_aio_call.put_back(profiler_on_aio_call, "profiler");
+    spec->on_aio_enqueue.put_back(profiler_on_aio_enqueue, "profiler");
+    spec->on_rpc_call.put_back(profiler_on_rpc_call, "profiler");
+    spec->on_rpc_request_enqueue.put_back(profiler_on_rpc_request_enqueue, "profiler");
+    spec->on_rpc_task_dropped.put_back(profile_on_rpc_task_dropped, "profiler");
+    spec->on_rpc_create_response.put_back(profiler_on_rpc_create_response, "profiler");
+    spec->on_rpc_reply.put_back(profiler_on_rpc_reply, "profiler");
+    spec->on_rpc_response_enqueue.put_back(profiler_on_rpc_response_enqueue, "profiler");
+}
+
+const metric_entity_ptr &task_spec_profiler::profiler_metric_entity() const
+{
+    CHECK_NOTNULL(_profiler_metric_entity,
+                  "profiler metric entity (task_name={}) should has been instantiated: "
+                  "uninitialized entity cannot be used to instantiate metric",
+                  _task_name);
+    return _profiler_metric_entity;
+}
 
-        // we don't use perf_counter_ptr but perf_counter* in ptr[xxx] to avoid unnecessary memory
-        // access cost
-        // we need to add reference so that the counters won't go
-        // release_ref should be done when the profiler exits (which never happens right now so we
-        // omit that for the time being)
-        for (size_t j = 0; j < sizeof(s_spec_profilers[i].ptr) / sizeof(perf_counter *); j++) {
-            if (s_spec_profilers[i].ptr[j].get() != nullptr) {
-                s_spec_profilers[i].ptr[j]->add_ref();
-            }
+void profiler::install(service_spec &)
+{
+    s_task_code_max = dsn::task_code::max();
+    task_ext_for_profiler::register_ext();
+    message_ext_for_profiler::register_ext();
+
+    for (int code = 0; code <= s_task_code_max; ++code) {
+        if (code == TASK_CODE_INVALID) {
+            continue;
         }
 
-        spec->on_task_create.put_back(profiler_on_task_create, "profiler");
-        spec->on_task_enqueue.put_back(profiler_on_task_enqueue, "profiler");
-        spec->on_task_begin.put_back(profiler_on_task_begin, "profiler");
-        spec->on_task_end.put_back(profiler_on_task_end, "profiler");
-        spec->on_task_cancelled.put_back(profiler_on_task_cancelled, "profiler");
-        spec->on_task_wait_pre.put_back(profiler_on_task_wait_pre, "profiler");
-        spec->on_task_wait_post.put_back(profiler_on_task_wait_post, "profiler");
-        spec->on_task_cancel_post.put_back(profiler_on_task_cancel_post, "profiler");
-        spec->on_aio_call.put_back(profiler_on_aio_call, "profiler");
-        spec->on_aio_enqueue.put_back(profiler_on_aio_enqueue, "profiler");
-        spec->on_rpc_call.put_back(profiler_on_rpc_call, "profiler");
-        spec->on_rpc_request_enqueue.put_back(profiler_on_rpc_request_enqueue, "profiler");
-        spec->on_rpc_task_dropped.put_back(profile_on_rpc_task_dropped, "profiler");
-        spec->on_rpc_create_response.put_back(profiler_on_rpc_create_response, "profiler");
-        spec->on_rpc_reply.put_back(profiler_on_rpc_reply, "profiler");
-        spec->on_rpc_response_enqueue.put_back(profiler_on_rpc_response_enqueue, "profiler");
+        s_spec_profilers.emplace_back(code);
     }
 }
 
diff --git a/src/runtime/profiler_header.h b/src/runtime/profiler_header.h
index 54b0df335..a32935804 100644
--- a/src/runtime/profiler_header.h
+++ b/src/runtime/profiler_header.h
@@ -25,45 +25,55 @@
  */
 
 #pragma once
+
 #include <iomanip>
-#include "perf_counter/perf_counter_wrapper.h"
+
+#include "utils/autoref_ptr.h"
+#include "utils/metrics.h"
 
 namespace dsn {
 namespace tools {
 
-enum perf_counter_ptr_type
-{
-    TASK_QUEUEING_TIME_NS,
-    TASK_EXEC_TIME_NS,
-    TASK_THROUGHPUT,
-    TASK_CANCELLED,
-    AIO_LATENCY_NS,
-    RPC_SERVER_LATENCY_NS,
-    RPC_SERVER_SIZE_PER_REQUEST_IN_BYTES,
-    RPC_SERVER_SIZE_PER_RESPONSE_IN_BYTES,
-    RPC_CLIENT_NON_TIMEOUT_LATENCY_NS,
-    RPC_CLIENT_TIMEOUT_THROUGHPUT,
-    TASK_IN_QUEUE,
-    RPC_DROPPED_IF_TIMEOUT,
-
-    PERF_COUNTER_COUNT,
-    PERF_COUNTER_INVALID
-};
-
 struct task_spec_profiler
 {
-    perf_counter_wrapper ptr[PERF_COUNTER_COUNT];
     bool collect_call_count;
     bool is_profile;
-    std::atomic<int64_t> *call_counts;
+    std::unique_ptr<std::atomic<int64_t>[]> call_counts;
+
+    task_spec_profiler(int code);
+    const metric_entity_ptr &profiler_metric_entity() const;
+
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_queued_tasks)
+    METRIC_DEFINE_DECREMENT_NOTNULL(profiler_queued_tasks)
+    METRIC_DEFINE_SET_NOTNULL(profiler_queue_latency_ns, int64_t)
+    METRIC_DEFINE_SET_NOTNULL(profiler_execute_latency_ns, int64_t)
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_executed_tasks)
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_cancelled_tasks)
+    METRIC_DEFINE_SET_NOTNULL(profiler_server_rpc_latency_ns, int64_t)
+    METRIC_DEFINE_SET_NOTNULL(profiler_server_rpc_request_bytes, int64_t)
+    METRIC_DEFINE_SET_NOTNULL(profiler_server_rpc_response_bytes, int64_t)
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_dropped_timeout_rpcs)
+    METRIC_DEFINE_SET_NOTNULL(profiler_client_rpc_latency_ns, int64_t)
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_client_timeout_rpcs)
+    METRIC_DEFINE_SET_NOTNULL(profiler_aio_latency_ns, int64_t)
 
-    task_spec_profiler()
-    {
-        collect_call_count = false;
-        is_profile = false;
-        call_counts = nullptr;
-        memset((void *)ptr, 0, sizeof(ptr));
-    }
+private:
+    const std::string _task_name;
+    const metric_entity_ptr _profiler_metric_entity;
+
+    METRIC_VAR_DECLARE_gauge_int64(profiler_queued_tasks);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_queue_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_execute_latency_ns);
+    METRIC_VAR_DECLARE_counter(profiler_executed_tasks);
+    METRIC_VAR_DECLARE_counter(profiler_cancelled_tasks);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_server_rpc_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_server_rpc_request_bytes);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_server_rpc_response_bytes);
+    METRIC_VAR_DECLARE_counter(profiler_dropped_timeout_rpcs);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_client_rpc_latency_ns);
+    METRIC_VAR_DECLARE_counter(profiler_client_timeout_rpcs);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_aio_latency_ns);
 };
+
 } // namespace tools
 } // namespace dsn
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 2ecc4d305..e484e4c1d 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -192,6 +192,7 @@ class error_code;
 #define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, backup_policy, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_queue(name, ...) METRIC_VAR_INIT(name, queue, ##__VA_ARGS__)
+#define METRIC_VAR_ASSIGN_profiler(name, ...) METRIC_VAR_ASSIGN(name, profiler, ##__VA_ARGS__)
 
 // Perform increment_by() operations on gauges and counters.
 #define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
@@ -238,37 +239,96 @@ class error_code;
 #define METRIC_VAR_AUTO_COUNT(name, ...)                                                           \
     dsn::auto_count __##name##_auto_count(METRIC_VAR_NAME(name), ##__VA_ARGS__)
 
+// Implement a member function that runs `method` on the metric variable, without any argument.
+#define METRIC_DEFINE_NO_ARG(method, name)                                                         \
+    void METRIC_FUNC_NAME_##method(name)() { METRIC_VAR_##method(name); }
+
+// Implement a member function that runs `method` on the metric variable if NOT NULL,
+// without any argument.
+#define METRIC_DEFINE_NO_ARG_NOTNULL(method, name)                                                 \
+    void METRIC_FUNC_NAME_##method(name)()                                                         \
+    {                                                                                              \
+        if (METRIC_VAR_NAME(name) != nullptr) {                                                    \
+            METRIC_VAR_##method(name);                                                             \
+        }                                                                                          \
+    }
+
+// Implement a member function that runs `method` on the metric variable and returns `ret_type`,
+// without any argument.
+#define METRIC_DEFINE_RET_AND_NO_ARG(ret_type, method, name)                                       \
+    ret_type METRIC_FUNC_NAME_##method(name)() { return METRIC_VAR_##method(name); }
+
+// Implement a member function that runs `method` on the metric variable, with an argument.
+#define METRIC_DEFINE_ONE_ARG(method, name, arg_type)                                              \
+    void METRIC_FUNC_NAME_##method(name)(arg_type arg) { METRIC_VAR_##method(name, arg); }
+
+// Implement a member function that runs `method` on the metric variable if NOT NULL,
+// with an argument.
+#define METRIC_DEFINE_ONE_ARG_NOTNULL(method, name, arg_type)                                      \
+    void METRIC_FUNC_NAME_##method(name)(arg_type arg)                                             \
+    {                                                                                              \
+        if (METRIC_VAR_NAME(name) != nullptr) {                                                    \
+            METRIC_VAR_##method(name, arg);                                                        \
+        }                                                                                          \
+    }
+
+// Call the member function of `obj` to run `method` on the metric variable.
+#define METRIC_CALL(obj, method, name, ...) (obj).METRIC_FUNC_NAME_##method(name)(__VA_ARGS__)
+
+// The name of the member function that increments the metric variable by some value.
 #define METRIC_FUNC_NAME_INCREMENT_BY(name) increment_##name##_by
 
-#define METRIC_DEFINE_INCREMENT_BY(name)                                                           \
-    void METRIC_FUNC_NAME_INCREMENT_BY(name)(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); }
+// Implement a member function that increments the metric variable by some value.
+#define METRIC_DEFINE_INCREMENT_BY(name) METRIC_DEFINE_ONE_ARG(INCREMENT_BY, name, int64_t)
 
 // To be adaptive to self-defined `increment_by` methods, arguments are declared as variadic.
-#define METRIC_INCREMENT_BY(obj, name, ...) (obj).METRIC_FUNC_NAME_INCREMENT_BY(name)(__VA_ARGS__)
+#define METRIC_INCREMENT_BY(obj, name, ...) METRIC_CALL(obj, INCREMENT_BY, name, ##__VA_ARGS__)
 
+// The name of the member function that increments the metric variable by one.
 #define METRIC_FUNC_NAME_INCREMENT(name) increment_##name
 
-#define METRIC_DEFINE_INCREMENT(name)                                                              \
-    void METRIC_FUNC_NAME_INCREMENT(name)() { METRIC_VAR_INCREMENT(name); }
+// Implement a member function that increments the metric variable by one.
+#define METRIC_DEFINE_INCREMENT(name) METRIC_DEFINE_NO_ARG(INCREMENT, name)
+
+// Implement a member function that increments the metric variable by one if NOT NULL.
+#define METRIC_DEFINE_INCREMENT_NOTNULL(name) METRIC_DEFINE_NO_ARG_NOTNULL(INCREMENT, name)
 
 // To be adaptive to self-defined `increment` methods, arguments are declared as variadic.
-#define METRIC_INCREMENT(obj, name, ...) (obj).METRIC_FUNC_NAME_INCREMENT(name)(__VA_ARGS__)
+#define METRIC_INCREMENT(obj, name, ...) METRIC_CALL(obj, INCREMENT, name, ##__VA_ARGS__)
+
+// The name of the member function that decrements the metric variable by one.
+#define METRIC_FUNC_NAME_DECREMENT(name) decrement_##name
 
+// Implement a member function that decrements the metric variable by one.
+#define METRIC_DEFINE_DECREMENT(name) METRIC_DEFINE_NO_ARG(DECREMENT, name)
+
+// Implement a member function that decrements the metric variable by one if NOT NULL.
+#define METRIC_DEFINE_DECREMENT_NOTNULL(name) METRIC_DEFINE_NO_ARG_NOTNULL(DECREMENT, name)
+
+// To be adaptive to self-defined `decrement` methods, arguments are declared as variadic.
+#define METRIC_DECREMENT(obj, name, ...) METRIC_CALL(obj, DECREMENT, name, ##__VA_ARGS__)
+
+// The name of the member function that sets the metric variable with some value.
 #define METRIC_FUNC_NAME_SET(name) set_##name
 
-#define METRIC_DEFINE_SET(name, value_type)                                                        \
-    void METRIC_FUNC_NAME_SET(name)(value_type value) { METRIC_VAR_SET(name, value); }
+// Implement a member function that sets the metric variable with some value.
+#define METRIC_DEFINE_SET(name, value_type) METRIC_DEFINE_ONE_ARG(SET, name, value_type)
+
+// Implement a member function that sets the metric variable with some value if NOT NULL.
+#define METRIC_DEFINE_SET_NOTNULL(name, value_type)                                                \
+    METRIC_DEFINE_ONE_ARG_NOTNULL(SET, name, value_type)
 
 // To be adaptive to self-defined `set` methods, arguments are declared as variadic.
-#define METRIC_SET(obj, name, ...) (obj).METRIC_FUNC_NAME_SET(name)(__VA_ARGS__)
+#define METRIC_SET(obj, name, ...) METRIC_CALL(obj, SET, name, ##__VA_ARGS__)
 
+// The name of the member function that gets the value of the metric variable.
 #define METRIC_FUNC_NAME_VALUE(name) get_##name
 
-#define METRIC_DEFINE_VALUE(name, value_type)                                                      \
-    value_type METRIC_FUNC_NAME_VALUE(name)() { return METRIC_VAR_VALUE(name); }
+// Implement a member function that gets the value of the metric variable.
+#define METRIC_DEFINE_VALUE(name, value_type) METRIC_DEFINE_RET_AND_NO_ARG(value_type, VALUE, name)
 
 // To be adaptive to self-defined `value` methods, arguments are declared as variadic.
-#define METRIC_VALUE(obj, name, ...) (obj).METRIC_FUNC_NAME_VALUE(name)(__VA_ARGS__)
+#define METRIC_VALUE(obj, name, ...) METRIC_CALL(obj, VALUE, name, ##__VA_ARGS__)
 
 namespace dsn {
 class metric;                  // IWYU pragma: keep


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org