You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/03/10 10:56:00 UTC

[incubator-pegasus] branch migrate-metrics-dev updated (648abd505 -> a94256684)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a change to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


 discard 648abd505 feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 2) (#1386)
    omit 16c9766da feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374)
    omit df20ea5cd feat(new_metrics): migrate replica-level metrics for write service (#1351)
    omit c623f9956 feat(new_metrics): add replica-level metric entity (#1345)
     add d6c15b835 refactor(conf): use DSN_DEFINE_int32 to load int32 type of configs (part 2) (#1346)
     add c37bacd9f build: support Ubuntu 22.04 and clang 14 (#1350)
     add 4c76112a5 refactor(conf): use DSN_DEFINE_uint32 to load uint32 type of configs (#1352)
     add ec905d991 refactor(conf): use DSN_DEFINE_int64 to load int64 type of configs (#1357)
     add 3598aa6cc refactor(conf): use DSN_DEFINE_uint64 to load uint64 type of configs (#1359)
     add 51b57615c refactor(conf): use DSN_DEFINE_double to load double type of configs (#1362)
     add 9b160a935 refactor(conf): use DSN_DEFINE_bool to load bool type of configs (#1363)
     add cd01040d8 feat(security): Use Apache Ranger for access control(1/n) (#1360)
     add 2c6911779 feat(security): Use Apache Ranger for access control(2/n) (#1375)
     add 98e359888 refactor(conf): use DSN_DEFINE_string to load string type of configs (#1371)
     add 0922a9ac8 chore(cmake): fix project load failure by CLion (#1376)
     add 5088b2929 feat(test): add bench type multi set/get (#1373)
     add f628e87a5 fix: fix centos image build (#1356)
     add 6ee7a953f chore(CI): Use Ubuntu 22.04 as the default test platform (#1381)
     add 37434d28d refactor: Move some functions from 'replica' to 'replica_stub' (#1384)
     add 84ce5fb97 feat(Ranger): Compatible with the old ACL and the new ACL (#1379)
     add ca6ee39de feat(Ranger): Support encode and decode Ranger policies (#1378)
     new 18b06cf0e feat(new_metrics): add replica-level metric entity (#1345)
     new 09931e8e1 feat(new_metrics): migrate replica-level metrics for write service (#1351)
     new 3210f71ae feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374)
     new a94256684 feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 2) (#1386)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (648abd505)
            \
             N -- N -- N   refs/heads/migrate-metrics-dev (a94256684)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/build-push-env-docker.yml        |   3 +-
 .github/workflows/labeler.yml                      |   2 +-
 .github/workflows/lint_and_test_admin-cli.yml      |   8 +-
 .github/workflows/lint_and_test_cpp.yaml           |  79 ++--
 .github/workflows/lint_and_test_go-client.yml      |   4 +-
 .github/workflows/lint_and_test_pegic.yml          |   8 +-
 .github/workflows/regular-build.yml                |  19 +-
 .github/workflows/standardization_lint.yaml        |   8 +-
 .github/workflows/test_java-client.yml             |   2 +-
 .github/workflows/test_scala-client.yml            |   2 +-
 .github/workflows/thirdparty-regular-push.yml      |  16 +-
 .licenserc.yaml                                    |   1 +
 cmake_modules/BaseFunctions.cmake                  |   2 +-
 docker/README.md                                   |   2 +
 docker/pegasus-build-env/centos6/Dockerfile        |   2 +-
 docker/pegasus-build-env/centos7/Dockerfile        |   5 +-
 .../{ubuntu1804 => ubuntu2204}/Dockerfile          |  16 +-
 run.sh                                             |   9 +
 src/block_service/test/fds_service_test.cpp        |   7 +-
 src/client/replication_ddl_client.h                |   3 +-
 src/common/CMakeLists.txt                          |  14 +-
 src/common/json_helper.h                           |  58 ++-
 src/common/replication_common.cpp                  | 133 +-----
 src/common/replication_common.h                    |  29 +-
 src/common/test/config-test.ini                    |   3 -
 src/failure_detector/test/config-test.ini          |   3 -
 .../test/config-whitelist-test-failed.ini          |   3 -
 .../test/config-whitelist-test.ini                 |   3 -
 src/failure_detector/test/failure_detector.cpp     |  13 +-
 src/geo/lib/geo_client.cpp                         |  89 ++--
 src/geo/lib/geo_client.h                           |  11 -
 src/geo/test/geo_test.cpp                          |   7 +-
 src/http/http_server.cpp                           |   5 +-
 src/meta/app_balance_policy.cpp                    |  14 +-
 src/meta/meta_data.cpp                             |  35 +-
 src/meta/meta_http_service.cpp                     |  10 +-
 src/meta/meta_options.cpp                          | 133 +-----
 src/meta/meta_options.h                            |  31 +-
 src/meta/meta_server_failure_detector.cpp          |  23 +-
 src/meta/meta_service.cpp                          |  75 +++-
 src/meta/meta_service.h                            |   2 +
 src/meta/meta_state_service_zookeeper.cpp          |   5 +-
 src/meta/partition_guardian.cpp                    |  19 +-
 src/meta/server_state.cpp                          |  35 +-
 src/meta/test/backup_test.cpp                      |   8 +-
 src/meta/test/config-test.ini                      |   3 -
 src/meta/test/main.cpp                             |  12 +-
 src/meta/test/meta_app_operation_test.cpp          |  12 +-
 src/meta/test/meta_state/config-test.ini           |   3 -
 src/meta/test/meta_test_base.cpp                   |   6 +-
 src/meta/test/server_state_test.cpp                |   6 +-
 src/meta/test/state_sync_test.cpp                  |   8 +-
 src/meta/test/update_configuration_test.cpp        |   6 +-
 src/perf_counter/perf_counter_atomic.cpp           |  73 +++
 src/perf_counter/perf_counter_atomic.h             |  48 +-
 src/perf_counter/test/perf_counter_test.cpp        |  13 +-
 src/remote_cmd/CMakeLists.txt                      |   2 +-
 src/replica/backup/replica_backup_server.cpp       |  12 +-
 src/replica/bulk_load/test/config-test.ini         |   3 -
 src/replica/disk_cleaner.cpp                       |  13 +-
 src/replica/disk_cleaner.h                         |   2 +
 src/replica/duplication/replica_follower.cpp       |   3 +-
 src/replica/replica.cpp                            |   6 +-
 src/replica/replica.h                              |  16 -
 src/replica/replica_2pc.cpp                        |  13 +-
 src/replica/replica_backup.cpp                     |   6 +-
 src/replica/replica_check.cpp                      |   3 +-
 src/replica/replica_config.cpp                     |  25 +-
 src/replica/replica_http_service.cpp               |   2 +-
 src/replica/replica_init.cpp                       | 136 +-----
 src/replica/replica_stub.cpp                       | 257 ++++++++---
 src/replica/replica_stub.h                         |  15 +-
 src/replica/replication_app_base.cpp               |  18 +-
 src/replica/split/test/config-test.ini             |   3 -
 .../storage/simple_kv/simple_kv.server.impl.cpp    |   6 +-
 .../storage/simple_kv/simple_kv.server.impl.h      |   1 -
 src/replica/storage/simple_kv/test/checker.cpp     |   4 +-
 .../simple_kv/test/simple_kv.server.impl.cpp       |   6 +-
 .../storage/simple_kv/test/simple_kv.server.impl.h |   1 -
 src/replica/test/config-test.ini                   |   3 -
 src/replica/test/replica_http_service_test.cpp     |   7 +-
 src/replica/test/replica_test.cpp                  |  23 +-
 src/runtime/CMakeLists.txt                         |   2 +
 src/runtime/env.sim.cpp                            |  19 +-
 src/runtime/env.sim.h                              |   2 -
 src/runtime/profiler.cpp                           |  25 +-
 src/runtime/{security => ranger}/CMakeLists.txt    |  14 +-
 .../ranger/access_type.cpp}                        |  29 +-
 src/{utils/math.h => runtime/ranger/access_type.h} |  31 +-
 src/runtime/ranger/ranger_resource_policy.cpp      |  70 +++
 src/runtime/ranger/ranger_resource_policy.h        |  79 ++++
 .../ranger/ranger_resource_policy_manager.cpp      | 167 +++++++
 .../ranger/ranger_resource_policy_manager.h        |  95 ++++
 src/runtime/rpc/asio_net_provider.cpp              |  12 +-
 src/runtime/rpc/network.cpp                        |  59 +--
 src/runtime/rpc/network.h                          |   2 -
 src/runtime/rpc/network.sim.cpp                    |  25 +-
 src/runtime/rpc/network.sim.h                      |   2 -
 src/runtime/rpc/rpc_engine.cpp                     |   4 +-
 src/runtime/rpc/rpc_message.cpp                    |  23 +-
 src/runtime/rpc/rpc_message.h                      |   3 -
 src/runtime/security/CMakeLists.txt                |   2 +-
 src/runtime/security/access_controller.cpp         |  27 +-
 src/runtime/security/access_controller.h           |  36 +-
 src/runtime/security/replica_access_controller.cpp |   2 +-
 src/runtime/security/replica_access_controller.h   |   2 +-
 src/runtime/service_api_c.cpp                      |  27 +-
 src/runtime/service_engine.cpp                     |  16 +-
 src/runtime/simulator.cpp                          |   6 +-
 src/runtime/test/CMakeLists.txt                    |   1 +
 src/runtime/test/main.cpp                          |   6 +-
 src/runtime/test/netprovider.cpp                   |  39 +-
 .../test/ranger_resource_policy_manager_test.cpp   | 206 +++++++++
 src/runtime/test/ranger_resource_policy_test.cpp   |  80 ++++
 src/runtime/test/service_api_c.cpp                 |   5 +-
 src/runtime/tracer.cpp                             |  12 +-
 src/server/available_detector.cpp                  | 106 ++---
 src/server/available_detector.h                    |   6 -
 src/server/capacity_unit_calculator.cpp            |  43 +-
 src/server/capacity_unit_calculator.h              |   2 -
 src/server/config.ini                              |   1 -
 src/server/info_collector.cpp                      |  54 ++-
 src/server/info_collector.h                        |   5 -
 src/server/pegasus_manual_compact_service.cpp      |  17 +-
 src/server/pegasus_manual_compact_service.h        |   1 -
 src/server/pegasus_server_impl.cpp                 |  64 +--
 src/server/pegasus_server_impl.h                   |  21 +-
 src/server/pegasus_server_impl_init.cpp            | 490 ++++++++++-----------
 src/server/pegasus_server_write.cpp                |   6 +-
 src/server/pegasus_server_write.h                  |   4 +-
 src/server/pegasus_write_service.cpp               |   6 +
 src/server/pegasus_write_service.h                 |   1 -
 src/server/range_read_limiter.h                    |   1 -
 src/server/test/capacity_unit_calculator_test.cpp  |   8 +-
 src/server/test/config.ini                         |   1 -
 src/server/test/manual_compact_service_test.cpp    |  11 +-
 src/server/test/pegasus_server_write_test.cpp      |   2 +-
 .../test/pegasus_write_service_impl_test.cpp       |   2 +-
 src/server/test/pegasus_write_service_test.cpp     |   2 +-
 src/server/test/rocksdb_wrapper_test.cpp           |   2 +-
 src/test/bench_test/benchmark.cpp                  | 193 +++++++-
 src/test/bench_test/benchmark.h                    |   2 +
 src/test/bench_test/config.cpp                     |  34 +-
 src/test/bench_test/config.h                       |   6 -
 src/test/bench_test/config.ini                     |   5 +-
 src/test/bench_test/statistics.cpp                 |   7 +-
 src/test/bench_test/utils.h                        |   4 +-
 .../function_test/recovery_test/test_recovery.cpp  |   2 +-
 src/test/kill_test/data_verifier.cpp               |  70 +--
 src/test/kill_test/data_verifier.h                 |   4 +
 src/test/kill_test/kill_testor.cpp                 |  37 +-
 src/test/kill_test/kill_testor.h                   |   6 -
 src/test/kill_test/killer_handler_shell.cpp        |  23 +-
 src/test/kill_test/killer_handler_shell.h          |   6 +-
 src/test/kill_test/main.cpp                        |   4 +-
 src/test/kill_test/partition_kill_testor.cpp       |   8 +-
 src/test/kill_test/process_kill_testor.cpp         |  99 +++--
 src/test/kill_test/process_kill_testor.h           |   8 -
 src/test/pressure_test/main.cpp                    |  93 ++--
 src/utils/simple_logger.cpp                        |  10 -
 src/utils/simple_logger.h                          |   3 +-
 src/utils/string_conv.h                            |   2 +-
 src/utils/test/logger.cpp                          |   2 +-
 .../distributed_lock_service_zookeeper.cpp         |   5 +-
 src/zookeeper/test/config-test.ini                 |   3 -
 src/zookeeper/zookeeper_session.cpp                |  19 +-
 src/zookeeper/zookeeper_session_mgr.cpp            |  11 +-
 src/zookeeper/zookeeper_session_mgr.h              |   8 -
 thirdparty/CMakeLists.txt                          |   3 +-
 thirdparty/fix_prometheus-cpp_limits.patch         |  10 +
 170 files changed, 2572 insertions(+), 1836 deletions(-)
 copy docker/pegasus-build-env/{ubuntu1804 => ubuntu2204}/Dockerfile (89%)
 create mode 100644 src/perf_counter/perf_counter_atomic.cpp
 copy src/runtime/{security => ranger}/CMakeLists.txt (81%)
 copy src/{utils/sys_exit_hook.h => runtime/ranger/access_type.cpp} (66%)
 copy src/{utils/math.h => runtime/ranger/access_type.h} (50%)
 create mode 100644 src/runtime/ranger/ranger_resource_policy.cpp
 create mode 100644 src/runtime/ranger/ranger_resource_policy.h
 create mode 100644 src/runtime/ranger/ranger_resource_policy_manager.cpp
 create mode 100644 src/runtime/ranger/ranger_resource_policy_manager.h
 create mode 100644 src/runtime/test/ranger_resource_policy_manager_test.cpp
 create mode 100644 src/runtime/test/ranger_resource_policy_test.cpp
 create mode 100644 thirdparty/fix_prometheus-cpp_limits.patch


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org


[incubator-pegasus] 04/04: feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 2) (#1386)

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit a94256684802928e3072bcce5202f4bb5cbba163
Author: Dan Wang <wa...@apache.org>
AuthorDate: Fri Mar 10 17:05:59 2023 +0800

    feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 2) (#1386)
    
    This PR migrates the replica-level metrics of pegasus_server_impl to the new framework;
    it is the second part of this work, for #1333.
    
    This PR focuses on migrating all rocksdb-related metrics for each replica, including
    the total number and size of sst files, the estimated number of keys, memory usage and
    hit rate, write/read amplification, and the negatives/positives of bloom filters.
---
 src/server/pegasus_server_impl.cpp      | 150 ++++++-----------
 src/server/pegasus_server_impl.h        |  48 +++---
 src/server/pegasus_server_impl_init.cpp | 275 ++++++++++++++++++--------------
 src/utils/metrics.h                     |  12 +-
 4 files changed, 238 insertions(+), 247 deletions(-)

diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 2bbcc6507..1f85aa23b 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -310,6 +310,15 @@ void pegasus_server_impl::log_expired_data(const char *op,
         }                                                                                          \
     } while (0)
 
+#define CHECK_READ_THROTTLING()                                                                    \
+    do {                                                                                           \
+        if (dsn_unlikely(!_read_size_throttling_controller->available())) {                        \
+            rpc.error() = dsn::ERR_BUSY;                                                           \
+            METRIC_VAR_INCREMENT(throttling_rejected_read_requests);                               \
+            return;                                                                                \
+        }                                                                                          \
+    } while (0)
+
 void pegasus_server_impl::on_get(get_rpc rpc)
 {
     CHECK_TRUE(_is_open);
@@ -321,11 +330,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
-    if (!_read_size_throttling_controller->available()) {
-        rpc.error() = dsn::ERR_BUSY;
-        _counter_recent_read_throttling_reject_count->increment();
-        return;
-    }
+    CHECK_READ_THROTTLING();
 
     METRIC_VAR_AUTO_LATENCY(get_latency_ns);
 
@@ -398,11 +403,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
-    if (!_read_size_throttling_controller->available()) {
-        rpc.error() = dsn::ERR_BUSY;
-        _counter_recent_read_throttling_reject_count->increment();
-        return;
-    }
+    CHECK_READ_THROTTLING();
 
     METRIC_VAR_AUTO_LATENCY(multi_get_latency_ns);
 
@@ -811,11 +812,7 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
     response.partition_index = _gpid.get_partition_index();
     response.server = _primary_address;
 
-    if (!_read_size_throttling_controller->available()) {
-        rpc.error() = dsn::ERR_BUSY;
-        _counter_recent_read_throttling_reject_count->increment();
-        return;
-    }
+    CHECK_READ_THROTTLING();
 
     METRIC_VAR_AUTO_LATENCY(batch_get_latency_ns);
 
@@ -927,11 +924,7 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
-    if (!_read_size_throttling_controller->available()) {
-        rpc.error() = dsn::ERR_BUSY;
-        _counter_recent_read_throttling_reject_count->increment();
-        return;
-    }
+    CHECK_READ_THROTTLING();
 
     METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
 
@@ -1004,11 +997,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
-    if (!_read_size_throttling_controller->available()) {
-        rpc.error() = dsn::ERR_BUSY;
-        _counter_recent_read_throttling_reject_count->increment();
-        return;
-    }
+    CHECK_READ_THROTTLING();
 
     rocksdb::Slice skey(key.data(), key.length());
     std::string value;
@@ -1068,11 +1057,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
-    if (!_read_size_throttling_controller->available()) {
-        rpc.error() = dsn::ERR_BUSY;
-        _counter_recent_read_throttling_reject_count->increment();
-        return;
-    }
+    CHECK_READ_THROTTLING();
 
     METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
 
@@ -1319,11 +1304,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
-    if (!_read_size_throttling_controller->available()) {
-        rpc.error() = dsn::ERR_BUSY;
-        _counter_recent_read_throttling_reject_count->increment();
-        return;
-    }
+    CHECK_READ_THROTTLING();
 
     METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
 
@@ -1811,13 +1792,13 @@ void pegasus_server_impl::cancel_background_work(bool wait)
             LOG_ERROR_PREFIX("rmdir {} failed when stop app", data_dir());
             return ::dsn::ERR_FILE_OPERATION_FAILED;
         }
-        _pfc_rdb_sst_count->set(0);
-        _pfc_rdb_sst_size->set(0);
-        _pfc_rdb_block_cache_hit_count->set(0);
-        _pfc_rdb_block_cache_total_count->set(0);
+        METRIC_VAR_SET(rdb_total_sst_files, 0);
+        METRIC_VAR_SET(rdb_total_sst_size_mb, 0);
+        METRIC_VAR_SET(rdb_index_and_filter_blocks_mem_usage_bytes, 0);
+        METRIC_VAR_SET(rdb_memtable_mem_usage_bytes, 0);
+        METRIC_VAR_SET(rdb_block_cache_hit_count, 0);
+        METRIC_VAR_SET(rdb_block_cache_total_count, 0);
         _pfc_rdb_block_cache_mem_usage->set(0);
-        _pfc_rdb_index_and_filter_blocks_mem_usage->set(0);
-        _pfc_rdb_memtable_mem_usage->set(0);
     }
 
     LOG_INFO_PREFIX("close app succeed, clear_state = {}", clear_state ? "true" : "false");
@@ -2379,12 +2360,16 @@ range_iteration_state pegasus_server_impl::append_key_value_for_multi_get(
     return range_iteration_state::kNormal;
 }
 
+#define GET_TICKER_COUNT_AND_SET_METRIC(ticker_name, metric_name)                                  \
+    do {                                                                                           \
+        METRIC_VAR_SET(metric_name, _statistics->getTickerCount(rocksdb::ticker_name));            \
+    } while (0)
+
 void pegasus_server_impl::update_replica_rocksdb_statistics()
 {
     std::string str_val;
     uint64_t val = 0;
 
-    // Update _pfc_rdb_sst_count
     for (int i = 0; i < _data_cf_opts.num_levels; ++i) {
         int cur_level_count = 0;
         if (_db->GetProperty(rocksdb::DB::Properties::kNumFilesAtLevelPrefix + std::to_string(i),
@@ -2393,49 +2378,38 @@ void pegasus_server_impl::update_replica_rocksdb_statistics()
             val += cur_level_count;
         }
     }
-    _pfc_rdb_sst_count->set(val);
-    LOG_DEBUG_PREFIX("_pfc_rdb_sst_count: {}", val);
+    METRIC_VAR_SET(rdb_total_sst_files, val);
 
-    // Update _pfc_rdb_sst_size
     if (_db->GetProperty(_data_cf, rocksdb::DB::Properties::kTotalSstFilesSize, &str_val) &&
         dsn::buf2uint64(str_val, val)) {
         static uint64_t bytes_per_mb = 1U << 20U;
-        _pfc_rdb_sst_size->set(val / bytes_per_mb);
-        LOG_DEBUG_PREFIX("_pfc_rdb_sst_size: {} bytes", val);
+        METRIC_VAR_SET(rdb_total_sst_size_mb, val / bytes_per_mb);
     }
 
-    // Update _pfc_rdb_write_amplification
     std::map<std::string, std::string> props;
     if (_db->GetMapProperty(_data_cf, "rocksdb.cfstats", &props)) {
         auto write_amplification_iter = props.find("compaction.Sum.WriteAmp");
         auto write_amplification = write_amplification_iter == props.end()
                                        ? 1
                                        : std::stod(write_amplification_iter->second);
-        _pfc_rdb_write_amplification->set(write_amplification);
-        LOG_DEBUG_PREFIX("_pfc_rdb_write_amplification: {}", write_amplification);
+        METRIC_VAR_SET(rdb_write_amplification, write_amplification);
     }
 
-    // Update _pfc_rdb_index_and_filter_blocks_mem_usage
     if (_db->GetProperty(_data_cf, rocksdb::DB::Properties::kEstimateTableReadersMem, &str_val) &&
         dsn::buf2uint64(str_val, val)) {
-        _pfc_rdb_index_and_filter_blocks_mem_usage->set(val);
-        LOG_DEBUG_PREFIX("_pfc_rdb_index_and_filter_blocks_mem_usage: {} bytes", val);
+        METRIC_VAR_SET(rdb_index_and_filter_blocks_mem_usage_bytes, val);
     }
 
-    // Update _pfc_rdb_memtable_mem_usage
     if (_db->GetProperty(_data_cf, rocksdb::DB::Properties::kCurSizeAllMemTables, &str_val) &&
         dsn::buf2uint64(str_val, val)) {
-        _pfc_rdb_memtable_mem_usage->set(val);
-        LOG_DEBUG_PREFIX("_pfc_rdb_memtable_mem_usage: {} bytes", val);
+        METRIC_VAR_SET(rdb_memtable_mem_usage_bytes, val);
     }
 
-    // Update _pfc_rdb_estimate_num_keys
     // NOTE: for the same n kv pairs, kEstimateNumKeys will be counted n times, you need compaction
     // to remove duplicate
     if (_db->GetProperty(_data_cf, rocksdb::DB::Properties::kEstimateNumKeys, &str_val) &&
         dsn::buf2uint64(str_val, val)) {
-        _pfc_rdb_estimate_num_keys->set(val);
-        LOG_DEBUG_PREFIX("_pfc_rdb_estimate_num_keys: {}", val);
+        METRIC_VAR_SET(rdb_estimated_keys, val);
     }
 
     // the follow stats is related to `read`, so only primary need update it,ignore
@@ -2444,7 +2418,6 @@ void pegasus_server_impl::update_replica_rocksdb_statistics()
         return;
     }
 
-    // Update _pfc_rdb_read_amplification
     if (FLAGS_read_amp_bytes_per_bit > 0) {
         auto estimate_useful_bytes =
             _statistics->getTickerCount(rocksdb::READ_AMP_ESTIMATE_USEFUL_BYTES);
@@ -2452,68 +2425,41 @@ void pegasus_server_impl::update_replica_rocksdb_statistics()
             auto read_amplification =
                 _statistics->getTickerCount(rocksdb::READ_AMP_TOTAL_READ_BYTES) /
                 estimate_useful_bytes;
-            _pfc_rdb_read_amplification->set(read_amplification);
-            LOG_DEBUG_PREFIX("_pfc_rdb_read_amplification: {}", read_amplification);
+            METRIC_VAR_SET(rdb_read_amplification, read_amplification);
         }
     }
 
-    // Update _pfc_rdb_bf_seek_negatives
-    auto bf_seek_negatives = _statistics->getTickerCount(rocksdb::BLOOM_FILTER_PREFIX_USEFUL);
-    _pfc_rdb_bf_seek_negatives->set(bf_seek_negatives);
-    LOG_DEBUG_PREFIX("_pfc_rdb_bf_seek_negatives: {}", bf_seek_negatives);
+    GET_TICKER_COUNT_AND_SET_METRIC(BLOOM_FILTER_PREFIX_USEFUL, rdb_bloom_filter_seek_negatives);
 
-    // Update _pfc_rdb_bf_seek_total
-    auto bf_seek_total = _statistics->getTickerCount(rocksdb::BLOOM_FILTER_PREFIX_CHECKED);
-    _pfc_rdb_bf_seek_total->set(bf_seek_total);
-    LOG_DEBUG_PREFIX("_pfc_rdb_bf_seek_total: {}", bf_seek_total);
+    GET_TICKER_COUNT_AND_SET_METRIC(BLOOM_FILTER_PREFIX_CHECKED, rdb_bloom_filter_seek_total);
 
-    // Update _pfc_rdb_bf_point_positive_true
-    auto bf_point_positive_true =
-        _statistics->getTickerCount(rocksdb::BLOOM_FILTER_FULL_TRUE_POSITIVE);
-    _pfc_rdb_bf_point_positive_true->set(bf_point_positive_true);
-    LOG_DEBUG_PREFIX("_pfc_rdb_bf_point_positive_true: {}", bf_point_positive_true);
+    GET_TICKER_COUNT_AND_SET_METRIC(BLOOM_FILTER_USEFUL, rdb_bloom_filter_point_lookup_negatives);
 
-    // Update _pfc_rdb_bf_point_positive_total
-    auto bf_point_positive_total = _statistics->getTickerCount(rocksdb::BLOOM_FILTER_FULL_POSITIVE);
-    _pfc_rdb_bf_point_positive_total->set(bf_point_positive_total);
-    LOG_DEBUG_PREFIX("_pfc_rdb_bf_point_positive_total: {}", bf_point_positive_total);
+    GET_TICKER_COUNT_AND_SET_METRIC(BLOOM_FILTER_FULL_POSITIVE,
+                                    rdb_bloom_filter_point_lookup_positives);
 
-    // Update _pfc_rdb_bf_point_negatives
-    auto bf_point_negatives = _statistics->getTickerCount(rocksdb::BLOOM_FILTER_USEFUL);
-    _pfc_rdb_bf_point_negatives->set(bf_point_negatives);
-    LOG_DEBUG_PREFIX("_pfc_rdb_bf_point_negatives: {}", bf_point_negatives);
+    GET_TICKER_COUNT_AND_SET_METRIC(BLOOM_FILTER_FULL_TRUE_POSITIVE,
+                                    rdb_bloom_filter_point_lookup_true_positives);
 
-    // Update _pfc_rdb_block_cache_hit_count and _pfc_rdb_block_cache_total_count
     auto block_cache_hit = _statistics->getTickerCount(rocksdb::BLOCK_CACHE_HIT);
-    _pfc_rdb_block_cache_hit_count->set(block_cache_hit);
-    LOG_DEBUG_PREFIX("_pfc_rdb_block_cache_hit_count: {}", block_cache_hit);
+    METRIC_VAR_SET(rdb_block_cache_hit_count, block_cache_hit);
 
     auto block_cache_miss = _statistics->getTickerCount(rocksdb::BLOCK_CACHE_MISS);
     auto block_cache_total = block_cache_hit + block_cache_miss;
-    _pfc_rdb_block_cache_total_count->set(block_cache_total);
-    LOG_DEBUG_PREFIX("_pfc_rdb_block_cache_total_count: {}", block_cache_total);
+    METRIC_VAR_SET(rdb_block_cache_total_count, block_cache_total);
 
-    // update block memtable/l0/l1/l2andup hit rate under block cache up level
     auto memtable_hit_count = _statistics->getTickerCount(rocksdb::MEMTABLE_HIT);
-    _pfc_rdb_memtable_hit_count->set(memtable_hit_count);
-    LOG_DEBUG_PREFIX("_pfc_rdb_memtable_hit_count: {}", memtable_hit_count);
+    METRIC_VAR_SET(rdb_memtable_hit_count, memtable_hit_count);
 
     auto memtable_miss_count = _statistics->getTickerCount(rocksdb::MEMTABLE_MISS);
     auto memtable_total = memtable_hit_count + memtable_miss_count;
-    _pfc_rdb_memtable_total_count->set(memtable_total);
-    LOG_DEBUG_PREFIX("_pfc_rdb_memtable_total_count: {}", memtable_total);
+    METRIC_VAR_SET(rdb_memtable_total_count, memtable_total);
 
-    auto l0_hit_count = _statistics->getTickerCount(rocksdb::GET_HIT_L0);
-    _pfc_rdb_l0_hit_count->set(l0_hit_count);
-    LOG_DEBUG_PREFIX("_pfc_rdb_l0_hit_count: {}", l0_hit_count);
+    GET_TICKER_COUNT_AND_SET_METRIC(GET_HIT_L0, rdb_l0_hit_count);
 
-    auto l1_hit_count = _statistics->getTickerCount(rocksdb::GET_HIT_L1);
-    _pfc_rdb_l1_hit_count->set(l1_hit_count);
-    LOG_DEBUG_PREFIX("_pfc_rdb_l1_hit_count: {}", l1_hit_count);
+    GET_TICKER_COUNT_AND_SET_METRIC(GET_HIT_L1, rdb_l1_hit_count);
 
-    auto l2andup_hit_count = _statistics->getTickerCount(rocksdb::GET_HIT_L2_AND_UP);
-    _pfc_rdb_l2andup_hit_count->set(l2andup_hit_count);
-    LOG_DEBUG_PREFIX("_pfc_rdb_l2andup_hit_count: {}", l2andup_hit_count);
+    GET_TICKER_COUNT_AND_SET_METRIC(GET_HIT_L2_AND_UP, rdb_l2_and_up_hit_count);
 }
 
 void pegasus_server_impl::update_server_rocksdb_statistics()
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index acaf13945..50f330cdf 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -504,34 +504,36 @@ private:
     METRIC_VAR_DECLARE_counter(read_expired_values);
     METRIC_VAR_DECLARE_counter(read_filtered_values);
     METRIC_VAR_DECLARE_counter(abnormal_read_requests);
+    METRIC_VAR_DECLARE_counter(throttling_rejected_read_requests);
 
     // rocksdb internal statistics
     // server level
     static ::dsn::perf_counter_wrapper _pfc_rdb_write_limiter_rate_bytes;
     static ::dsn::perf_counter_wrapper _pfc_rdb_block_cache_mem_usage;
-    // replica level
-    dsn::perf_counter_wrapper _pfc_rdb_sst_count;
-    dsn::perf_counter_wrapper _pfc_rdb_sst_size;
-    dsn::perf_counter_wrapper _pfc_rdb_index_and_filter_blocks_mem_usage;
-    dsn::perf_counter_wrapper _pfc_rdb_memtable_mem_usage;
-    dsn::perf_counter_wrapper _pfc_rdb_estimate_num_keys;
-
-    dsn::perf_counter_wrapper _pfc_rdb_bf_seek_negatives;
-    dsn::perf_counter_wrapper _pfc_rdb_bf_seek_total;
-    dsn::perf_counter_wrapper _pfc_rdb_bf_point_positive_true;
-    dsn::perf_counter_wrapper _pfc_rdb_bf_point_positive_total;
-    dsn::perf_counter_wrapper _pfc_rdb_bf_point_negatives;
-    dsn::perf_counter_wrapper _pfc_rdb_block_cache_hit_count;
-    dsn::perf_counter_wrapper _pfc_rdb_block_cache_total_count;
-    dsn::perf_counter_wrapper _pfc_rdb_write_amplification;
-    dsn::perf_counter_wrapper _pfc_rdb_read_amplification;
-    dsn::perf_counter_wrapper _pfc_rdb_memtable_hit_count;
-    dsn::perf_counter_wrapper _pfc_rdb_memtable_total_count;
-    dsn::perf_counter_wrapper _pfc_rdb_l0_hit_count;
-    dsn::perf_counter_wrapper _pfc_rdb_l1_hit_count;
-    dsn::perf_counter_wrapper _pfc_rdb_l2andup_hit_count;
-
-    dsn::perf_counter_wrapper _counter_recent_read_throttling_reject_count;
+
+    // Replica-level metrics for rocksdb.
+    METRIC_VAR_DECLARE_gauge_int64(rdb_total_sst_files);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_total_sst_size_mb);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_estimated_keys);
+
+    METRIC_VAR_DECLARE_gauge_int64(rdb_index_and_filter_blocks_mem_usage_bytes);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_memtable_mem_usage_bytes);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_block_cache_hit_count);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_block_cache_total_count);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_memtable_hit_count);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_memtable_total_count);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_l0_hit_count);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_l1_hit_count);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_l2_and_up_hit_count);
+
+    METRIC_VAR_DECLARE_gauge_int64(rdb_write_amplification);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_read_amplification);
+
+    METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_seek_negatives);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_seek_total);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_point_lookup_negatives);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_point_lookup_positives);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_point_lookup_true_positives);
 };
 
 } // namespace server
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 9575035c2..a76ba3dd4 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -34,57 +34,179 @@
 METRIC_DEFINE_counter(replica,
                       get_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of GET requests for each replica");
+                      "The number of GET requests");
 
 METRIC_DEFINE_counter(replica,
                       multi_get_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of MULTI_GET requests for each replica");
+                      "The number of MULTI_GET requests");
 
 METRIC_DEFINE_counter(replica,
                       batch_get_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of BATCH_GET requests for each replica");
+                      "The number of BATCH_GET requests");
 
 METRIC_DEFINE_counter(replica,
                       scan_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of SCAN requests for each replica");
+                      "The number of SCAN requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                get_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of GET requests for each replica");
+                               "The latency of GET requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                multi_get_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of MULTI_GET requests for each replica");
+                               "The latency of MULTI_GET requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                batch_get_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of BATCH_GET requests for each replica");
+                               "The latency of BATCH_GET requests");
 
 METRIC_DEFINE_percentile_int64(replica,
                                scan_latency_ns,
                                dsn::metric_unit::kNanoSeconds,
-                               "The latency of SCAN requests for each replica");
+                               "The latency of SCAN requests");
 
 METRIC_DEFINE_counter(replica,
                       read_expired_values,
                       dsn::metric_unit::kValues,
-                      "The number of expired values read for each replica");
+                      "The number of expired values read");
 
 METRIC_DEFINE_counter(replica,
                       read_filtered_values,
                       dsn::metric_unit::kValues,
-                      "The number of filtered values read for each replica");
+                      "The number of filtered values read");
 
 METRIC_DEFINE_counter(replica,
                       abnormal_read_requests,
                       dsn::metric_unit::kRequests,
-                      "The number of abnormal read requests for each replica");
+                      "The number of abnormal read requests");
+
+METRIC_DEFINE_counter(replica,
+                      throttling_rejected_read_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of read requests rejected by throttling");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_total_sst_files,
+                          dsn::metric_unit::kFiles,
+                          "The total number of rocksdb sst files");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_total_sst_size_mb,
+                          dsn::metric_unit::kMegaBytes,
+                          "The total size of rocksdb sst files in MB");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_estimated_keys,
+                          dsn::metric_unit::kKeys,
+                          "The estimated number of rocksdb keys");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_index_and_filter_blocks_mem_usage_bytes,
+                          dsn::metric_unit::kBytes,
+                          "The memory usage of rocksdb index and filter blocks in bytes");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_memtable_mem_usage_bytes,
+                          dsn::metric_unit::kBytes,
+                          "The memory usage of rocksdb memtables in bytes");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_block_cache_hit_count,
+                          dsn::metric_unit::kPointLookups,
+                          "The hit number of lookups on rocksdb block cache");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_block_cache_total_count,
+                          dsn::metric_unit::kPointLookups,
+                          "The total number of lookups on rocksdb block cache");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_memtable_hit_count,
+                          dsn::metric_unit::kPointLookups,
+                          "The hit number of lookups on rocksdb memtable");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_memtable_total_count,
+                          dsn::metric_unit::kPointLookups,
+                          "The total number of lookups on rocksdb memtable");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_l0_hit_count,
+                          dsn::metric_unit::kPointLookups,
+                          "The number of lookups served by rocksdb L0");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_l1_hit_count,
+                          dsn::metric_unit::kPointLookups,
+                          "The number of lookups served by rocksdb L1");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_l2_and_up_hit_count,
+                          dsn::metric_unit::kPointLookups,
+                          "The number of lookups served by rocksdb L2 and up");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_write_amplification,
+                          dsn::metric_unit::kAmplification,
+                          "The write amplification of rocksdb");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_read_amplification,
+                          dsn::metric_unit::kAmplification,
+                          "The read amplification of rocksdb");
+
+// Following metrics are rocksdb statistics that are related to bloom filters.
+//
+// To measure prefix bloom filters, these metrics are updated after each ::Seek and ::SeekForPrev if
+// prefix is enabled and check_filter is set:
+// * rdb_bloom_filter_seek_negatives: seek_negatives
+// * rdb_bloom_filter_seek_total: seek_negatives + seek_positives
+//
+// To measure full bloom filters, these metrics are updated after each point lookup. If
+// whole_key_filtering is set, this is the result of checking the bloom of the whole key, otherwise
+// this is the result of checking the bloom of the prefix:
+// * rdb_bloom_filter_point_lookup_negatives: [true] negatives
+// * rdb_bloom_filter_point_lookup_positives: positives
+// * rdb_bloom_filter_point_lookup_true_positives: true positives
+//
+// For details please see https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#statistic.
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_bloom_filter_seek_negatives,
+                          dsn::metric_unit::kSeeks,
+                          "The number of times the check for prefix bloom filter was useful in "
+                          "avoiding iterator creation (and thus likely IOPs), used by "
+                          "rocksdb");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_bloom_filter_seek_total,
+                          dsn::metric_unit::kSeeks,
+                          "The number of times prefix bloom filter was checked before creating "
+                          "iterator on a file, used by rocksdb");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_bloom_filter_point_lookup_negatives,
+                          dsn::metric_unit::kPointLookups,
+                          "The number of times full bloom filter has avoided file reads (i.e., "
+                          "negatives), used by rocksdb");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_bloom_filter_point_lookup_positives,
+                          dsn::metric_unit::kPointLookups,
+                          "The number of times full bloom filter has not avoided the reads, used "
+                          "by rocksdb");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_bloom_filter_point_lookup_true_positives,
+                          dsn::metric_unit::kPointLookups,
+                          "The number of times full bloom filter has not avoided the reads and "
+                          "data actually exist, used by rocksdb");
 
 namespace pegasus {
 namespace server {
@@ -438,7 +560,27 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
       METRIC_VAR_INIT_replica(scan_latency_ns),
       METRIC_VAR_INIT_replica(read_expired_values),
       METRIC_VAR_INIT_replica(read_filtered_values),
-      METRIC_VAR_INIT_replica(abnormal_read_requests)
+      METRIC_VAR_INIT_replica(abnormal_read_requests),
+      METRIC_VAR_INIT_replica(throttling_rejected_read_requests),
+      METRIC_VAR_INIT_replica(rdb_total_sst_files),
+      METRIC_VAR_INIT_replica(rdb_total_sst_size_mb),
+      METRIC_VAR_INIT_replica(rdb_estimated_keys),
+      METRIC_VAR_INIT_replica(rdb_index_and_filter_blocks_mem_usage_bytes),
+      METRIC_VAR_INIT_replica(rdb_memtable_mem_usage_bytes),
+      METRIC_VAR_INIT_replica(rdb_block_cache_hit_count),
+      METRIC_VAR_INIT_replica(rdb_block_cache_total_count),
+      METRIC_VAR_INIT_replica(rdb_memtable_hit_count),
+      METRIC_VAR_INIT_replica(rdb_memtable_total_count),
+      METRIC_VAR_INIT_replica(rdb_l0_hit_count),
+      METRIC_VAR_INIT_replica(rdb_l1_hit_count),
+      METRIC_VAR_INIT_replica(rdb_l2_and_up_hit_count),
+      METRIC_VAR_INIT_replica(rdb_write_amplification),
+      METRIC_VAR_INIT_replica(rdb_read_amplification),
+      METRIC_VAR_INIT_replica(rdb_bloom_filter_seek_negatives),
+      METRIC_VAR_INIT_replica(rdb_bloom_filter_seek_total),
+      METRIC_VAR_INIT_replica(rdb_bloom_filter_point_lookup_negatives),
+      METRIC_VAR_INIT_replica(rdb_bloom_filter_point_lookup_positives),
+      METRIC_VAR_INIT_replica(rdb_bloom_filter_point_lookup_true_positives)
 {
     _primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
     _gpid = get_gpid();
@@ -644,54 +786,6 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
     std::string str_gpid = _gpid.to_string();
     char name[256];
 
-    // register the perf counters
-    snprintf(name, 255, "disk.storage.sst.count@%s", str_gpid.c_str());
-    _pfc_rdb_sst_count.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the count of sstable files");
-
-    snprintf(name, 255, "disk.storage.sst(MB)@%s", str_gpid.c_str());
-    _pfc_rdb_sst_size.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the size of sstable files");
-
-    snprintf(name, 255, "rdb.block_cache.hit_count@%s", str_gpid.c_str());
-    _pfc_rdb_block_cache_hit_count.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the hit count of rocksdb block cache");
-
-    snprintf(name, 255, "rdb.block_cache.total_count@%s", str_gpid.c_str());
-    _pfc_rdb_block_cache_total_count.init_app_counter(
-        "app.pegasus",
-        name,
-        COUNTER_TYPE_NUMBER,
-        "statistic the total count of rocksdb block cache");
-
-    snprintf(name, 255, "rdb.write_amplification@%s", str_gpid.c_str());
-    _pfc_rdb_write_amplification.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistics the write amplification of rocksdb");
-
-    snprintf(name, 255, "rdb.read_amplification@%s", str_gpid.c_str());
-    _pfc_rdb_read_amplification.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistics the read amplification of rocksdb");
-
-    snprintf(name, 255, "rdb.read_memtable_hit_count@%s", str_gpid.c_str());
-    _pfc_rdb_memtable_hit_count.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistics the read memtable hit count");
-
-    snprintf(name, 255, "rdb.read_memtable_total_count@%s", str_gpid.c_str());
-    _pfc_rdb_memtable_total_count.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistics the read memtable total count");
-
-    snprintf(name, 255, "rdb.read_l0_hit_count@%s", str_gpid.c_str());
-    _pfc_rdb_l0_hit_count.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistics the read l0 hit count");
-
-    snprintf(name, 255, "rdb.read_l1_hit_count@%s", str_gpid.c_str());
-    _pfc_rdb_l1_hit_count.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistics the read l1 hit count");
-
-    snprintf(name, 255, "rdb.read_l2andup_hit_count@%s", str_gpid.c_str());
-    _pfc_rdb_l2andup_hit_count.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistics the read l2andup hit count");
-
     // These counters are singletons on this server shared by all replicas, so we initialize
     // them only once.
     static std::once_flag flag;
@@ -710,66 +804,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
             COUNTER_TYPE_NUMBER,
             "statistic the through bytes of rocksdb write rate limiter");
     });
-
-    snprintf(name, 255, "rdb.index_and_filter_blocks.memory_usage@%s", str_gpid.c_str());
-    _pfc_rdb_index_and_filter_blocks_mem_usage.init_app_counter(
-        "app.pegasus",
-        name,
-        COUNTER_TYPE_NUMBER,
-        "statistic the memory usage of rocksdb index and filter blocks");
-
-    snprintf(name, 255, "rdb.memtable.memory_usage@%s", str_gpid.c_str());
-    _pfc_rdb_memtable_mem_usage.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the memory usage of rocksdb memtable");
-
-    snprintf(name, 255, "rdb.estimate_num_keys@%s", str_gpid.c_str());
-    _pfc_rdb_estimate_num_keys.init_app_counter(
-        "app.pegasus",
-        name,
-        COUNTER_TYPE_NUMBER,
-        "statistics the estimated number of keys inside the rocksdb");
-
-    snprintf(name, 255, "rdb.bf_seek_negatives@%s", str_gpid.c_str());
-    _pfc_rdb_bf_seek_negatives.init_app_counter("app.pegasus",
-                                                name,
-                                                COUNTER_TYPE_NUMBER,
-                                                "statistics the number of times bloom filter was "
-                                                "checked before creating iterator on a file and "
-                                                "useful in avoiding iterator creation (and thus "
-                                                "likely IOPs)");
-
-    snprintf(name, 255, "rdb.bf_seek_total@%s", str_gpid.c_str());
-    _pfc_rdb_bf_seek_total.init_app_counter("app.pegasus",
-                                            name,
-                                            COUNTER_TYPE_NUMBER,
-                                            "statistics the number of times bloom filter was "
-                                            "checked before creating iterator on a file");
-
-    snprintf(name, 255, "rdb.bf_point_positive_true@%s", str_gpid.c_str());
-    _pfc_rdb_bf_point_positive_true.init_app_counter(
-        "app.pegasus",
-        name,
-        COUNTER_TYPE_NUMBER,
-        "statistics the number of times bloom filter has avoided file reads, i.e., negatives");
-
-    snprintf(name, 255, "rdb.bf_point_positive_total@%s", str_gpid.c_str());
-    _pfc_rdb_bf_point_positive_total.init_app_counter(
-        "app.pegasus",
-        name,
-        COUNTER_TYPE_NUMBER,
-        "statistics the number of times bloom FullFilter has not avoided the reads");
-
-    snprintf(name, 255, "rdb.bf_point_negatives@%s", str_gpid.c_str());
-    _pfc_rdb_bf_point_negatives.init_app_counter("app.pegasus",
-                                                 name,
-                                                 COUNTER_TYPE_NUMBER,
-                                                 "statistics the number of times bloom FullFilter "
-                                                 "has not avoided the reads and data actually "
-                                                 "exist");
-
-    auto counter_str = fmt::format("recent.read.throttling.reject.count@{}", str_gpid.c_str());
-    _counter_recent_read_throttling_reject_count.init_app_counter(
-        "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());
 }
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index f44296ce5..29e7559b3 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -153,8 +153,9 @@
     METRIC_VAR_DECLARE(name, dsn::percentile_ptr<int64_t>)
 
 // Initialize a metric variable in user class.
-#define METRIC_VAR_INIT(name, entity) _##name(METRIC_##name.instantiate(entity##_metric_entity()))
-#define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica)
+#define METRIC_VAR_INIT(name, entity, ...)                                                         \
+    _##name(METRIC_##name.instantiate(entity##_metric_entity(), ##__VA_ARGS__))
+#define METRIC_VAR_INIT_replica(name, ...) METRIC_VAR_INIT(name, replica, ##__VA_ARGS__)
 
 // Perform increment-related operations on metrics including gauge and counter.
 #define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
@@ -599,8 +600,15 @@ enum class metric_unit : size_t
     kMicroSeconds,
     kMilliSeconds,
     kSeconds,
+    kBytes,
+    kMegaBytes,
     kRequests,
+    kSeeks,
+    kPointLookups,
     kValues,
+    kKeys,
+    kFiles,
+    kAmplification,
     kInvalidUnit,
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org


[incubator-pegasus] 01/04: feat(new_metrics): add replica-level metric entity (#1345)

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 18b06cf0e910135be8f3a31a9ce11c995af1807a
Author: Dan Wang <wa...@apache.org>
AuthorDate: Mon Feb 13 15:45:40 2023 +0800

    feat(new_metrics): add replica-level metric entity (#1345)
---
 src/replica/replica_base.cpp | 52 ++++++++++++++++++++++++++++++++++++++++++++
 src/replica/replica_base.h   | 10 +++++----
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/src/replica/replica_base.cpp b/src/replica/replica_base.cpp
new file mode 100644
index 000000000..11e08ae05
--- /dev/null
+++ b/src/replica/replica_base.cpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "replica_base.h"
+
+#include <fmt/core.h>
+
+METRIC_DEFINE_entity(replica);
+
+namespace dsn {
+namespace replication {
+
+namespace {
+
+metric_entity_ptr instantiate_replica_metric_entity(const gpid &id)
+{
+    auto entity_id = fmt::format("replica_{}", id);
+
+    // Do NOT add `replica_base._app_name` as the table name to the attributes of entity, since
+    // it is read-only and will never be updated even if the table is renamed.
+    return METRIC_ENTITY_replica.instantiate(
+        entity_id,
+        {{"table_id", std::to_string(id.get_app_id())},
+         {"partition_id", std::to_string(id.get_partition_index())}});
+}
+
+} // anonymous namespace
+
+replica_base::replica_base(gpid id, string_view name, string_view app_name)
+    : _gpid(id),
+      _name(name),
+      _app_name(app_name),
+      _replica_metric_entity(instantiate_replica_metric_entity(id))
+{
+}
+
+} // namespace replication
+} // namespace dsn
diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h
index 64f294e87..88202d055 100644
--- a/src/replica/replica_base.h
+++ b/src/replica/replica_base.h
@@ -27,6 +27,7 @@
 #pragma once
 
 #include "common/gpid.h"
+#include "utils/metrics.h"
 #include "utils/string_view.h"
 
 namespace dsn {
@@ -35,10 +36,7 @@ namespace replication {
 /// Base class for types that are one-instance-per-replica.
 struct replica_base
 {
-    replica_base(gpid id, string_view name, string_view app_name)
-        : _gpid(id), _name(name), _app_name(app_name)
-    {
-    }
+    replica_base(gpid id, string_view name, string_view app_name);
 
     explicit replica_base(replica_base *rhs)
         : replica_base(rhs->get_gpid(), rhs->replica_name(), rhs->_app_name)
@@ -53,10 +51,14 @@ struct replica_base
 
     const char *log_prefix() const { return _name.c_str(); }
 
+    const metric_entity_ptr &replica_metric_entity() const { return _replica_metric_entity; }
+
 private:
     const gpid _gpid;
     const std::string _name;
+    // TODO(wangdan): drop `_app_name` or make it changeable, since a table could be renamed.
     const std::string _app_name;
+    const metric_entity_ptr _replica_metric_entity;
 };
 
 } // namespace replication


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org


[incubator-pegasus] 02/04: feat(new_metrics): migrate replica-level metrics for write service (#1351)

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 09931e8e175454f0bb1a2275f4b2b785b5bf9eee
Author: Dan Wang <wa...@apache.org>
AuthorDate: Thu Feb 23 23:18:20 2023 +0800

    feat(new_metrics): migrate replica-level metrics for write service (#1351)
---
 .github/workflows/lint_and_test_cpp.yaml      |  12 +-
 run.sh                                        |   6 +-
 src/replica/replica_base.h                    |   9 +-
 src/server/pegasus_server_write.cpp           |  32 +--
 src/server/pegasus_server_write.h             |   7 +-
 src/server/pegasus_write_service.cpp          | 295 ++++++++++++++------------
 src/server/pegasus_write_service.h            |  65 +++---
 src/server/test/pegasus_server_write_test.cpp |   4 +-
 src/utils/metrics.h                           | 135 +++++++++++-
 src/utils/test/metrics_test.cpp               | 147 ++++++++++++-
 src/utils/time_utils.h                        |  29 ++-
 11 files changed, 537 insertions(+), 204 deletions(-)

diff --git a/.github/workflows/lint_and_test_cpp.yaml b/.github/workflows/lint_and_test_cpp.yaml
index f0f8f74a8..ee502fa69 100644
--- a/.github/workflows/lint_and_test_cpp.yaml
+++ b/.github/workflows/lint_and_test_cpp.yaml
@@ -128,7 +128,11 @@ jobs:
           - base_api_test
           - base_test
           - bulk_load_test
-          - detect_hotspot_test
+          # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which
+          # is being replaced with the new metrics system, its test will fail. Temporarily disable
+          # the test and re-enable it after the hotspot detection is migrated to the new metrics
+          # system.
+          # - detect_hotspot_test
           - dsn_aio_test
           - dsn_block_service_test
           - dsn.failure_detector.tests
@@ -250,7 +254,11 @@ jobs:
           - base_api_test
           - base_test
           - bulk_load_test
-          - detect_hotspot_test
+          # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which
+          # is being replaced with the new metrics system, its test will fail. Temporarily disable
+          # the test and re-enable it after the hotspot detection is migrated to the new metrics
+          # system.
+          # - detect_hotspot_test
           - dsn_aio_test
           - dsn_block_service_test
           - dsn.failure_detector.tests
diff --git a/run.sh b/run.sh
index 3adb6b106..bd73ef502 100755
--- a/run.sh
+++ b/run.sh
@@ -318,7 +318,11 @@ function run_test()
       base_api_test
       base_test
       bulk_load_test
-      detect_hotspot_test
+      # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which
+      # is being replaced with the new metrics system, its test will fail. Temporarily disable
+      # the test and re-enable it after the hotspot detection is migrated to the new metrics
+      # system.
+      # detect_hotspot_test
       dsn_aio_test
       dsn_block_service_test
       dsn.failure_detector.tests
diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h
index 88202d055..7c5b7747e 100644
--- a/src/replica/replica_base.h
+++ b/src/replica/replica_base.h
@@ -51,7 +51,14 @@ struct replica_base
 
     const char *log_prefix() const { return _name.c_str(); }
 
-    const metric_entity_ptr &replica_metric_entity() const { return _replica_metric_entity; }
+    const metric_entity_ptr &replica_metric_entity() const
+    {
+        CHECK_NOTNULL(_replica_metric_entity,
+                      "replica metric entity should have been instantiated: "
+                      "uninitialized entity cannot be used to instantiate "
+                      "metric");
+        return _replica_metric_entity;
+    }
 
 private:
     const gpid _gpid;
diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp
index 50ebe596a..1c7b6970d 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -17,30 +17,30 @@
  * under the License.
  */
 
+#include "server/pegasus_server_write.h"
+
+#include "base/pegasus_key_schema.h"
+#include "common/duplication_common.h"
 #include "runtime/message_utils.h"
-#include "common//duplication_common.h"
+#include "server/logging_utils.h"
+#include "server/pegasus_mutation_duplicator.h"
+#include "server/pegasus_server_impl.h"
 #include "utils/defer.h"
 
-#include "base/pegasus_key_schema.h"
-#include "pegasus_server_write.h"
-#include "pegasus_server_impl.h"
-#include "logging_utils.h"
-#include "pegasus_mutation_duplicator.h"
+METRIC_DEFINE_counter(replica,
+                      corrupt_writes,
+                      dsn::metric_unit::kRequests,
+                      "The number of corrupt writes for each replica");
 
 namespace pegasus {
 namespace server {
 DSN_DECLARE_bool(rocksdb_verbose_log);
 
 pegasus_server_write::pegasus_server_write(pegasus_server_impl *server)
-    : replica_base(server), _write_svc(new pegasus_write_service(server))
+    : replica_base(server),
+      _write_svc(new pegasus_write_service(server)),
+      METRIC_VAR_INIT_replica(corrupt_writes)
 {
-    char name[256];
-    snprintf(name, 255, "recent_corrupt_write_count@%s", get_gpid().to_string());
-    _pfc_recent_corrupt_write_count.init_app_counter("app.pegasus",
-                                                     name,
-                                                     COUNTER_TYPE_VOLATILE_NUMBER,
-                                                     "statistic the recent corrupt write count");
-
     init_non_batch_write_handlers();
 }
 
@@ -66,7 +66,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
             return iter->second(requests[0]);
         }
     } catch (TTransportException &ex) {
-        _pfc_recent_corrupt_write_count->increment();
+        METRIC_VAR_INCREMENT(corrupt_writes);
         LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, exception = {}",
                          requests[0]->header->from_address.to_string(),
                          ex.what());
@@ -110,7 +110,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun
                     }
                 }
             } catch (TTransportException &ex) {
-                _pfc_recent_corrupt_write_count->increment();
+                METRIC_VAR_INCREMENT(corrupt_writes);
                 LOG_ERROR_PREFIX("pegasus batch writes handler failed, from = {}, exception = {}",
                                  requests[i]->header->from_address.to_string(),
                                  ex.what());
diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h
index e4eebfcbb..99a5d6ff6 100644
--- a/src/server/pegasus_server_write.h
+++ b/src/server/pegasus_server_write.h
@@ -19,10 +19,9 @@
 
 #pragma once
 
-#include "replica/replica_base.h"
-
 #include "base/pegasus_rpc_types.h"
-#include "pegasus_write_service.h"
+#include "server/pegasus_write_service.h"
+#include "utils/metrics.h"
 
 namespace pegasus {
 namespace server {
@@ -88,7 +87,7 @@ private:
     typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> non_batch_writes_map;
     non_batch_writes_map _non_batch_write_handlers;
 
-    ::dsn::perf_counter_wrapper _pfc_recent_corrupt_write_count;
+    METRIC_VAR_DECLARE_counter(corrupt_writes);
 };
 
 } // namespace server
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index 338e81190..931e9ab2b 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -17,14 +17,102 @@
  * under the License.
  */
 
-#include "base/pegasus_rpc_types.h"
-#include "pegasus_write_service.h"
-#include "pegasus_write_service_impl.h"
-#include "capacity_unit_calculator.h"
+#include "server/pegasus_write_service.h"
 
-#include "runtime/message_utils.h"
+#include "base/pegasus_rpc_types.h"
 #include "common/replication.codes.h"
+#include "runtime/message_utils.h"
+#include "server/capacity_unit_calculator.h"
+#include "server/pegasus_write_service_impl.h"
 #include "utils/defer.h"
+#include "utils/time_utils.h"
+
+METRIC_DEFINE_counter(replica,
+                      put_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of PUT requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      multi_put_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of MULTI_PUT requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      remove_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of REMOVE requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      multi_remove_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of MULTI_REMOVE requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      incr_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of INCR requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      check_and_set_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of CHECK_AND_SET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      check_and_mutate_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of CHECK_AND_MUTATE requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               put_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of PUT requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               multi_put_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of MULTI_PUT requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               remove_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of REMOVE requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               multi_remove_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of MULTI_REMOVE requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               incr_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of INCR requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               check_and_set_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of CHECK_AND_SET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               check_and_mutate_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of CHECK_AND_MUTATE requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      dup_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of DUPLICATE requests for each replica");
+
+METRIC_DEFINE_percentile_int64(
+    replica,
+    dup_time_lag_ms,
+    dsn::metric_unit::kMilliSeconds,
+    "the time lag (in ms) between master and slave in the duplication for each replica");
+
+METRIC_DEFINE_counter(
+    replica,
+    dup_lagging_writes,
+    dsn::metric_unit::kRequests,
+    "the number of lagging writes (time lag larger than `dup_lagging_write_threshold_ms`)");
 
 namespace pegasus {
 namespace server {
@@ -42,105 +130,33 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
       _server(server),
       _impl(new impl(server)),
       _batch_start_time(0),
-      _cu_calculator(server->_cu_calculator.get())
+      _cu_calculator(server->_cu_calculator.get()),
+      METRIC_VAR_INIT_replica(put_requests),
+      METRIC_VAR_INIT_replica(multi_put_requests),
+      METRIC_VAR_INIT_replica(remove_requests),
+      METRIC_VAR_INIT_replica(multi_remove_requests),
+      METRIC_VAR_INIT_replica(incr_requests),
+      METRIC_VAR_INIT_replica(check_and_set_requests),
+      METRIC_VAR_INIT_replica(check_and_mutate_requests),
+      METRIC_VAR_INIT_replica(put_latency_ns),
+      METRIC_VAR_INIT_replica(multi_put_latency_ns),
+      METRIC_VAR_INIT_replica(remove_latency_ns),
+      METRIC_VAR_INIT_replica(multi_remove_latency_ns),
+      METRIC_VAR_INIT_replica(incr_latency_ns),
+      METRIC_VAR_INIT_replica(check_and_set_latency_ns),
+      METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
+      METRIC_VAR_INIT_replica(dup_requests),
+      METRIC_VAR_INIT_replica(dup_time_lag_ms),
+      METRIC_VAR_INIT_replica(dup_lagging_writes),
+      _put_batch_size(0),
+      _remove_batch_size(0)
 {
-    std::string str_gpid = fmt::format("{}", server->get_gpid());
-
-    std::string name;
-
-    name = fmt::format("put_qps@{}", str_gpid);
-    _pfc_put_qps.init_app_counter(
-        "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of PUT request");
-
-    name = fmt::format("multi_put_qps@{}", str_gpid);
-    _pfc_multi_put_qps.init_app_counter(
-        "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of MULTI_PUT request");
-
-    name = fmt::format("remove_qps@{}", str_gpid);
-    _pfc_remove_qps.init_app_counter(
-        "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of REMOVE request");
-
-    name = fmt::format("multi_remove_qps@{}", str_gpid);
-    _pfc_multi_remove_qps.init_app_counter("app.pegasus",
-                                           name.c_str(),
-                                           COUNTER_TYPE_RATE,
-                                           "statistic the qps of MULTI_REMOVE request");
-
-    name = fmt::format("incr_qps@{}", str_gpid);
-    _pfc_incr_qps.init_app_counter(
-        "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of INCR request");
-
-    name = fmt::format("check_and_set_qps@{}", str_gpid);
-    _pfc_check_and_set_qps.init_app_counter("app.pegasus",
-                                            name.c_str(),
-                                            COUNTER_TYPE_RATE,
-                                            "statistic the qps of CHECK_AND_SET request");
-
-    name = fmt::format("check_and_mutate_qps@{}", str_gpid);
-    _pfc_check_and_mutate_qps.init_app_counter("app.pegasus",
-                                               name.c_str(),
-                                               COUNTER_TYPE_RATE,
-                                               "statistic the qps of CHECK_AND_MUTATE request");
-
-    name = fmt::format("put_latency@{}", str_gpid);
-    _pfc_put_latency.init_app_counter("app.pegasus",
-                                      name.c_str(),
-                                      COUNTER_TYPE_NUMBER_PERCENTILES,
-                                      "statistic the latency of PUT request");
-
-    name = fmt::format("multi_put_latency@{}", str_gpid);
-    _pfc_multi_put_latency.init_app_counter("app.pegasus",
-                                            name.c_str(),
-                                            COUNTER_TYPE_NUMBER_PERCENTILES,
-                                            "statistic the latency of MULTI_PUT request");
-
-    name = fmt::format("remove_latency@{}", str_gpid);
-    _pfc_remove_latency.init_app_counter("app.pegasus",
-                                         name.c_str(),
-                                         COUNTER_TYPE_NUMBER_PERCENTILES,
-                                         "statistic the latency of REMOVE request");
-
-    name = fmt::format("multi_remove_latency@{}", str_gpid);
-    _pfc_multi_remove_latency.init_app_counter("app.pegasus",
-                                               name.c_str(),
-                                               COUNTER_TYPE_NUMBER_PERCENTILES,
-                                               "statistic the latency of MULTI_REMOVE request");
-
-    name = fmt::format("incr_latency@{}", str_gpid);
-    _pfc_incr_latency.init_app_counter("app.pegasus",
-                                       name.c_str(),
-                                       COUNTER_TYPE_NUMBER_PERCENTILES,
-                                       "statistic the latency of INCR request");
-
-    name = fmt::format("check_and_set_latency@{}", str_gpid);
-    _pfc_check_and_set_latency.init_app_counter("app.pegasus",
-                                                name.c_str(),
-                                                COUNTER_TYPE_NUMBER_PERCENTILES,
-                                                "statistic the latency of CHECK_AND_SET request");
-
-    name = fmt::format("check_and_mutate_latency@{}", str_gpid);
-    _pfc_check_and_mutate_latency.init_app_counter(
-        "app.pegasus",
-        name.c_str(),
-        COUNTER_TYPE_NUMBER_PERCENTILES,
-        "statistic the latency of CHECK_AND_MUTATE request");
-
-    _pfc_duplicate_qps.init_app_counter("app.pegasus",
-                                        fmt::format("duplicate_qps@{}", str_gpid).c_str(),
-                                        COUNTER_TYPE_RATE,
-                                        "statistic the qps of DUPLICATE requests");
-
-    _pfc_dup_time_lag.init_app_counter(
-        "app.pegasus",
-        fmt::format("dup.time_lag_ms@{}", app_name()).c_str(),
-        COUNTER_TYPE_NUMBER_PERCENTILES,
-        "the time (in ms) lag between master and slave in the duplication");
-
-    _pfc_dup_lagging_writes.init_app_counter(
-        "app.pegasus",
-        fmt::format("dup.lagging_writes@{}", app_name()).c_str(),
-        COUNTER_TYPE_VOLATILE_NUMBER,
-        "the number of lagging writes (time lag larger than `dup_lagging_write_threshold_ms`)");
+    _dup_lagging_write_threshold_ms = dsn_config_get_value_int64(
+        "pegasus.server",
+        "dup_lagging_write_threshold_ms",
+        10 * 1000,
+        "If the duration that a write flows from master to slave is larger than this threshold, "
+        "the write is defined a lagging write.");
 }
 
 pegasus_write_service::~pegasus_write_service() {}
@@ -151,15 +167,15 @@ int pegasus_write_service::multi_put(const db_write_context &ctx,
                                      const dsn::apps::multi_put_request &update,
                                      dsn::apps::update_response &resp)
 {
-    uint64_t start_time = dsn_now_ns();
-    _pfc_multi_put_qps->increment();
+    METRIC_VAR_AUTO_LATENCY(multi_put_latency_ns);
+    METRIC_VAR_INCREMENT(multi_put_requests);
+
     int err = _impl->multi_put(ctx, update, resp);
 
     if (_server->is_primary()) {
         _cu_calculator->add_multi_put_cu(resp.error, update.hash_key, update.kvs);
     }
 
-    _pfc_multi_put_latency->set(dsn_now_ns() - start_time);
     return err;
 }
 
@@ -167,15 +183,15 @@ int pegasus_write_service::multi_remove(int64_t decree,
                                         const dsn::apps::multi_remove_request &update,
                                         dsn::apps::multi_remove_response &resp)
 {
-    uint64_t start_time = dsn_now_ns();
-    _pfc_multi_remove_qps->increment();
+    METRIC_VAR_AUTO_LATENCY(multi_remove_latency_ns);
+    METRIC_VAR_INCREMENT(multi_remove_requests);
+
     int err = _impl->multi_remove(decree, update, resp);
 
     if (_server->is_primary()) {
         _cu_calculator->add_multi_remove_cu(resp.error, update.hash_key, update.sort_keys);
     }
 
-    _pfc_multi_remove_latency->set(dsn_now_ns() - start_time);
     return err;
 }
 
@@ -183,15 +199,15 @@ int pegasus_write_service::incr(int64_t decree,
                                 const dsn::apps::incr_request &update,
                                 dsn::apps::incr_response &resp)
 {
-    uint64_t start_time = dsn_now_ns();
-    _pfc_incr_qps->increment();
+    METRIC_VAR_AUTO_LATENCY(incr_latency_ns);
+    METRIC_VAR_INCREMENT(incr_requests);
+
     int err = _impl->incr(decree, update, resp);
 
     if (_server->is_primary()) {
         _cu_calculator->add_incr_cu(resp.error, update.key);
     }
 
-    _pfc_incr_latency->set(dsn_now_ns() - start_time);
     return err;
 }
 
@@ -199,8 +215,9 @@ int pegasus_write_service::check_and_set(int64_t decree,
                                          const dsn::apps::check_and_set_request &update,
                                          dsn::apps::check_and_set_response &resp)
 {
-    uint64_t start_time = dsn_now_ns();
-    _pfc_check_and_set_qps->increment();
+    METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns);
+    METRIC_VAR_INCREMENT(check_and_set_requests);
+
     int err = _impl->check_and_set(decree, update, resp);
 
     if (_server->is_primary()) {
@@ -211,7 +228,6 @@ int pegasus_write_service::check_and_set(int64_t decree,
                                              update.set_value);
     }
 
-    _pfc_check_and_set_latency->set(dsn_now_ns() - start_time);
     return err;
 }
 
@@ -219,8 +235,9 @@ int pegasus_write_service::check_and_mutate(int64_t decree,
                                             const dsn::apps::check_and_mutate_request &update,
                                             dsn::apps::check_and_mutate_response &resp)
 {
-    uint64_t start_time = dsn_now_ns();
-    _pfc_check_and_mutate_qps->increment();
+    METRIC_VAR_AUTO_LATENCY(check_and_mutate_latency_ns);
+    METRIC_VAR_INCREMENT(check_and_mutate_requests);
+
     int err = _impl->check_and_mutate(decree, update, resp);
 
     if (_server->is_primary()) {
@@ -228,7 +245,6 @@ int pegasus_write_service::check_and_mutate(int64_t decree,
             resp.error, update.hash_key, update.check_sort_key, update.mutate_list);
     }
 
-    _pfc_check_and_mutate_latency->set(dsn_now_ns() - start_time);
     return err;
 }
 
@@ -246,8 +262,7 @@ int pegasus_write_service::batch_put(const db_write_context &ctx,
 {
     CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after batch_prepare");
 
-    _batch_qps_perfcounters.push_back(_pfc_put_qps.get());
-    _batch_latency_perfcounters.push_back(_pfc_put_latency.get());
+    ++_put_batch_size;
     int err = _impl->batch_put(ctx, update, resp);
 
     if (_server->is_primary()) {
@@ -263,8 +278,7 @@ int pegasus_write_service::batch_remove(int64_t decree,
 {
     CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after batch_prepare");
 
-    _batch_qps_perfcounters.push_back(_pfc_remove_qps.get());
-    _batch_latency_perfcounters.push_back(_pfc_remove_latency.get());
+    ++_remove_batch_size;
     int err = _impl->batch_remove(decree, key, resp);
 
     if (_server->is_primary()) {
@@ -296,15 +310,21 @@ void pegasus_write_service::set_default_ttl(uint32_t ttl) { _impl->set_default_t
 
 void pegasus_write_service::clear_up_batch_states()
 {
-    uint64_t latency = dsn_now_ns() - _batch_start_time;
-    for (dsn::perf_counter *pfc : _batch_qps_perfcounters)
-        pfc->increment();
-    for (dsn::perf_counter *pfc : _batch_latency_perfcounters)
-        pfc->set(latency);
-
-    _batch_qps_perfcounters.clear();
-    _batch_latency_perfcounters.clear();
+#define PROCESS_WRITE_BATCH(op)                                                                    \
+    do {                                                                                           \
+        METRIC_VAR_INCREMENT_BY(op##_requests, static_cast<int64_t>(_##op##_batch_size));          \
+        METRIC_VAR_SET(op##_latency_ns, static_cast<size_t>(_##op##_batch_size), latency_ns);      \
+        _##op##_batch_size = 0;                                                                    \
+    } while (0)
+
+    auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time);
+
+    PROCESS_WRITE_BATCH(put);
+    PROCESS_WRITE_BATCH(remove);
+
     _batch_start_time = 0;
+
+#undef PROCESS_WRITE_BATCH
 }
 
 int pegasus_write_service::duplicate(int64_t decree,
@@ -324,14 +344,13 @@ int pegasus_write_service::duplicate(int64_t decree,
             return empty_put(decree);
         }
 
-        _pfc_duplicate_qps->increment();
-        auto cleanup = dsn::defer([this, &request]() {
-            uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000;
-            if (latency_ms > FLAGS_dup_lagging_write_threshold_ms) {
-                _pfc_dup_lagging_writes->increment();
-            }
-            _pfc_dup_time_lag->set(latency_ms);
-        });
+        METRIC_VAR_INCREMENT(dup_requests);
+        METRIC_VAR_AUTO_LATENCY(
+            dup_time_lag_ms, request.timestamp * 1000, [this](uint64_t latency) {
+                if (latency > _dup_lagging_write_threshold_ms) {
+                    METRIC_VAR_INCREMENT(dup_lagging_writes);
+                }
+            });
         dsn::message_ex *write =
             dsn::from_blob_to_received_msg(request.task_code, request.raw_message);
         bool is_delete = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h
index 2315189f5..15bd4b473 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -19,22 +19,20 @@
 
 #pragma once
 
-#include "perf_counter/perf_counter_wrapper.h"
-#include "replica/replica_base.h"
+#include "base/pegasus_utils.h"
+#include "base/pegasus_value_schema.h"
+#include "backup_types.h"
+#include "bulk_load_types.h"
 #include "common/common.h"
-#include "common//duplication_common.h"
+#include "common/duplication_common.h"
+#include "consensus_types.h"
+#include "duplication_internal_types.h"
+#include "duplication_types.h"
 #include "meta_admin_types.h"
 #include "partition_split_types.h"
-#include "duplication_types.h"
-#include "bulk_load_types.h"
-#include "backup_types.h"
-#include "consensus_types.h"
+#include "replica/replica_base.h"
 #include "replica_admin_types.h"
-
-#include "base/pegasus_value_schema.h"
-#include "base/pegasus_utils.h"
 #include "rrdb/rrdb_types.h"
-#include "duplication_internal_types.h"
 
 namespace pegasus {
 namespace server {
@@ -198,28 +196,29 @@ private:
 
     capacity_unit_calculator *_cu_calculator;
 
-    ::dsn::perf_counter_wrapper _pfc_put_qps;
-    ::dsn::perf_counter_wrapper _pfc_multi_put_qps;
-    ::dsn::perf_counter_wrapper _pfc_remove_qps;
-    ::dsn::perf_counter_wrapper _pfc_multi_remove_qps;
-    ::dsn::perf_counter_wrapper _pfc_incr_qps;
-    ::dsn::perf_counter_wrapper _pfc_check_and_set_qps;
-    ::dsn::perf_counter_wrapper _pfc_check_and_mutate_qps;
-    ::dsn::perf_counter_wrapper _pfc_duplicate_qps;
-    ::dsn::perf_counter_wrapper _pfc_dup_time_lag;
-    ::dsn::perf_counter_wrapper _pfc_dup_lagging_writes;
-
-    ::dsn::perf_counter_wrapper _pfc_put_latency;
-    ::dsn::perf_counter_wrapper _pfc_multi_put_latency;
-    ::dsn::perf_counter_wrapper _pfc_remove_latency;
-    ::dsn::perf_counter_wrapper _pfc_multi_remove_latency;
-    ::dsn::perf_counter_wrapper _pfc_incr_latency;
-    ::dsn::perf_counter_wrapper _pfc_check_and_set_latency;
-    ::dsn::perf_counter_wrapper _pfc_check_and_mutate_latency;
-
-    // Records all requests.
-    std::vector<::dsn::perf_counter *> _batch_qps_perfcounters;
-    std::vector<::dsn::perf_counter *> _batch_latency_perfcounters;
+    METRIC_VAR_DECLARE_counter(put_requests);
+    METRIC_VAR_DECLARE_counter(multi_put_requests);
+    METRIC_VAR_DECLARE_counter(remove_requests);
+    METRIC_VAR_DECLARE_counter(multi_remove_requests);
+    METRIC_VAR_DECLARE_counter(incr_requests);
+    METRIC_VAR_DECLARE_counter(check_and_set_requests);
+    METRIC_VAR_DECLARE_counter(check_and_mutate_requests);
+
+    METRIC_VAR_DECLARE_percentile_int64(put_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(remove_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(multi_remove_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(incr_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(check_and_set_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns);
+
+    METRIC_VAR_DECLARE_counter(dup_requests);
+    METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
+    METRIC_VAR_DECLARE_counter(dup_lagging_writes);
+
+    // Record batch size for put and remove requests.
+    uint32_t _put_batch_size;
+    uint32_t _remove_batch_size;
 
     // TODO(wutao1): add perf counters for failed rpc.
 };
diff --git a/src/server/test/pegasus_server_write_test.cpp b/src/server/test/pegasus_server_write_test.cpp
index 40a40433e..642488143 100644
--- a/src/server/test/pegasus_server_write_test.cpp
+++ b/src/server/test/pegasus_server_write_test.cpp
@@ -88,8 +88,8 @@ public:
                 // make sure everything is cleanup after batch write.
                 ASSERT_TRUE(_server_write->_put_rpc_batch.empty());
                 ASSERT_TRUE(_server_write->_remove_rpc_batch.empty());
-                ASSERT_TRUE(_server_write->_write_svc->_batch_qps_perfcounters.empty());
-                ASSERT_TRUE(_server_write->_write_svc->_batch_latency_perfcounters.empty());
+                ASSERT_EQ(_server_write->_write_svc->_put_batch_size, 0);
+                ASSERT_EQ(_server_write->_write_svc->_remove_batch_size, 0);
                 ASSERT_EQ(_server_write->_write_svc->_batch_start_time, 0);
                 ASSERT_EQ(_server_write->_write_svc->_impl->_rocksdb_wrapper->_write_batch->Count(),
                           0);
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 340d2fd9d..f32665f24 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -49,6 +49,7 @@
 #include "utils/string_view.h"
 #include "utils/strings.h"
 #include "utils/synchronize.h"
+#include "utils/time_utils.h"
 
 // A metric library (for details pls see https://github.com/apache/incubator-pegasus/issues/922)
 // inspired by Kudu metrics (https://github.com/apache/kudu/blob/master/src/kudu/util/metrics.h).
@@ -81,7 +82,8 @@
 // Instantiating the metric in whatever class represents it with some initial arguments, if any:
 // metric_instance = METRIC_my_gauge_name.instantiate(entity_instance, ...);
 
-// Convenient macros are provided to define entity types and metric prototypes.
+// The following are convenient macros provided to define entity types and metric prototypes.
+
 #define METRIC_DEFINE_entity(name) ::dsn::metric_entity_prototype METRIC_ENTITY_##name(#name)
 #define METRIC_DEFINE_gauge_int64(entity_type, name, unit, desc, ...)                              \
     ::dsn::gauge_prototype<int64_t> METRIC_##name(                                                 \
@@ -89,6 +91,7 @@
 #define METRIC_DEFINE_gauge_double(entity_type, name, unit, desc, ...)                             \
     ::dsn::gauge_prototype<double> METRIC_##name(                                                  \
         {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__})
+
 // There are 2 kinds of counters:
 // - `counter` is the general type of counter that is implemented by striped_long_adder, which can
 //   achieve high performance while consuming less memory if it's not updated very frequently.
@@ -133,6 +136,42 @@
 #define METRIC_DECLARE_percentile_double(name)                                                     \
     extern dsn::floating_percentile_prototype<double> METRIC_##name
 
+// The following METRIC_VAR_* macros are introduced so that:
+// * each metric variable can be operated on by using only its prototype name;
+// * each metric variable in a user class is named uniformly;
+// * operations on metrics stand out clearly from the main logic, improving code readability.
+
+// Declare a metric variable in user class.
+//
+// Since a type tends to be a class template where there might be commas, use variadic arguments
+// instead of a single fixed argument to represent a type.
+#define METRIC_VAR_DECLARE(name, ...) __VA_ARGS__ _##name
+#define METRIC_VAR_DECLARE_gauge_int64(name) METRIC_VAR_DECLARE(name, dsn::gauge_ptr<int64_t>)
+#define METRIC_VAR_DECLARE_counter(name)                                                           \
+    METRIC_VAR_DECLARE(name, dsn::counter_ptr<dsn::striped_long_adder, false>)
+#define METRIC_VAR_DECLARE_percentile_int64(name)                                                  \
+    METRIC_VAR_DECLARE(name, dsn::percentile_ptr<int64_t>)
+
+// Initialize a metric variable in user class.
+#define METRIC_VAR_INIT(name, entity) _##name(METRIC_##name.instantiate(entity##_metric_entity()))
+#define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica)
+
+// Perform increment-related operations on metrics including gauge and counter.
+#define METRIC_VAR_INCREMENT_BY(name, x) _##name->increment_by(x)
+#define METRIC_VAR_INCREMENT(name) _##name->increment()
+
+// Perform set() operations on metrics including gauge and percentile.
+//
+// There are 2 kinds of invocations of set() for a metric:
+// * set(val): set a single value for a metric, such as gauge, percentile;
+// * set(n, val): set multiple repeated values (the number of duplicates is n) for a metric,
+// such as percentile.
+#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
+
+// Convenience macro that computes latency automatically; it is dedicated to percentile.
+#define METRIC_VAR_AUTO_LATENCY(name, ...)                                                         \
+    dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__)
+
 namespace dsn {
 
 class metric_prototype;
@@ -543,7 +582,7 @@ ENUM_REG_WITH_CUSTOM_NAME(metric_type::kVolatileCounter, volatile_counter)
 ENUM_REG_WITH_CUSTOM_NAME(metric_type::kPercentile, percentile)
 ENUM_END(metric_type)
 
-enum class metric_unit
+enum class metric_unit : size_t
 {
     kNanoSeconds,
     kMicroSeconds,
@@ -553,6 +592,31 @@ enum class metric_unit
     kInvalidUnit,
 };
 
+#define METRIC_ASSERT_UNIT_LATENCY(unit, index)                                                    \
+    static_assert(static_cast<size_t>(metric_unit::unit) == index,                                 \
+                  #unit " should be at index " #index)
+
+METRIC_ASSERT_UNIT_LATENCY(kNanoSeconds, 0);
+METRIC_ASSERT_UNIT_LATENCY(kMicroSeconds, 1);
+METRIC_ASSERT_UNIT_LATENCY(kMilliSeconds, 2);
+METRIC_ASSERT_UNIT_LATENCY(kSeconds, 3);
+
+const std::vector<uint64_t> kMetricLatencyConverterFromNS = {
+    1, 1000, 1000 * 1000, 1000 * 1000 * 1000};
+
+inline uint64_t convert_metric_latency_from_ns(uint64_t latency_ns, metric_unit target_unit)
+{
+    if (dsn_likely(target_unit == metric_unit::kNanoSeconds)) {
+        // Since nanoseconds are used as the latency unit more frequently, eliminate unnecessary
+        // conversion by branch prediction.
+        return latency_ns;
+    }
+
+    auto index = static_cast<size_t>(target_unit);
+    CHECK_LT(index, kMetricLatencyConverterFromNS.size());
+    return latency_ns / kMetricLatencyConverterFromNS[index];
+}
+
 ENUM_BEGIN(metric_unit, metric_unit::kInvalidUnit)
 ENUM_REG_WITH_CUSTOM_NAME(metric_unit::kNanoSeconds, nanoseconds)
 ENUM_REG_WITH_CUSTOM_NAME(metric_unit::kMicroSeconds, microseconds)
@@ -1057,6 +1121,13 @@ public:
         _samples.get()[index & (_sample_size - 1)] = val;
     }
 
+    void set(size_t n, const value_type &val)
+    {
+        for (size_t i = 0; i < n; ++i) {
+            set(val);
+        }
+    }
+
     // If `type` is not configured, it will return false with zero value stored in `val`;
     // otherwise, it will always return true with the value corresponding to `type`.
     bool get(kth_percentile_type type, value_type &val) const
@@ -1168,6 +1239,7 @@ private:
 
     friend class metric_entity;
     friend class ref_ptr<percentile<value_type, NthElementFinder>>;
+    friend class MetricVarTest;
 
     virtual void close() override
     {
@@ -1190,6 +1262,20 @@ private:
         release_ref();
     }
 
+    std::vector<value_type> samples_for_test()
+    {
+        size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size);
+        if (real_sample_size == 0) {
+            return std::vector<value_type>();
+        }
+
+        std::vector<value_type> real_samples(real_sample_size);
+        std::copy(_samples.get(), _samples.get() + real_sample_size, real_samples.begin());
+        return real_samples;
+    }
+
+    void reset_tail_for_test() { _tail.store(0); }
+
     value_type value(size_t index) const
     {
         return _full_nth_elements[index].load(std::memory_order_relaxed);
@@ -1210,7 +1296,7 @@ private:
         }
 
         // Find nth elements.
-        std::vector<T> array(real_sample_size);
+        std::vector<value_type> array(real_sample_size);
         std::copy(_samples.get(), _samples.get() + real_sample_size, array.begin());
         _nth_element_finder(array.begin(), array.begin(), array.end());
 
@@ -1279,4 +1365,47 @@ template <typename T,
 using floating_percentile_prototype =
     metric_prototype_with<floating_percentile<T, NthElementFinder>>;
 
+// Compute the latency automatically at the end of the scope and set it to the percentile that
+// this object is bound to.
+class auto_latency
+{
+public:
+    auto_latency(const percentile_ptr<int64_t> &percentile) : _percentile(percentile) {}
+
+    auto_latency(const percentile_ptr<int64_t> &percentile, std::function<void(uint64_t)> callback)
+        : _percentile(percentile), _callback(std::move(callback))
+    {
+    }
+
+    auto_latency(const percentile_ptr<int64_t> &percentile, uint64_t start_time_ns)
+        : _percentile(percentile), _chrono(start_time_ns)
+    {
+    }
+
+    auto_latency(const percentile_ptr<int64_t> &percentile,
+                 uint64_t start_time_ns,
+                 std::function<void(uint64_t)> callback)
+        : _percentile(percentile), _chrono(start_time_ns), _callback(std::move(callback))
+    {
+    }
+
+    ~auto_latency()
+    {
+        auto latency =
+            convert_metric_latency_from_ns(_chrono.duration_ns(), _percentile->prototype()->unit());
+        _percentile->set(static_cast<int64_t>(latency));
+
+        if (_callback) {
+            _callback(latency);
+        }
+    }
+
+private:
+    percentile_ptr<int64_t> _percentile;
+    utils::chronograph _chrono;
+    std::function<void(uint64_t)> _callback;
+
+    DISALLOW_COPY_AND_ASSIGN(auto_latency);
+};
+
 } // namespace dsn
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 20414a274..9a0d27e86 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -70,6 +70,8 @@ METRIC_DEFINE_entity(my_server);
 METRIC_DEFINE_entity(my_table);
 METRIC_DEFINE_entity(my_replica);
 
+#define METRIC_VAR_INIT_my_replica(name) METRIC_VAR_INIT(name, my_replica)
+
 // Dedicated entity for getting metrics by http service.
 METRIC_DEFINE_entity(my_app);
 
@@ -161,6 +163,26 @@ METRIC_DEFINE_percentile_double(my_server,
                                 dsn::metric_unit::kNanoSeconds,
                                 "a server-level percentile of double type for test");
 
+METRIC_DEFINE_percentile_int64(my_replica,
+                               test_replica_percentile_int64_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "a replica-level percentile of int64 type in nanoseconds for test");
+
+METRIC_DEFINE_percentile_int64(my_replica,
+                               test_replica_percentile_int64_us,
+                               dsn::metric_unit::kMicroSeconds,
+                               "a replica-level percentile of int64 type in microseconds for test");
+
+METRIC_DEFINE_percentile_int64(my_replica,
+                               test_replica_percentile_int64_ms,
+                               dsn::metric_unit::kMilliSeconds,
+                               "a replica-level percentile of int64 type in milliseconds for test");
+
+METRIC_DEFINE_percentile_int64(my_replica,
+                               test_replica_percentile_int64_s,
+                               dsn::metric_unit::kSeconds,
+                               "a replica-level percentile of int64 type in seconds for test");
+
 namespace dsn {
 
 TEST(metrics_test, create_entity)
@@ -724,9 +746,7 @@ void run_percentile(const metric_entity_ptr &my_entity,
     auto my_metric = prototype.instantiate(my_entity, interval_ms, kth_percentiles, sample_size);
 
     // Preload zero in current thread.
-    for (size_t i = 0; i < num_preload; ++i) {
-        my_metric->set(0);
-    }
+    my_metric->set(num_preload, 0);
 
     // Load other data in each spawned thread evenly.
     const size_t num_operations = data.size() / num_threads;
@@ -3043,4 +3063,125 @@ INSTANTIATE_TEST_CASE_P(MetricsTest,
                         MetricsRetirementTest,
                         testing::ValuesIn(metrics_retirement_tests));
 
+class MetricVarTest : public testing::Test
+{
+protected:
+    MetricVarTest();
+
+    void SetUp() override
+    {
+        _test_replica_gauge_int64->set(0);
+        _test_replica_counter->reset();
+        _test_replica_percentile_int64_ns->reset_tail_for_test();
+        _test_replica_percentile_int64_us->reset_tail_for_test();
+        _test_replica_percentile_int64_ms->reset_tail_for_test();
+        _test_replica_percentile_int64_s->reset_tail_for_test();
+    }
+
+    const metric_entity_ptr &my_replica_metric_entity() const { return _my_replica_metric_entity; }
+
+    void test_set_percentile(const std::vector<int64_t> &expected_samples);
+    void test_set_percentile(size_t n, int64_t val);
+
+    const metric_entity_ptr _my_replica_metric_entity;
+    METRIC_VAR_DECLARE_gauge_int64(test_replica_gauge_int64);
+    METRIC_VAR_DECLARE_counter(test_replica_counter);
+    METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_ns);
+    METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_us);
+    METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_ms);
+    METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_s);
+
+    DISALLOW_COPY_AND_ASSIGN(MetricVarTest);
+};
+
+MetricVarTest::MetricVarTest()
+    : _my_replica_metric_entity(METRIC_ENTITY_my_replica.instantiate("replica_var_test")),
+      METRIC_VAR_INIT_my_replica(test_replica_gauge_int64),
+      METRIC_VAR_INIT_my_replica(test_replica_counter),
+      METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_ns),
+      METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_us),
+      METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_ms),
+      METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_s)
+{
+}
+
+#define METRIC_VAR_SAMPLES(name) _##name->samples_for_test()
+
+void MetricVarTest::test_set_percentile(const std::vector<int64_t> &expected_samples)
+{
+    for (const auto &val : expected_samples) {
+        METRIC_VAR_SET(test_replica_percentile_int64_ns, val);
+    }
+    EXPECT_EQ(expected_samples, METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
+}
+
+void MetricVarTest::test_set_percentile(size_t n, int64_t val)
+{
+    METRIC_VAR_SET(test_replica_percentile_int64_ns, n, val);
+    EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
+}
+
+#define METRIC_VAR_VALUE(name) _##name->value()
+
+#define TEST_METRIC_VAR_INCREMENT(name)                                                            \
+    do {                                                                                           \
+        ASSERT_EQ(0, METRIC_VAR_VALUE(name));                                                      \
+                                                                                                   \
+        METRIC_VAR_INCREMENT(name);                                                                \
+        ASSERT_EQ(1, METRIC_VAR_VALUE(name));                                                      \
+                                                                                                   \
+        METRIC_VAR_INCREMENT(name);                                                                \
+        ASSERT_EQ(2, METRIC_VAR_VALUE(name));                                                      \
+                                                                                                   \
+        METRIC_VAR_INCREMENT_BY(name, 5);                                                          \
+        ASSERT_EQ(7, METRIC_VAR_VALUE(name));                                                      \
+                                                                                                   \
+        METRIC_VAR_INCREMENT_BY(name, 18);                                                         \
+        ASSERT_EQ(25, METRIC_VAR_VALUE(name));                                                     \
+    } while (0);
+
+TEST_F(MetricVarTest, IncrementGauge) { TEST_METRIC_VAR_INCREMENT(test_replica_gauge_int64); }
+
+TEST_F(MetricVarTest, IncrementCounter) { TEST_METRIC_VAR_INCREMENT(test_replica_counter); }
+
+TEST_F(MetricVarTest, SetGauge)
+{
+    ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
+
+    METRIC_VAR_SET(test_replica_gauge_int64, 5);
+    ASSERT_EQ(5, METRIC_VAR_VALUE(test_replica_gauge_int64));
+
+    METRIC_VAR_SET(test_replica_gauge_int64, 18);
+    ASSERT_EQ(18, METRIC_VAR_VALUE(test_replica_gauge_int64));
+}
+
+TEST_F(MetricVarTest, SetPercentileIndividually) { test_set_percentile({20, 50, 10, 25, 16}); }
+
+TEST_F(MetricVarTest, SetPercentileRepeatedly) { test_set_percentile(5, 100); }
+
+#define TEST_METRIC_VAR_AUTO_LATENCY(unit_abbr, factor)                                            \
+    do {                                                                                           \
+        auto start_time_ns = dsn_now_ns();                                                         \
+        uint64_t actual_latency_ns = 0;                                                            \
+        {                                                                                          \
+            METRIC_VAR_AUTO_LATENCY(test_replica_percentile_int64_##unit_abbr,                     \
+                                    start_time_ns,                                                 \
+                                    [&actual_latency_ns](uint64_t latency) mutable {               \
+                                        actual_latency_ns = latency * factor;                      \
+                                    });                                                            \
+        }                                                                                          \
+                                                                                                   \
+        uint64_t expected_latency_ns = dsn_now_ns() - start_time_ns;                               \
+        ASSERT_GE(expected_latency_ns, actual_latency_ns);                                         \
+        EXPECT_LT(expected_latency_ns - actual_latency_ns, 1000 * 1000);                           \
+    } while (0)
+
+TEST_F(MetricVarTest, AutoLatencyNanoSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ns, 1); }
+
+TEST_F(MetricVarTest, AutoLatencyMicroSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(us, 1000); }
+
+TEST_F(MetricVarTest, AutoLatencyMilliSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ms, 1000 * 1000); }
+
+TEST_F(MetricVarTest, AutoLatencySeconds) { TEST_METRIC_VAR_AUTO_LATENCY(s, 1000 * 1000 * 1000); }
+
 } // namespace dsn
diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h
index 04f3616d4..18f665535 100644
--- a/src/utils/time_utils.h
+++ b/src/utils/time_utils.h
@@ -27,8 +27,12 @@
 
 #include <chrono>
 #include <cstdio>
+#include <type_traits>
 
-#include "string_view.h"
+#include "runtime/api_layer1.h"
+#include "utils/fmt_logging.h"
+#include "utils/ports.h"
+#include "utils/string_view.h"
 
 namespace dsn {
 namespace utils {
@@ -126,5 +130,28 @@ inline int64_t hh_mm_today_to_unix_sec(string_view hhmm_of_day)
     return get_unix_sec_today_midnight() + sec_of_day;
 }
 
+class chronograph
+{
+public:
+    chronograph() : chronograph(dsn_now_ns()) {}
+    chronograph(uint64_t start_time_ns) : _start_time_ns(start_time_ns) {}
+    ~chronograph() = default;
+
+    inline void reset_start_time() { _start_time_ns = dsn_now_ns(); }
+
+    inline uint64_t duration_ns()
+    {
+        auto now = dsn_now_ns();
+        CHECK_GE(now, _start_time_ns);
+
+        return now - _start_time_ns;
+    }
+
+private:
+    uint64_t _start_time_ns;
+
+    DISALLOW_COPY_AND_ASSIGN(chronograph);
+};
+
 } // namespace utils
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org


[incubator-pegasus] 03/04: feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374)

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 3210f71ae3a82e0601ed82d079282fcf8713a5ed
Author: Dan Wang <wa...@apache.org>
AuthorDate: Tue Mar 7 14:32:56 2023 +0800

    feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374)
    
    This PR migrates replica-level metrics of `pegasus_server_impl` to the new framework
    (https://github.com/apache/incubator-pegasus/issues/1333). Since there are many replica-level
    metrics in `pegasus_server_impl`, this PR is part 1 of the migration; the other metrics (all
    of which are gauges) will be migrated in later PRs.
---
 src/server/pegasus_server_impl.cpp           | 250 ++++++++++++++-------------
 src/server/pegasus_server_impl.h             |  32 ++--
 src/server/pegasus_server_impl_init.cpp      | 126 +++++++-------
 src/server/pegasus_write_service_impl.h      |   5 +-
 src/server/rocksdb_wrapper.cpp               |   6 +-
 src/server/rocksdb_wrapper.h                 |   2 +-
 src/server/test/pegasus_server_impl_test.cpp |   4 +-
 src/utils/metrics.h                          |  16 +-
 src/utils/test/metrics_test.cpp              |   2 -
 src/utils/time_utils.h                       |   2 +-
 10 files changed, 242 insertions(+), 203 deletions(-)

diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index c655d7273..2bbcc6507 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -272,13 +272,50 @@ int pegasus_server_impl::on_batched_write_requests(int64_t decree,
     return _server_write->on_batched_write_requests(requests, count, decree, timestamp);
 }
 
+// Since LOG_ERROR_PREFIX depends on log_prefix(), this method cannot be declared as static or
+// placed in an anonymous namespace.
+void pegasus_server_impl::log_expired_data(const char *op,
+                                           const dsn::rpc_address &addr,
+                                           const dsn::blob &hash_key,
+                                           const dsn::blob &sort_key) const
+{
+    LOG_ERROR_PREFIX("rocksdb data expired for {} from {}: hash_key = \"{}\", sort_key = \"{}\"",
+                     op,
+                     addr,
+                     pegasus::utils::c_escape_string(hash_key),
+                     pegasus::utils::c_escape_string(sort_key));
+}
+
+void pegasus_server_impl::log_expired_data(const char *op,
+                                           const dsn::rpc_address &addr,
+                                           const dsn::blob &key) const
+{
+    dsn::blob hash_key, sort_key;
+    pegasus_restore_key(key, hash_key, sort_key);
+    log_expired_data(op, addr, hash_key, sort_key);
+}
+
+void pegasus_server_impl::log_expired_data(const char *op,
+                                           const dsn::rpc_address &addr,
+                                           const rocksdb::Slice &key) const
+{
+    dsn::blob raw_key(key.data(), 0, key.size());
+    log_expired_data(op, addr, raw_key);
+}
+
+#define LOG_EXPIRED_DATA_IF_VERBOSE(...)                                                           \
+    do {                                                                                           \
+        if (dsn_unlikely(FLAGS_rocksdb_verbose_log)) {                                             \
+            log_expired_data(__FUNCTION__, rpc.remote_address(), ##__VA_ARGS__);                   \
+        }                                                                                          \
+    } while (0)
+
 void pegasus_server_impl::on_get(get_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_get_qps->increment();
-    uint64_t start_time = dsn_now_ns();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(get_requests);
 
-    const auto &key = rpc.request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -290,21 +327,20 @@ void pegasus_server_impl::on_get(get_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(get_latency_ns);
+
+    const auto &key = rpc.request();
     rocksdb::Slice skey(key.data(), key.length());
     std::string value;
     rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey, &value);
 
     if (status.ok()) {
         if (check_if_record_expired(utils::epoch_now(), value)) {
-            _pfc_recent_expire_count->increment();
-            if (FLAGS_rocksdb_verbose_log) {
-                LOG_ERROR_PREFIX("rocksdb data expired for get from {}", rpc.remote_address());
-            }
+            METRIC_VAR_INCREMENT(read_expired_values);
+            LOG_EXPIRED_DATA_IF_VERBOSE(key);
             status = rocksdb::Status::NotFound();
         }
-    }
-
-    if (!status.ok()) {
+    } else {
         if (FLAGS_rocksdb_verbose_log) {
             ::dsn::blob hash_key, sort_key;
             pegasus_restore_key(key, hash_key, sort_key);
@@ -327,7 +363,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
     usleep(10 * 1000);
 #endif
 
-    uint64_t time_used = dsn_now_ns() - start_time;
+    auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(get_latency_ns);
     if (is_get_abnormal(time_used, value.size())) {
         ::dsn::blob hash_key, sort_key;
         pegasus_restore_key(key, hash_key, sort_key);
@@ -340,7 +376,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
                            status.ToString(),
                            value.size(),
                            time_used);
-        _pfc_recent_abnormal_count->increment();
+        METRIC_VAR_INCREMENT(abnormal_read_requests);
     }
 
     resp.error = status.code();
@@ -349,17 +385,14 @@ void pegasus_server_impl::on_get(get_rpc rpc)
     }
 
     _cu_calculator->add_get_cu(rpc.dsn_request(), resp.error, key, resp.value);
-    _pfc_get_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_multi_get_qps->increment();
-    uint64_t start_time = dsn_now_ns();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(multi_get_requests);
 
-    const auto &request = rpc.request();
-    dsn::message_ex *req = rpc.dsn_request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -371,6 +404,10 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(multi_get_latency_ns);
+
+    const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     if (!is_filter_type_supported(request.sort_key_filter_type)) {
         LOG_ERROR_PREFIX(
             "invalid argument for multi_get from {}: sort key filter type {} not supported",
@@ -378,7 +415,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
             request.sort_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
         _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
-        _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
         return;
     }
 
@@ -465,8 +501,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
             }
             resp.error = rocksdb::Status::kOk;
             _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
-            _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
-
             return;
         }
 
@@ -669,7 +703,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
         for (int i = 0; i < keys.size(); i++) {
             rocksdb::Status &status = statuses[i];
             std::string &value = values[i];
-            // print log
             if (!status.ok()) {
                 if (FLAGS_rocksdb_verbose_log) {
                     LOG_ERROR_PREFIX("rocksdb get failed for multi_get from {}: hash_key = \"{}\", "
@@ -683,41 +716,38 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
                                      rpc.remote_address(),
                                      status.ToString());
                 }
-            }
-            // check ttl
-            if (status.ok()) {
-                uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
-                if (expire_ts > 0 && expire_ts <= epoch_now) {
-                    expire_count++;
-                    if (FLAGS_rocksdb_verbose_log) {
-                        LOG_ERROR_PREFIX("rocksdb data expired for multi_get from {}",
-                                         rpc.remote_address());
-                    }
-                    status = rocksdb::Status::NotFound();
-                }
-            }
-            // extract value
-            if (status.ok()) {
-                // check if exceed limit
-                if (count >= max_kv_count || size >= max_kv_size) {
-                    exceed_limit = true;
-                    break;
-                }
-                ::dsn::apps::key_value kv;
-                kv.key = request.sort_keys[i];
-                if (!request.no_value) {
-                    pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
+
+                if (status.IsNotFound()) {
+                    continue;
                 }
-                count++;
-                size += kv.key.length() + kv.value.length();
-                resp.kvs.emplace_back(std::move(kv));
-            }
-            // if error occurred
-            if (!status.ok() && !status.IsNotFound()) {
+
                 error_occurred = true;
                 final_status = status;
                 break;
             }
+
+            // check ttl
+            if (check_if_record_expired(epoch_now, value)) {
+                expire_count++;
+                LOG_EXPIRED_DATA_IF_VERBOSE(request.hash_key, request.sort_keys[i]);
+                status = rocksdb::Status::NotFound();
+                continue;
+            }
+
+            // check if exceed limit
+            if (dsn_unlikely(count >= max_kv_count || size >= max_kv_size)) {
+                exceed_limit = true;
+                break;
+            }
+
+            ::dsn::apps::key_value kv;
+            kv.key = request.sort_keys[i];
+            if (!request.no_value) {
+                pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
+            }
+            count++;
+            size += kv.key.length() + kv.value.length();
+            resp.kvs.emplace_back(std::move(kv));
         }
 
         if (error_occurred) {
@@ -735,7 +765,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
     usleep(10 * 1000);
 #endif
 
-    uint64_t time_used = dsn_now_ns() - start_time;
+    auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(multi_get_latency_ns);
     if (is_multi_get_abnormal(time_used, size, iteration_count)) {
         LOG_WARNING_PREFIX(
             "rocksdb abnormal multi_get from {}: hash_key = {}, "
@@ -761,25 +791,20 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
             expire_count,
             filter_count,
             time_used);
-        _pfc_recent_abnormal_count->increment();
+        METRIC_VAR_INCREMENT(abnormal_read_requests);
     }
 
-    if (expire_count > 0) {
-        _pfc_recent_expire_count->add(expire_count);
-    }
-    if (filter_count > 0) {
-        _pfc_recent_filter_count->add(filter_count);
-    }
+    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+    METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
 
     _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
-    _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_batch_get_qps->increment();
-    int64_t start_time = dsn_now_ns();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(batch_get_requests);
 
     auto &response = rpc.response();
     response.app_id = _gpid.get_app_id();
@@ -792,13 +817,14 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(batch_get_latency_ns);
+
     const auto &request = rpc.request();
     if (request.keys.empty()) {
         response.error = rocksdb::Status::kInvalidArgument;
         LOG_ERROR_PREFIX("Invalid argument for batch_get from {}: 'keys' field in request is empty",
                          rpc.remote_address().to_string());
         _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
-        _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
         return;
     }
 
@@ -817,6 +843,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
     bool error_occurred = false;
     int64_t total_data_size = 0;
     uint32_t epoch_now = pegasus::utils::epoch_now();
+    uint64_t expire_count = 0;
+
     std::vector<std::string> values;
     std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
     response.data.reserve(request.keys.size());
@@ -832,13 +860,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
 
         if (dsn_likely(status.ok())) {
             if (check_if_record_expired(epoch_now, value)) {
-                if (FLAGS_rocksdb_verbose_log) {
-                    LOG_ERROR_PREFIX(
-                        "rocksdb data expired for batch_get from {}, hash_key = {}, sort_key = {}",
-                        rpc.remote_address().to_string(),
-                        pegasus::utils::c_escape_string(hash_key),
-                        pegasus::utils::c_escape_string(sort_key));
-                }
+                ++expire_count;
+                LOG_EXPIRED_DATA_IF_VERBOSE(hash_key, sort_key);
                 continue;
             }
 
@@ -876,7 +899,7 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
         response.error = rocksdb::Status::kOk;
     }
 
-    int64_t time_used = dsn_now_ns() - start_time;
+    auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(batch_get_latency_ns);
     if (is_batch_get_abnormal(time_used, total_data_size, request.keys.size())) {
         LOG_WARNING_PREFIX(
             "rocksdb abnormal batch_get from {}: total data size = {}, row count = {}, "
@@ -885,33 +908,36 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
             total_data_size,
             request.keys.size(),
             time_used / 1000);
-        _pfc_recent_abnormal_count->increment();
+        METRIC_VAR_INCREMENT(abnormal_read_requests);
     }
 
+    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+
     _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
-    _pfc_batch_get_latency->set(time_used);
 }
 
 void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
 {
-    CHECK(_is_open, "");
+    CHECK_TRUE(_is_open);
 
-    _pfc_scan_qps->increment();
-    uint64_t start_time = dsn_now_ns();
+    METRIC_VAR_INCREMENT(scan_requests);
 
-    const auto &hash_key = rpc.request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
+
     if (!_read_size_throttling_controller->available()) {
         rpc.error() = dsn::ERR_BUSY;
         _counter_recent_read_throttling_reject_count->increment();
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
     // scan
     ::dsn::blob start_key, stop_key;
+    const auto &hash_key = rpc.request();
     pegasus_generate_key(start_key, hash_key, ::dsn::blob());
     pegasus_generate_next_blob(stop_key, hash_key);
     rocksdb::Slice start(start_key.data(), start_key.length());
@@ -933,18 +959,14 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
 
         if (check_if_record_expired(epoch_now, it->value())) {
             expire_count++;
-            if (FLAGS_rocksdb_verbose_log) {
-                LOG_ERROR_PREFIX("rocksdb data expired for sortkey_count from {}",
-                                 rpc.remote_address());
-            }
+            LOG_EXPIRED_DATA_IF_VERBOSE(it->key());
         } else {
             resp.count++;
         }
         it->Next();
     }
-    if (expire_count > 0) {
-        _pfc_recent_expire_count->add(expire_count);
-    }
+
+    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
 
     resp.error = it->status().code();
     if (!it->status().ok()) {
@@ -970,7 +992,6 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
     }
 
     _cu_calculator->add_sortkey_count_cu(rpc.dsn_request(), resp.error, hash_key);
-    _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_ttl(ttl_rpc rpc)
@@ -998,7 +1019,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
     if (status.ok()) {
         expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
         if (check_if_ts_expired(now_ts, expire_ts)) {
-            _pfc_recent_expire_count->increment();
+            METRIC_VAR_INCREMENT(read_expired_values);
             if (FLAGS_rocksdb_verbose_log) {
                 LOG_ERROR_PREFIX("rocksdb data expired for ttl from {}", rpc.remote_address());
             }
@@ -1038,12 +1059,10 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
 
 void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_scan_qps->increment();
-    uint64_t start_time = dsn_now_ns();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(scan_requests);
 
-    const auto &request = rpc.request();
-    dsn::message_ex *req = rpc.dsn_request();
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -1055,6 +1074,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
+    const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     if (!is_filter_type_supported(request.hash_key_filter_type)) {
         LOG_ERROR_PREFIX(
             "invalid argument for get_scanner from {}: hash key filter type {} not supported",
@@ -1062,10 +1085,9 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
             request.hash_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
         _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-        _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
         return;
     }
+
     if (!is_filter_type_supported(request.sort_key_filter_type)) {
         LOG_ERROR_PREFIX(
             "invalid argument for get_scanner from {}: sort key filter type {} not supported",
@@ -1073,8 +1095,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
             request.sort_key_filter_type);
         resp.error = rocksdb::Status::kInvalidArgument;
         _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-        _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
         return;
     }
 
@@ -1130,8 +1150,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
         }
         resp.error = rocksdb::Status::kOk;
         _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-        _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
         return;
     }
 
@@ -1284,24 +1302,18 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
         resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
     }
 
-    if (expire_count > 0) {
-        _pfc_recent_expire_count->add(expire_count);
-    }
-    if (filter_count > 0) {
-        _pfc_recent_filter_count->add(filter_count);
-    }
+    METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+    METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
 
     _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-    _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_scan(scan_rpc rpc)
 {
-    CHECK(_is_open, "");
-    _pfc_scan_qps->increment();
-    uint64_t start_time = dsn_now_ns();
-    const auto &request = rpc.request();
-    dsn::message_ex *req = rpc.dsn_request();
+    CHECK_TRUE(_is_open);
+
+    METRIC_VAR_INCREMENT(scan_requests);
+
     auto &resp = rpc.response();
     resp.app_id = _gpid.get_app_id();
     resp.partition_index = _gpid.get_partition_index();
@@ -1313,6 +1325,10 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
         return;
     }
 
+    METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
+    const auto &request = rpc.request();
+    dsn::message_ex *req = rpc.dsn_request();
     std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id);
     if (context) {
         rocksdb::Iterator *it = context->iterator.get();
@@ -1435,18 +1451,14 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
             resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
         }
 
-        if (expire_count > 0) {
-            _pfc_recent_expire_count->add(expire_count);
-        }
-        if (filter_count > 0) {
-            _pfc_recent_filter_count->add(filter_count);
-        }
+        METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+        METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
+
     } else {
         resp.error = rocksdb::Status::Code::kNotFound;
     }
 
     _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
-    _pfc_scan_latency->set(dsn_now_ns() - start_time);
 }
 
 void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache.fetch(args); }
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 82e381681..acaf13945 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -411,6 +411,15 @@ private:
 
     dsn::replication::manual_compaction_status::type query_compact_status() const override;
 
+    // Log expired keys for verbose mode.
+    void log_expired_data(const char *op,
+                          const dsn::rpc_address &addr,
+                          const dsn::blob &hash_key,
+                          const dsn::blob &sort_key) const;
+    void log_expired_data(const char *op, const dsn::rpc_address &addr, const dsn::blob &key) const;
+    void
+    log_expired_data(const char *op, const dsn::rpc_address &addr, const rocksdb::Slice &key) const;
+
 private:
     static const std::chrono::seconds kServerStatUpdateTimeSec;
     static const std::string COMPRESSION_HEADER;
@@ -482,20 +491,19 @@ private:
 
     std::shared_ptr<throttling_controller> _read_size_throttling_controller;
 
-    // perf counters
-    ::dsn::perf_counter_wrapper _pfc_get_qps;
-    ::dsn::perf_counter_wrapper _pfc_multi_get_qps;
-    ::dsn::perf_counter_wrapper _pfc_batch_get_qps;
-    ::dsn::perf_counter_wrapper _pfc_scan_qps;
+    METRIC_VAR_DECLARE_counter(get_requests);
+    METRIC_VAR_DECLARE_counter(multi_get_requests);
+    METRIC_VAR_DECLARE_counter(batch_get_requests);
+    METRIC_VAR_DECLARE_counter(scan_requests);
 
-    ::dsn::perf_counter_wrapper _pfc_get_latency;
-    ::dsn::perf_counter_wrapper _pfc_multi_get_latency;
-    ::dsn::perf_counter_wrapper _pfc_batch_get_latency;
-    ::dsn::perf_counter_wrapper _pfc_scan_latency;
+    METRIC_VAR_DECLARE_percentile_int64(get_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(multi_get_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(batch_get_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(scan_latency_ns);
 
-    ::dsn::perf_counter_wrapper _pfc_recent_expire_count;
-    ::dsn::perf_counter_wrapper _pfc_recent_filter_count;
-    ::dsn::perf_counter_wrapper _pfc_recent_abnormal_count;
+    METRIC_VAR_DECLARE_counter(read_expired_values);
+    METRIC_VAR_DECLARE_counter(read_filtered_values);
+    METRIC_VAR_DECLARE_counter(abnormal_read_requests);
 
     // rocksdb internal statistics
     // server level
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 29e5180f3..9575035c2 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -31,6 +31,61 @@
 #include "pegasus_server_write.h"
 #include "hotkey_collector.h"
 
+METRIC_DEFINE_counter(replica,
+                      get_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      multi_get_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of MULTI_GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      batch_get_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of BATCH_GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      scan_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of SCAN requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               get_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               multi_get_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of MULTI_GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               batch_get_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of BATCH_GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+                               scan_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency of SCAN requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      read_expired_values,
+                      dsn::metric_unit::kValues,
+                      "The number of expired values read for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      read_filtered_values,
+                      dsn::metric_unit::kValues,
+                      "The number of filtered values read for each replica");
+
+METRIC_DEFINE_counter(replica,
+                      abnormal_read_requests,
+                      dsn::metric_unit::kRequests,
+                      "The number of abnormal read requests for each replica");
+
 namespace pegasus {
 namespace server {
 
@@ -372,7 +427,18 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
       _last_durable_decree(0),
       _is_checkpointing(false),
       _manual_compact_svc(this),
-      _partition_version(0)
+      _partition_version(0),
+      METRIC_VAR_INIT_replica(get_requests),
+      METRIC_VAR_INIT_replica(multi_get_requests),
+      METRIC_VAR_INIT_replica(batch_get_requests),
+      METRIC_VAR_INIT_replica(scan_requests),
+      METRIC_VAR_INIT_replica(get_latency_ns),
+      METRIC_VAR_INIT_replica(multi_get_latency_ns),
+      METRIC_VAR_INIT_replica(batch_get_latency_ns),
+      METRIC_VAR_INIT_replica(scan_latency_ns),
+      METRIC_VAR_INIT_replica(read_expired_values),
+      METRIC_VAR_INIT_replica(read_filtered_values),
+      METRIC_VAR_INIT_replica(abnormal_read_requests)
 {
     _primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
     _gpid = get_gpid();
@@ -579,64 +645,6 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
     char name[256];
 
     // register the perf counters
-    snprintf(name, 255, "get_qps@%s", str_gpid.c_str());
-    _pfc_get_qps.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of GET request");
-
-    snprintf(name, 255, "multi_get_qps@%s", str_gpid.c_str());
-    _pfc_multi_get_qps.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of MULTI_GET request");
-
-    snprintf(name, 255, "batch_get_qps@%s", str_gpid.c_str());
-    _pfc_batch_get_qps.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of BATCH_GET request");
-
-    snprintf(name, 255, "scan_qps@%s", str_gpid.c_str());
-    _pfc_scan_qps.init_app_counter(
-        "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of SCAN request");
-
-    snprintf(name, 255, "get_latency@%s", str_gpid.c_str());
-    _pfc_get_latency.init_app_counter("app.pegasus",
-                                      name,
-                                      COUNTER_TYPE_NUMBER_PERCENTILES,
-                                      "statistic the latency of GET request");
-
-    snprintf(name, 255, "multi_get_latency@%s", str_gpid.c_str());
-    _pfc_multi_get_latency.init_app_counter("app.pegasus",
-                                            name,
-                                            COUNTER_TYPE_NUMBER_PERCENTILES,
-                                            "statistic the latency of MULTI_GET request");
-
-    snprintf(name, 255, "batch_get_latency@%s", str_gpid.c_str());
-    _pfc_batch_get_latency.init_app_counter("app.pegasus",
-                                            name,
-                                            COUNTER_TYPE_NUMBER_PERCENTILES,
-                                            "statistic the latency of BATCH_GET request");
-
-    snprintf(name, 255, "scan_latency@%s", str_gpid.c_str());
-    _pfc_scan_latency.init_app_counter("app.pegasus",
-                                       name,
-                                       COUNTER_TYPE_NUMBER_PERCENTILES,
-                                       "statistic the latency of SCAN request");
-
-    snprintf(name, 255, "recent.expire.count@%s", str_gpid.c_str());
-    _pfc_recent_expire_count.init_app_counter("app.pegasus",
-                                              name,
-                                              COUNTER_TYPE_VOLATILE_NUMBER,
-                                              "statistic the recent expired value read count");
-
-    snprintf(name, 255, "recent.filter.count@%s", str_gpid.c_str());
-    _pfc_recent_filter_count.init_app_counter("app.pegasus",
-                                              name,
-                                              COUNTER_TYPE_VOLATILE_NUMBER,
-                                              "statistic the recent filtered value read count");
-
-    snprintf(name, 255, "recent.abnormal.count@%s", str_gpid.c_str());
-    _pfc_recent_abnormal_count.init_app_counter("app.pegasus",
-                                                name,
-                                                COUNTER_TYPE_VOLATILE_NUMBER,
-                                                "statistic the recent abnormal read count");
-
     snprintf(name, 255, "disk.storage.sst.count@%s", str_gpid.c_str());
     _pfc_rdb_sst_count.init_app_counter(
         "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the count of sstable files");
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index 16108b852..d1ecf0664 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -88,8 +88,7 @@ public:
     explicit impl(pegasus_server_impl *server)
         : replica_base(server),
           _primary_address(server->_primary_address),
-          _pegasus_data_version(server->_pegasus_data_version),
-          _pfc_recent_expire_count(server->_pfc_recent_expire_count)
+          _pegasus_data_version(server->_pegasus_data_version)
     {
         _rocksdb_wrapper = dsn::make_unique<rocksdb_wrapper>(server);
     }
@@ -689,8 +688,6 @@ private:
     const std::string _primary_address;
     const uint32_t _pegasus_data_version;
 
-    ::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
-
     std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;
 
     // for setting update_response.error after committed.
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 4a10a3827..3e95af4ca 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -24,6 +24,8 @@
 #include "pegasus_write_service_impl.h"
 #include "base/pegasus_value_schema.h"
 
+METRIC_DECLARE_counter(read_expired_values);
+
 namespace pegasus {
 namespace server {
 
@@ -33,7 +35,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
       _rd_opts(server->_data_cf_rd_opts),
       _meta_cf(server->_meta_cf),
       _pegasus_data_version(server->_pegasus_data_version),
-      _pfc_recent_expire_count(server->_pfc_recent_expire_count),
+      METRIC_VAR_INIT_replica(read_expired_values),
       _default_ttl(0)
 {
     _write_batch = dsn::make_unique<rocksdb::WriteBatch>();
@@ -55,7 +57,7 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
         ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
         if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
             ctx->expired = true;
-            _pfc_recent_expire_count->increment();
+            METRIC_VAR_INCREMENT(read_expired_values);
         }
         return rocksdb::Status::kOk;
     } else if (s.IsNotFound()) {
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index 51a46c7ff..a26fae34c 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -81,7 +81,7 @@ private:
     rocksdb::ColumnFamilyHandle *_meta_cf;
 
     const uint32_t _pegasus_data_version;
-    dsn::perf_counter_wrapper &_pfc_recent_expire_count;
+    METRIC_VAR_DECLARE_counter(read_expired_values);
     volatile uint32_t _default_ttl;
 
     friend class rocksdb_wrapper_test;
diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp
index 16671e4dc..e008b7fad 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -54,7 +54,7 @@ public:
             _server->update_app_envs(envs);
 
             // do on_get/on_multi_get operation,
-            long before_count = _server->_pfc_recent_abnormal_count->get_integer_value();
+            auto before_count = _server->METRIC_VAR_VALUE(abnormal_read_requests);
             if (!test.is_multi_get) {
                 get_rpc rpc(dsn::make_unique<dsn::blob>(test_key), dsn::apps::RPC_RRDB_RRDB_GET);
                 _server->on_get(rpc);
@@ -67,7 +67,7 @@ public:
                                   dsn::apps::RPC_RRDB_RRDB_MULTI_GET);
                 _server->on_multi_get(rpc);
             }
-            long after_count = _server->_pfc_recent_abnormal_count->get_integer_value();
+            auto after_count = _server->METRIC_VAR_VALUE(abnormal_read_requests);
 
             ASSERT_EQ(before_count + test.expect_perf_counter_incr, after_count);
         }
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index f32665f24..f44296ce5 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -157,7 +157,13 @@
 #define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica)
 
 // Perform increment-related operations on metrics including gauge and counter.
-#define METRIC_VAR_INCREMENT_BY(name, x) _##name->increment_by(x)
+#define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
+    do {                                                                                           \
+        if ((x) != 0) {                                                                            \
+            _##name->increment_by(x);                                                              \
+        }                                                                                          \
+    } while (0)
+
 #define METRIC_VAR_INCREMENT(name) _##name->increment()
 
 // Perform set() operations on metrics including gauge and percentile.
@@ -168,10 +174,15 @@
 // such as percentile.
 #define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
 
+// Read the current measurement of the metric.
+#define METRIC_VAR_VALUE(name) _##name->value()
+
 // Convenient macro that is used to compute latency automatically, which is dedicated to percentile.
 #define METRIC_VAR_AUTO_LATENCY(name, ...)                                                         \
     dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__)
 
+#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns()
+
 namespace dsn {
 
 class metric_prototype;
@@ -589,6 +600,7 @@ enum class metric_unit : size_t
     kMilliSeconds,
     kSeconds,
     kRequests,
+    kValues,
     kInvalidUnit,
 };
 
@@ -1400,6 +1412,8 @@ public:
         }
     }
 
+    inline uint64_t duration_ns() const { return _chrono.duration_ns(); }
+
 private:
     percentile_ptr<int64_t> _percentile;
     utils::chronograph _chrono;
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 9a0d27e86..66cb41c13 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -3121,8 +3121,6 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t val)
     EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
 }
 
-#define METRIC_VAR_VALUE(name) _##name->value()
-
 #define TEST_METRIC_VAR_INCREMENT(name)                                                            \
     do {                                                                                           \
         ASSERT_EQ(0, METRIC_VAR_VALUE(name));                                                      \
diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h
index 18f665535..032bd5166 100644
--- a/src/utils/time_utils.h
+++ b/src/utils/time_utils.h
@@ -139,7 +139,7 @@ public:
 
     inline void reset_start_time() { _start_time_ns = dsn_now_ns(); }
 
-    inline uint64_t duration_ns()
+    inline uint64_t duration_ns() const
     {
         auto now = dsn_now_ns();
         CHECK_GE(now, _start_time_ns);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org