You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/25 10:55:02 UTC

[incubator-pegasus] 03/28: feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit c2ff93acaf9f510355e71f8473fa9b107ed757be
Author: Dan Wang <wa...@apache.org>
AuthorDate: Tue Mar 7 14:32:56 2023 +0800

    feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374)
    
    This PR migrates the replica-level metrics of `pegasus_server_impl` to the new framework
    (https://github.com/apache/incubator-pegasus/issues/1333). Since there are many replica-level
    metrics in `pegasus_server_impl`, this PR is part 1 of the migration; the remaining metrics (all
    of which are gauges) will be migrated in later PRs.
---
 src/server/pegasus_server_impl.cpp           | 250 ++++++++++++++-------------
 src/server/pegasus_server_impl.h             |  32 ++--
 src/server/pegasus_server_impl_init.cpp      | 126 +++++++-------
 src/server/pegasus_write_service_impl.h      |   5 +-
 src/server/rocksdb_wrapper.cpp               |   6 +-
 src/server/rocksdb_wrapper.h                 |   2 +-
 src/server/test/pegasus_server_impl_test.cpp |   4 +-
 src/utils/metrics.h                          |  16 +-
 src/utils/test/metrics_test.cpp              |   2 -
 src/utils/time_utils.h                       |   2 +-
 10 files changed, 242 insertions(+), 203 deletions(-)

diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index e6e9d48ed..ab6c4dc1c 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -322,13 +322,50 @@ int pegasus_server_impl::on_batched_write_requests(int64_t decree,
     return _server_write->on_batched_write_requests(requests, count, decree, timestamp);
 }
 
+// Since LOG_ERROR_PREFIX depends on log_prefix(), this method could not be declared as static or
+// with anonymous namespace.
+void pegasus_server_impl::log_expired_data(const char *op,
+                                           const dsn::rpc_address &addr,
+                                           const dsn::blob &hash_key,
+                                           const dsn::blob &sort_key) const
+{
+    LOG_ERROR_PREFIX("rocksdb data expired for {} from {}: hash_key = \"{}\", sort_key = \"{}\"",
+                     op,
+                     addr,
+                     pegasus::utils::c_escape_string(hash_key),
+                     pegasus::utils::c_escape_string(sort_key));
+}
+
+void pegasus_server_impl::log_expired_data(const char *op,
+                                           const dsn::rpc_address &addr,
+                                           const dsn::blob &key) const
+{
+    dsn::blob hash_key, sort_key;
+    pegasus_restore_key(key, hash_key, sort_key);
+    log_expired_data(op, addr, hash_key, sort_key);
+}
+
+void pegasus_server_impl::log_expired_data(const char *op,
+                                           const dsn::rpc_address &addr,
+                                           const rocksdb::Slice &key) const
+{
+    dsn::blob raw_key(key.data(), 0, key.size());
+    log_expired_data(op, addr, raw_key);
+}
+
+#define LOG_EXPIRED_DATA_IF_VERBOSE(...)                                                           \
+    do {                                                                                           \
+        if (dsn_unlikely(FLAGS_rocksdb_verbose_log)) {                                             \
+            log_expired_data(__FUNCTION__, rpc.remote_address(), ##__VA_ARGS__);                   \
+        }                                                                                          \
+    } while (0)
+
 void pegasus_server_impl::on_get(get_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_get_qps->increment();
-    uint64_t start_time = dsn_now_ns();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(get_requests);
 
-    const auto &key = rpc.request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -345,21 +382,20 @@ void pegasus_server_impl::on_get(get_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(get_latency_ns);
+
+    const auto &key = rpc.request();
     rocksdb::Slice skey(key.data(), key.length());
     std::string value;
     rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey, &value);
 
     if (status.ok()) {
         if (check_if_record_expired(utils::epoch_now(), value)) {
-            _pfc_recent_expire_count->increment();
-            if (FLAGS_rocksdb_verbose_log) {
-                LOG_ERROR_PREFIX("rocksdb data expired for get from {}", rpc.remote_address());
-            }
+            METRIC_VAR_INCREMENT(read_expired_values);
+            LOG_EXPIRED_DATA_IF_VERBOSE(key);
             status = rocksdb::Status::NotFound();
         }
-    }
-
-    if (!status.ok()) {
+    } else {
         if (FLAGS_rocksdb_verbose_log) {
             ::dsn::blob hash_key, sort_key;
             pegasus_restore_key(key, hash_key, sort_key);
@@ -382,7 +418,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
     usleep(10 * 1000);
 #endif
 
-    uint64_t time_used = dsn_now_ns() - start_time;
+    auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(get_latency_ns);
     if (is_get_abnormal(time_used, value.size())) {
         ::dsn::blob hash_key, sort_key;
         pegasus_restore_key(key, hash_key, sort_key);
@@ -395,7 +431,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
                            status.ToString(),
                            value.size(),
                            time_used);
-        _pfc_recent_abnormal_count->increment();
+        METRIC_VAR_INCREMENT(abnormal_read_requests);
     }
 
     resp.error = status.code();
@@ -404,17 +440,14 @@ void pegasus_server_impl::on_get(get_rpc rpc)
     }
 
     _cu_calculator->add_get_cu(rpc.dsn_request(), resp.error, key, resp.value);
-    _pfc_get_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_multi_get_qps->increment();
-    uint64_t start_time = dsn_now_ns();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(multi_get_requests);
 
-    const auto &request = rpc.request();
-    dsn::message_ex *req = rpc.dsn_request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -426,6 +459,10 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(multi_get_latency_ns);
+
+    const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     if (!is_filter_type_supported(request.sort_key_filter_type)) {
         LOG_ERROR_PREFIX(
             "invalid argument for multi_get from {}: sort key filter type {} not supported",
@@ -433,7 +470,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
             request.sort_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
         _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
-        _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
         return;
     }
 
@@ -520,8 +556,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
             }
             resp.error = rocksdb::Status::kOk;
             _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
-            _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
-
             return;
         }
 
@@ -724,7 +758,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
         for (int i = 0; i < keys.size(); i++) {
             rocksdb::Status &status = statuses[i];
             std::string &value = values[i];
-            // print log
             if (!status.ok()) {
                 if (FLAGS_rocksdb_verbose_log) {
                     LOG_ERROR_PREFIX("rocksdb get failed for multi_get from {}: hash_key = \"{}\", "
@@ -738,41 +771,38 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
                                      rpc.remote_address(),
                                      status.ToString());
                 }
-            }
-            // check ttl
-            if (status.ok()) {
-                uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
-                if (expire_ts > 0 && expire_ts <= epoch_now) {
-                    expire_count++;
-                    if (FLAGS_rocksdb_verbose_log) {
-                        LOG_ERROR_PREFIX("rocksdb data expired for multi_get from {}",
-                                         rpc.remote_address());
-                    }
-                    status = rocksdb::Status::NotFound();
-                }
-            }
-            // extract value
-            if (status.ok()) {
-                // check if exceed limit
-                if (count >= max_kv_count || size >= max_kv_size) {
-                    exceed_limit = true;
-                    break;
-                }
-                ::dsn::apps::key_value kv;
-                kv.key = request.sort_keys[i];
-                if (!request.no_value) {
-                    pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
+
+                if (status.IsNotFound()) {
+                    continue;
                 }
-                count++;
-                size += kv.key.length() + kv.value.length();
-                resp.kvs.emplace_back(std::move(kv));
-            }
-            // if error occurred
-            if (!status.ok() && !status.IsNotFound()) {
+
                 error_occurred = true;
                 final_status = status;
                 break;
             }
+
+            // check ttl
+            if (check_if_record_expired(epoch_now, value)) {
+                expire_count++;
+                LOG_EXPIRED_DATA_IF_VERBOSE(request.hash_key, request.sort_keys[i]);
+                status = rocksdb::Status::NotFound();
+                continue;
+            }
+
+            // check if exceed limit
+            if (dsn_unlikely(count >= max_kv_count || size >= max_kv_size)) {
+                exceed_limit = true;
+                break;
+            }
+
+            ::dsn::apps::key_value kv;
+            kv.key = request.sort_keys[i];
+            if (!request.no_value) {
+                pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
+            }
+            count++;
+            size += kv.key.length() + kv.value.length();
+            resp.kvs.emplace_back(std::move(kv));
         }
 
         if (error_occurred) {
@@ -790,7 +820,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
     usleep(10 * 1000);
 #endif
 
-    uint64_t time_used = dsn_now_ns() - start_time;
+    auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(multi_get_latency_ns);
     if (is_multi_get_abnormal(time_used, size, iteration_count)) {
         LOG_WARNING_PREFIX(
             "rocksdb abnormal multi_get from {}: hash_key = {}, "
@@ -816,25 +846,20 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
             expire_count,
             filter_count,
             time_used);
-        _pfc_recent_abnormal_count->increment();
+        METRIC_VAR_INCREMENT(abnormal_read_requests);
     }
 
-    if (expire_count > 0) {
-        _pfc_recent_expire_count->add(expire_count);
-    }
-    if (filter_count > 0) {
-        _pfc_recent_filter_count->add(filter_count);
-    }
+    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+    METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
 
     _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
-    _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_batch_get_qps->increment();
-    int64_t start_time = dsn_now_ns();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(batch_get_requests);
 
     auto &response = rpc.response();
     response.app_id = _gpid.get_app_id();
@@ -847,13 +872,14 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(batch_get_latency_ns);
+
     const auto &request = rpc.request();
     if (request.keys.empty()) {
         response.error = rocksdb::Status::kInvalidArgument;
         LOG_ERROR_PREFIX("Invalid argument for batch_get from {}: 'keys' field in request is empty",
                          rpc.remote_address().to_string());
         _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
-        _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
         return;
     }
 
@@ -872,6 +898,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
     bool error_occurred = false;
     int64_t total_data_size = 0;
     uint32_t epoch_now = pegasus::utils::epoch_now();
+    uint64_t expire_count = 0;
+
     std::vector<std::string> values;
     std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
     response.data.reserve(request.keys.size());
@@ -887,13 +915,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
 
         if (dsn_likely(status.ok())) {
             if (check_if_record_expired(epoch_now, value)) {
-                if (FLAGS_rocksdb_verbose_log) {
-                    LOG_ERROR_PREFIX(
-                        "rocksdb data expired for batch_get from {}, hash_key = {}, sort_key = {}",
-                        rpc.remote_address().to_string(),
-                        pegasus::utils::c_escape_string(hash_key),
-                        pegasus::utils::c_escape_string(sort_key));
-                }
+                ++expire_count;
+                LOG_EXPIRED_DATA_IF_VERBOSE(hash_key, sort_key);
                 continue;
             }
 
@@ -931,7 +954,7 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
         response.error = rocksdb::Status::kOk;
     }
 
-    int64_t time_used = dsn_now_ns() - start_time;
+    auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(batch_get_latency_ns);
     if (is_batch_get_abnormal(time_used, total_data_size, request.keys.size())) {
         LOG_WARNING_PREFIX(
             "rocksdb abnormal batch_get from {}: total data size = {}, row count = {}, "
@@ -940,33 +963,36 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
             total_data_size,
             request.keys.size(),
             time_used / 1000);
-        _pfc_recent_abnormal_count->increment();
+        METRIC_VAR_INCREMENT(abnormal_read_requests);
     }
 
+    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+
     _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
-    _pfc_batch_get_latency->set(time_used);
 }
 
 void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
 {
-    CHECK(_is_open, "");
+    CHECK_TRUE(_is_open);
 
-    _pfc_scan_qps->increment();
-    uint64_t start_time = dsn_now_ns();
+    METRIC_VAR_INCREMENT(scan_requests);
 
-    const auto &hash_key = rpc.request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
+
     if (!_read_size_throttling_controller->available()) {
         rpc.error() = dsn::ERR_BUSY;
         _counter_recent_read_throttling_reject_count->increment();
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
     // scan
     ::dsn::blob start_key, stop_key;
+    const auto &hash_key = rpc.request();
     pegasus_generate_key(start_key, hash_key, ::dsn::blob());
     pegasus_generate_next_blob(stop_key, hash_key);
     rocksdb::Slice start(start_key.data(), start_key.length());
@@ -988,18 +1014,14 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
 
         if (check_if_record_expired(epoch_now, it->value())) {
             expire_count++;
-            if (FLAGS_rocksdb_verbose_log) {
-                LOG_ERROR_PREFIX("rocksdb data expired for sortkey_count from {}",
-                                 rpc.remote_address());
-            }
+            LOG_EXPIRED_DATA_IF_VERBOSE(it->key());
         } else {
             resp.count++;
         }
         it->Next();
     }
-    if (expire_count > 0) {
-        _pfc_recent_expire_count->add(expire_count);
-    }
+
+    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
 
     resp.error = it->status().code();
     if (!it->status().ok()) {
@@ -1025,7 +1047,6 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
     }
 
     _cu_calculator->add_sortkey_count_cu(rpc.dsn_request(), resp.error, hash_key);
-    _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_ttl(ttl_rpc rpc)
@@ -1053,7 +1074,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
     if (status.ok()) {
         expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
         if (check_if_ts_expired(now_ts, expire_ts)) {
-            _pfc_recent_expire_count->increment();
+            METRIC_VAR_INCREMENT(read_expired_values);
             if (FLAGS_rocksdb_verbose_log) {
                 LOG_ERROR_PREFIX("rocksdb data expired for ttl from {}", rpc.remote_address());
             }
@@ -1093,12 +1114,10 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
 
 void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_scan_qps->increment();
-    uint64_t start_time = dsn_now_ns();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(scan_requests);
 
-    const auto &request = rpc.request();
-    dsn::message_ex *req = rpc.dsn_request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -1110,6 +1129,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
+    const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     if (!is_filter_type_supported(request.hash_key_filter_type)) {
         LOG_ERROR_PREFIX(
             "invalid argument for get_scanner from {}: hash key filter type {} not supported",
@@ -1117,10 +1140,9 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
             request.hash_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
         _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-        _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
         return;
     }
+
     if (!is_filter_type_supported(request.sort_key_filter_type)) {
         LOG_ERROR_PREFIX(
             "invalid argument for get_scanner from {}: sort key filter type {} not supported",
@@ -1128,8 +1150,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
             request.sort_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
         _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-        _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
         return;
     }
 
@@ -1185,8 +1205,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
         }
         resp.error = rocksdb::Status::kOk;
         _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-        _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
         return;
     }
 
@@ -1339,24 +1357,18 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
         resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
     }
 
-    if (expire_count > 0) {
-        _pfc_recent_expire_count->add(expire_count);
-    }
-    if (filter_count > 0) {
-        _pfc_recent_filter_count->add(filter_count);
-    }
+    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+    METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
 
     _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-    _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_scan(scan_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_scan_qps->increment();
-    uint64_t start_time = dsn_now_ns();
-    const auto &request = rpc.request();
-    dsn::message_ex *req = rpc.dsn_request();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(scan_requests);
+
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -1368,6 +1380,10 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
+    const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id);
     if (context) {
         rocksdb::Iterator *it = context->iterator.get();
@@ -1490,18 +1506,14 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
             resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
         }
 
-        if (expire_count > 0) {
-            _pfc_recent_expire_count->add(expire_count);
-        }
-        if (filter_count > 0) {
-            _pfc_recent_filter_count->add(filter_count);
-        }
+        METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+        METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
+
     } else {
         resp.error = rocksdb::Status::Code::kNotFound;
     }
 
     _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-    _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache.fetch(args); }
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index d156acdce..7203d0ce2 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -446,6 +446,15 @@ private:
 
     dsn::replication::manual_compaction_status::type query_compact_status() const override;
 
+    // Log expired keys for verbose mode.
+    void log_expired_data(const char *op,
+                          const dsn::rpc_address &addr,
+                          const dsn::blob &hash_key,
+                          const dsn::blob &sort_key) const;
+    void log_expired_data(const char *op, const dsn::rpc_address &addr, const dsn::blob &key) const;
+    void
+    log_expired_data(const char *op, const dsn::rpc_address &addr, const rocksdb::Slice &key) const;
+
 private:
     static const std::chrono::seconds kServerStatUpdateTimeSec;
     static const std::string COMPRESSION_HEADER;
@@ -517,20 +526,19 @@ private:
 
     std::shared_ptr<throttling_controller> _read_size_throttling_controller;
 
-    // perf counters
-    ::dsn::perf_counter_wrapper _pfc_get_qps;
-    ::dsn::perf_counter_wrapper _pfc_multi_get_qps;
-    ::dsn::perf_counter_wrapper _pfc_batch_get_qps;
-    ::dsn::perf_counter_wrapper _pfc_scan_qps;
+    METRIC_VAR_DECLARE_counter(get_requests);
+    METRIC_VAR_DECLARE_counter(multi_get_requests);
+    METRIC_VAR_DECLARE_counter(batch_get_requests);
+    METRIC_VAR_DECLARE_counter(scan_requests);
 
-    ::dsn::perf_counter_wrapper _pfc_get_latency;
-    ::dsn::perf_counter_wrapper _pfc_multi_get_latency;
-    ::dsn::perf_counter_wrapper _pfc_batch_get_latency;
-    ::dsn::perf_counter_wrapper _pfc_scan_latency;
+    METRIC_VAR_DECLARE_percentile_int64(get_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(multi_get_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(batch_get_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(scan_latency_ns);
 
-    ::dsn::perf_counter_wrapper _pfc_recent_expire_count;
-    ::dsn::perf_counter_wrapper _pfc_recent_filter_count;
-    ::dsn::perf_counter_wrapper _pfc_recent_abnormal_count;
+    METRIC_VAR_DECLARE_counter(read_expired_values);
+    METRIC_VAR_DECLARE_counter(read_filtered_values);
+    METRIC_VAR_DECLARE_counter(abnormal_read_requests);
 
     // rocksdb internal statistics
     // server level
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 896f8ab94..27aebdfab 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -62,6 +62,61 @@ class replica;
 } // namespace replication
 } // namespace dsn
 
+METRIC_DEFINE_counter(replica,
+                      get_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      multi_get_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of MULTI_GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      batch_get_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of BATCH_GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      scan_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of SCAN requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               get_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               multi_get_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of MULTI_GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               batch_get_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of BATCH_GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               scan_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of SCAN requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      read_expired_values,
+                      dsn::metric_unit::kValues,
+                      "The number of expired values read for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      read_filtered_values,
+                      dsn::metric_unit::kValues,
+                      "The number of filtered values read for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      abnormal_read_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of abnormal read requests for each replica");
+
 namespace pegasus {
 namespace server {
 
@@ -403,7 +458,18 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
       _last_durable_decree(0),
       _is_checkpointing(false),
       _manual_compact_svc(this),
-      _partition_version(0)
+      _partition_version(0),
+      METRIC_VAR_INIT_replica(get_requests),
+      METRIC_VAR_INIT_replica(multi_get_requests),
+      METRIC_VAR_INIT_replica(batch_get_requests),
+      METRIC_VAR_INIT_replica(scan_requests),
+      METRIC_VAR_INIT_replica(get_latency_ns),
+      METRIC_VAR_INIT_replica(multi_get_latency_ns),
+      METRIC_VAR_INIT_replica(batch_get_latency_ns),
+      METRIC_VAR_INIT_replica(scan_latency_ns),
+      METRIC_VAR_INIT_replica(read_expired_values),
+      METRIC_VAR_INIT_replica(read_filtered_values),
+      METRIC_VAR_INIT_replica(abnormal_read_requests)
 {
     _primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
     _gpid = get_gpid();
@@ -610,64 +676,6 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
     char name[256];
 
     // register the perf counters
-    snprintf(name, 255, "get_qps@%s", str_gpid.c_str());
-    _pfc_get_qps.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of GET request");
-
-    snprintf(name, 255, "multi_get_qps@%s", str_gpid.c_str());
-    _pfc_multi_get_qps.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of MULTI_GET request");
-
-    snprintf(name, 255, "batch_get_qps@%s", str_gpid.c_str());
-    _pfc_batch_get_qps.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of BATCH_GET request");
-
-    snprintf(name, 255, "scan_qps@%s", str_gpid.c_str());
-    _pfc_scan_qps.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of SCAN request");
-
-    snprintf(name, 255, "get_latency@%s", str_gpid.c_str());
-    _pfc_get_latency.init_app_counter("app.pegasus",
-                                      name,
-                                      COUNTER_TYPE_NUMBER_PERCENTILES,
-                                      "statistic the latency of GET request");
-
-    snprintf(name, 255, "multi_get_latency@%s", str_gpid.c_str());
-    _pfc_multi_get_latency.init_app_counter("app.pegasus",
-                                            name,
-                                            COUNTER_TYPE_NUMBER_PERCENTILES,
-                                            "statistic the latency of MULTI_GET request");
-
-    snprintf(name, 255, "batch_get_latency@%s", str_gpid.c_str());
-    _pfc_batch_get_latency.init_app_counter("app.pegasus",
-                                            name,
-                                            COUNTER_TYPE_NUMBER_PERCENTILES,
-                                            "statistic the latency of BATCH_GET request");
-
-    snprintf(name, 255, "scan_latency@%s", str_gpid.c_str());
-    _pfc_scan_latency.init_app_counter("app.pegasus",
-                                       name,
-                                       COUNTER_TYPE_NUMBER_PERCENTILES,
-                                       "statistic the latency of SCAN request");
-
-    snprintf(name, 255, "recent.expire.count@%s", str_gpid.c_str());
-    _pfc_recent_expire_count.init_app_counter("app.pegasus",
-                                              name,
-                                              COUNTER_TYPE_VOLATILE_NUMBER,
-                                              "statistic the recent expired value read count");
-
-    snprintf(name, 255, "recent.filter.count@%s", str_gpid.c_str());
-    _pfc_recent_filter_count.init_app_counter("app.pegasus",
-                                              name,
-                                              COUNTER_TYPE_VOLATILE_NUMBER,
-                                              "statistic the recent filtered value read count");
-
-    snprintf(name, 255, "recent.abnormal.count@%s", str_gpid.c_str());
-    _pfc_recent_abnormal_count.init_app_counter("app.pegasus",
-                                                name,
-                                                COUNTER_TYPE_VOLATILE_NUMBER,
-                                                "statistic the recent abnormal read count");
-
     snprintf(name, 255, "disk.storage.sst.count@%s", str_gpid.c_str());
     _pfc_rdb_sst_count.init_app_counter(
         "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the count of sstable files");
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index b75b379a3..344c89560 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -89,8 +89,7 @@ public:
     explicit impl(pegasus_server_impl *server)
         : replica_base(server),
           _primary_address(server->_primary_address),
-          _pegasus_data_version(server->_pegasus_data_version),
-          _pfc_recent_expire_count(server->_pfc_recent_expire_count)
+          _pegasus_data_version(server->_pegasus_data_version)
     {
         _rocksdb_wrapper = std::make_unique<rocksdb_wrapper>(server);
     }
@@ -689,8 +688,6 @@ private:
     const std::string _primary_address;
     const uint32_t _pegasus_data_version;
 
-    ::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
-
     std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;
 
     // for setting update_response.error after committed.
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 64d37d9be..08fe8aeca 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -39,6 +39,8 @@
 #include "utils/fmt_logging.h"
 #include "utils/ports.h"
 
+METRIC_DECLARE_counter(read_expired_values);
+
 namespace pegasus {
 namespace server {
 
@@ -54,7 +56,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
       _rd_opts(server->_data_cf_rd_opts),
       _meta_cf(server->_meta_cf),
       _pegasus_data_version(server->_pegasus_data_version),
-      _pfc_recent_expire_count(server->_pfc_recent_expire_count),
+      METRIC_VAR_INIT_replica(read_expired_values),
       _default_ttl(0)
 {
     _write_batch = std::make_unique<rocksdb::WriteBatch>();
@@ -76,7 +78,7 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
         ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
         if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
             ctx->expired = true;
-            _pfc_recent_expire_count->increment();
+            METRIC_VAR_INCREMENT(read_expired_values);
         }
         return rocksdb::Status::kOk;
     } else if (s.IsNotFound()) {
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index e3c713512..fef30a629 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -87,7 +87,7 @@ private:
     rocksdb::ColumnFamilyHandle *_meta_cf;
 
     const uint32_t _pegasus_data_version;
-    dsn::perf_counter_wrapper &_pfc_recent_expire_count;
+    METRIC_VAR_DECLARE_counter(read_expired_values);
     volatile uint32_t _default_ttl;
 
     friend class rocksdb_wrapper_test;
diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp
index 1363d8054..075193fef 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -77,7 +77,7 @@ public:
             _server->update_app_envs(envs);
 
             // do on_get/on_multi_get operation,
-            long before_count = _server->_pfc_recent_abnormal_count->get_integer_value();
+            auto before_count = _server->METRIC_VAR_VALUE(abnormal_read_requests);
             if (!test.is_multi_get) {
                 get_rpc rpc(std::make_unique<dsn::blob>(test_key), dsn::apps::RPC_RRDB_RRDB_GET);
                 _server->on_get(rpc);
@@ -90,7 +90,7 @@ public:
                                   dsn::apps::RPC_RRDB_RRDB_MULTI_GET);
                 _server->on_multi_get(rpc);
             }
-            long after_count = _server->_pfc_recent_abnormal_count->get_integer_value();
+            auto after_count = _server->METRIC_VAR_VALUE(abnormal_read_requests);
 
             ASSERT_EQ(before_count + test.expect_perf_counter_incr, after_count);
         }
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 2d6da6c0f..da1d056d8 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -165,7 +165,13 @@ class error_code;
 #define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica)
 
 // Perform increment-related operations on metrics including gauge and counter.
-#define METRIC_VAR_INCREMENT_BY(name, x) _##name->increment_by(x)
+#define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
+    do {                                                                                           \
+        if (x != 0) {                                                                              \
+            _##name->increment_by(x);                                                              \
+        }                                                                                          \
+    } while (0)
+
 #define METRIC_VAR_INCREMENT(name) _##name->increment()
 
 // Perform set() operations on metrics including gauge and percentile.
@@ -176,10 +182,15 @@ class error_code;
 // such as percentile.
 #define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
 
+// Read the current measurement of the metric.
+#define METRIC_VAR_VALUE(name) _##name->value()
+
 // Convenient macro that is used to compute latency automatically, which is dedicated to percentile.
 #define METRIC_VAR_AUTO_LATENCY(name, ...)                                                         \
     dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__)
 
+#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns()
+
 namespace dsn {
 class metric;                  // IWYU pragma: keep
 class metric_entity_prototype; // IWYU pragma: keep
@@ -598,6 +609,7 @@ enum class metric_unit : size_t
     kMilliSeconds,
     kSeconds,
     kRequests,
+    kValues,
     kInvalidUnit,
 };
 
@@ -1409,6 +1421,8 @@ public:
         }
     }
 
+    inline uint64_t duration_ns() const { return _chrono.duration_ns(); }
+
 private:
     percentile_ptr<int64_t> _percentile;
     utils::chronograph _chrono;
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 670af36b7..9388a9a92 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -3134,8 +3134,6 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t val)
     EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
 }
 
-#define METRIC_VAR_VALUE(name) _##name->value()
-
 #define TEST_METRIC_VAR_INCREMENT(name)                                                            \
     do {                                                                                           \
         ASSERT_EQ(0, METRIC_VAR_VALUE(name));                                                      \
diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h
index dc8d6e1ee..c3bf30e6c 100644
--- a/src/utils/time_utils.h
+++ b/src/utils/time_utils.h
@@ -143,7 +143,7 @@ public:
 
     inline void reset_start_time() { _start_time_ns = dsn_now_ns(); }
 
-    inline uint64_t duration_ns()
+    inline uint64_t duration_ns() const
     {
         auto now = dsn_now_ns();
         CHECK_GE(now, _start_time_ns);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org