You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/25 10:55:02 UTC
[incubator-pegasus] 03/28: feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit c2ff93acaf9f510355e71f8473fa9b107ed757be
Author: Dan Wang <wa...@apache.org>
AuthorDate: Tue Mar 7 14:32:56 2023 +0800
feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374)
This PR migrates the replica-level metrics of `pegasus_server_impl` to the new framework
(https://github.com/apache/incubator-pegasus/issues/1333). Since `pegasus_server_impl` has many replica-level
metrics, this PR is part 1 of the migration; the remaining metrics (all
of which are gauges) will be migrated in later PRs.
---
src/server/pegasus_server_impl.cpp | 250 ++++++++++++++-------------
src/server/pegasus_server_impl.h | 32 ++--
src/server/pegasus_server_impl_init.cpp | 126 +++++++-------
src/server/pegasus_write_service_impl.h | 5 +-
src/server/rocksdb_wrapper.cpp | 6 +-
src/server/rocksdb_wrapper.h | 2 +-
src/server/test/pegasus_server_impl_test.cpp | 4 +-
src/utils/metrics.h | 16 +-
src/utils/test/metrics_test.cpp | 2 -
src/utils/time_utils.h | 2 +-
10 files changed, 242 insertions(+), 203 deletions(-)
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index e6e9d48ed..ab6c4dc1c 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -322,13 +322,50 @@ int pegasus_server_impl::on_batched_write_requests(int64_t decree,
return _server_write->on_batched_write_requests(requests, count, decree, timestamp);
}
+// Log an expired rocksdb record read by request `op` from `addr`, with both keys C-escaped. This is
+// a member (not static, not in an anonymous namespace) since LOG_ERROR_PREFIX uses log_prefix().
+void pegasus_server_impl::log_expired_data(const char *op,
+ const dsn::rpc_address &addr,
+ const dsn::blob &hash_key,
+ const dsn::blob &sort_key) const
+{
+ LOG_ERROR_PREFIX("rocksdb data expired for {} from {}: hash_key = \"{}\", sort_key = \"{}\"",
+ op,
+ addr,
+ pegasus::utils::c_escape_string(hash_key),
+ pegasus::utils::c_escape_string(sort_key));
+}
+
+void pegasus_server_impl::log_expired_data(const char *op,
+ const dsn::rpc_address &addr,
+ const dsn::blob &key) const
+{
+ dsn::blob hash_key, sort_key;
+ pegasus_restore_key(key, hash_key, sort_key); // split the combined key into hash key + sort key
+ log_expired_data(op, addr, hash_key, sort_key);
+}
+
+void pegasus_server_impl::log_expired_data(const char *op,
+ const dsn::rpc_address &addr,
+ const rocksdb::Slice &key) const
+{
+ dsn::blob raw_key(key.data(), 0, key.size()); // adapt the rocksdb::Slice to the blob overload
+ log_expired_data(op, addr, raw_key);
+}
+
+#define LOG_EXPIRED_DATA_IF_VERBOSE(...) /* args: key, or hash_key and sort_key */ \
+ do { \
+ if (dsn_unlikely(FLAGS_rocksdb_verbose_log)) { /* no-op unless verbose logging */ \
+ log_expired_data(__FUNCTION__, rpc.remote_address(), ##__VA_ARGS__); /* needs `rpc` */ \
+ } \
+ } while (0)
+
void pegasus_server_impl::on_get(get_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_get_qps->increment();
- uint64_t start_time = dsn_now_ns();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(get_requests);
- const auto &key = rpc.request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -345,21 +382,20 @@ void pegasus_server_impl::on_get(get_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(get_latency_ns);
+
+ const auto &key = rpc.request();
rocksdb::Slice skey(key.data(), key.length());
std::string value;
rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey, &value);
if (status.ok()) {
if (check_if_record_expired(utils::epoch_now(), value)) {
- _pfc_recent_expire_count->increment();
- if (FLAGS_rocksdb_verbose_log) {
- LOG_ERROR_PREFIX("rocksdb data expired for get from {}", rpc.remote_address());
- }
+ METRIC_VAR_INCREMENT(read_expired_values);
+ LOG_EXPIRED_DATA_IF_VERBOSE(key);
status = rocksdb::Status::NotFound();
}
- }
-
- if (!status.ok()) {
+ } else {
if (FLAGS_rocksdb_verbose_log) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);
@@ -382,7 +418,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
usleep(10 * 1000);
#endif
- uint64_t time_used = dsn_now_ns() - start_time;
+ auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(get_latency_ns);
if (is_get_abnormal(time_used, value.size())) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);
@@ -395,7 +431,7 @@ void pegasus_server_impl::on_get(get_rpc rpc)
status.ToString(),
value.size(),
time_used);
- _pfc_recent_abnormal_count->increment();
+ METRIC_VAR_INCREMENT(abnormal_read_requests);
}
resp.error = status.code();
@@ -404,17 +440,14 @@ void pegasus_server_impl::on_get(get_rpc rpc)
}
_cu_calculator->add_get_cu(rpc.dsn_request(), resp.error, key, resp.value);
- _pfc_get_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_multi_get_qps->increment();
- uint64_t start_time = dsn_now_ns();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(multi_get_requests);
- const auto &request = rpc.request();
- dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -426,6 +459,10 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(multi_get_latency_ns);
+
+ const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
if (!is_filter_type_supported(request.sort_key_filter_type)) {
LOG_ERROR_PREFIX(
"invalid argument for multi_get from {}: sort key filter type {} not supported",
@@ -433,7 +470,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
- _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
return;
}
@@ -520,8 +556,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
}
resp.error = rocksdb::Status::kOk;
_cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
- _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
-
return;
}
@@ -724,7 +758,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
for (int i = 0; i < keys.size(); i++) {
rocksdb::Status &status = statuses[i];
std::string &value = values[i];
- // print log
if (!status.ok()) {
if (FLAGS_rocksdb_verbose_log) {
LOG_ERROR_PREFIX("rocksdb get failed for multi_get from {}: hash_key = \"{}\", "
@@ -738,41 +771,38 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
rpc.remote_address(),
status.ToString());
}
- }
- // check ttl
- if (status.ok()) {
- uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
- if (expire_ts > 0 && expire_ts <= epoch_now) {
- expire_count++;
- if (FLAGS_rocksdb_verbose_log) {
- LOG_ERROR_PREFIX("rocksdb data expired for multi_get from {}",
- rpc.remote_address());
- }
- status = rocksdb::Status::NotFound();
- }
- }
- // extract value
- if (status.ok()) {
- // check if exceed limit
- if (count >= max_kv_count || size >= max_kv_size) {
- exceed_limit = true;
- break;
- }
- ::dsn::apps::key_value kv;
- kv.key = request.sort_keys[i];
- if (!request.no_value) {
- pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
+
+ if (status.IsNotFound()) {
+ continue;
}
- count++;
- size += kv.key.length() + kv.value.length();
- resp.kvs.emplace_back(std::move(kv));
- }
- // if error occurred
- if (!status.ok() && !status.IsNotFound()) {
+
error_occurred = true;
final_status = status;
break;
}
+
+ // check ttl
+ if (check_if_record_expired(epoch_now, value)) {
+ expire_count++;
+ LOG_EXPIRED_DATA_IF_VERBOSE(request.hash_key, request.sort_keys[i]);
+ status = rocksdb::Status::NotFound();
+ continue;
+ }
+
+ // check if exceed limit
+ if (dsn_unlikely(count >= max_kv_count || size >= max_kv_size)) {
+ exceed_limit = true;
+ break;
+ }
+
+ ::dsn::apps::key_value kv;
+ kv.key = request.sort_keys[i];
+ if (!request.no_value) {
+ pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value);
+ }
+ count++;
+ size += kv.key.length() + kv.value.length();
+ resp.kvs.emplace_back(std::move(kv));
}
if (error_occurred) {
@@ -790,7 +820,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
usleep(10 * 1000);
#endif
- uint64_t time_used = dsn_now_ns() - start_time;
+ auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(multi_get_latency_ns);
if (is_multi_get_abnormal(time_used, size, iteration_count)) {
LOG_WARNING_PREFIX(
"rocksdb abnormal multi_get from {}: hash_key = {}, "
@@ -816,25 +846,20 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
expire_count,
filter_count,
time_used);
- _pfc_recent_abnormal_count->increment();
+ METRIC_VAR_INCREMENT(abnormal_read_requests);
}
- if (expire_count > 0) {
- _pfc_recent_expire_count->add(expire_count);
- }
- if (filter_count > 0) {
- _pfc_recent_filter_count->add(filter_count);
- }
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+ METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
_cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs);
- _pfc_multi_get_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_batch_get_qps->increment();
- int64_t start_time = dsn_now_ns();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(batch_get_requests);
auto &response = rpc.response();
response.app_id = _gpid.get_app_id();
@@ -847,13 +872,14 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(batch_get_latency_ns);
+
const auto &request = rpc.request();
if (request.keys.empty()) {
response.error = rocksdb::Status::kInvalidArgument;
LOG_ERROR_PREFIX("Invalid argument for batch_get from {}: 'keys' field in request is empty",
rpc.remote_address().to_string());
_cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
- _pfc_batch_get_latency->set(dsn_now_ns() - start_time);
return;
}
@@ -872,6 +898,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
bool error_occurred = false;
int64_t total_data_size = 0;
uint32_t epoch_now = pegasus::utils::epoch_now();
+ uint64_t expire_count = 0;
+
std::vector<std::string> values;
std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values);
response.data.reserve(request.keys.size());
@@ -887,13 +915,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
if (dsn_likely(status.ok())) {
if (check_if_record_expired(epoch_now, value)) {
- if (FLAGS_rocksdb_verbose_log) {
- LOG_ERROR_PREFIX(
- "rocksdb data expired for batch_get from {}, hash_key = {}, sort_key = {}",
- rpc.remote_address().to_string(),
- pegasus::utils::c_escape_string(hash_key),
- pegasus::utils::c_escape_string(sort_key));
- }
+ ++expire_count;
+ LOG_EXPIRED_DATA_IF_VERBOSE(hash_key, sort_key);
continue;
}
@@ -931,7 +954,7 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
response.error = rocksdb::Status::kOk;
}
- int64_t time_used = dsn_now_ns() - start_time;
+ auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(batch_get_latency_ns);
if (is_batch_get_abnormal(time_used, total_data_size, request.keys.size())) {
LOG_WARNING_PREFIX(
"rocksdb abnormal batch_get from {}: total data size = {}, row count = {}, "
@@ -940,33 +963,36 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc)
total_data_size,
request.keys.size(),
time_used / 1000);
- _pfc_recent_abnormal_count->increment();
+ METRIC_VAR_INCREMENT(abnormal_read_requests);
}
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+
_cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data);
- _pfc_batch_get_latency->set(time_used);
}
void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
{
- CHECK(_is_open, "");
+ CHECK_TRUE(_is_open);
- _pfc_scan_qps->increment();
- uint64_t start_time = dsn_now_ns();
+ METRIC_VAR_INCREMENT(scan_requests);
- const auto &hash_key = rpc.request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
+
if (!_read_size_throttling_controller->available()) {
rpc.error() = dsn::ERR_BUSY;
_counter_recent_read_throttling_reject_count->increment();
return;
}
+ METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
// scan
::dsn::blob start_key, stop_key;
+ const auto &hash_key = rpc.request();
pegasus_generate_key(start_key, hash_key, ::dsn::blob());
pegasus_generate_next_blob(stop_key, hash_key);
rocksdb::Slice start(start_key.data(), start_key.length());
@@ -988,18 +1014,14 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
if (check_if_record_expired(epoch_now, it->value())) {
expire_count++;
- if (FLAGS_rocksdb_verbose_log) {
- LOG_ERROR_PREFIX("rocksdb data expired for sortkey_count from {}",
- rpc.remote_address());
- }
+ LOG_EXPIRED_DATA_IF_VERBOSE(it->key());
} else {
resp.count++;
}
it->Next();
}
- if (expire_count > 0) {
- _pfc_recent_expire_count->add(expire_count);
- }
+
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
resp.error = it->status().code();
if (!it->status().ok()) {
@@ -1025,7 +1047,6 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
}
_cu_calculator->add_sortkey_count_cu(rpc.dsn_request(), resp.error, hash_key);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_ttl(ttl_rpc rpc)
@@ -1053,7 +1074,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
if (status.ok()) {
expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value);
if (check_if_ts_expired(now_ts, expire_ts)) {
- _pfc_recent_expire_count->increment();
+ METRIC_VAR_INCREMENT(read_expired_values);
if (FLAGS_rocksdb_verbose_log) {
LOG_ERROR_PREFIX("rocksdb data expired for ttl from {}", rpc.remote_address());
}
@@ -1093,12 +1114,10 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_scan_qps->increment();
- uint64_t start_time = dsn_now_ns();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(scan_requests);
- const auto &request = rpc.request();
- dsn::message_ex *req = rpc.dsn_request();
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -1110,6 +1129,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
+ const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
if (!is_filter_type_supported(request.hash_key_filter_type)) {
LOG_ERROR_PREFIX(
"invalid argument for get_scanner from {}: hash key filter type {} not supported",
@@ -1117,10 +1140,9 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.hash_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
return;
}
+
if (!is_filter_type_supported(request.sort_key_filter_type)) {
LOG_ERROR_PREFIX(
"invalid argument for get_scanner from {}: sort key filter type {} not supported",
@@ -1128,8 +1150,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.sort_key_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
return;
}
@@ -1185,8 +1205,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
}
resp.error = rocksdb::Status::kOk;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
-
return;
}
@@ -1339,24 +1357,18 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
}
- if (expire_count > 0) {
- _pfc_recent_expire_count->add(expire_count);
- }
- if (filter_count > 0) {
- _pfc_recent_filter_count->add(filter_count);
- }
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+ METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_scan(scan_rpc rpc)
{
- CHECK(_is_open, "");
- _pfc_scan_qps->increment();
- uint64_t start_time = dsn_now_ns();
- const auto &request = rpc.request();
- dsn::message_ex *req = rpc.dsn_request();
+ CHECK_TRUE(_is_open);
+
+ METRIC_VAR_INCREMENT(scan_requests);
+
auto &resp = rpc.response();
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
@@ -1368,6 +1380,10 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
return;
}
+ METRIC_VAR_AUTO_LATENCY(scan_latency_ns);
+
+ const auto &request = rpc.request();
+ dsn::message_ex *req = rpc.dsn_request();
std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id);
if (context) {
rocksdb::Iterator *it = context->iterator.get();
@@ -1490,18 +1506,14 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED;
}
- if (expire_count > 0) {
- _pfc_recent_expire_count->add(expire_count);
- }
- if (filter_count > 0) {
- _pfc_recent_filter_count->add(filter_count);
- }
+ METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count);
+ METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count);
+
} else {
resp.error = rocksdb::Status::Code::kNotFound;
}
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
- _pfc_scan_latency->set(dsn_now_ns() - start_time);
}
void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache.fetch(args); }
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index d156acdce..7203d0ce2 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -446,6 +446,15 @@ private:
dsn::replication::manual_compaction_status::type query_compact_status() const override;
+ // Log an expired record read by request `op` from `addr`; the key may be passed split or combined.
+ void log_expired_data(const char *op,
+ const dsn::rpc_address &addr,
+ const dsn::blob &hash_key,
+ const dsn::blob &sort_key) const;
+ void log_expired_data(const char *op, const dsn::rpc_address &addr, const dsn::blob &key) const;
+ void
+ log_expired_data(const char *op, const dsn::rpc_address &addr, const rocksdb::Slice &key) const;
+
private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
@@ -517,20 +526,19 @@ private:
std::shared_ptr<throttling_controller> _read_size_throttling_controller;
- // perf counters
- ::dsn::perf_counter_wrapper _pfc_get_qps;
- ::dsn::perf_counter_wrapper _pfc_multi_get_qps;
- ::dsn::perf_counter_wrapper _pfc_batch_get_qps;
- ::dsn::perf_counter_wrapper _pfc_scan_qps;
+ METRIC_VAR_DECLARE_counter(get_requests);
+ METRIC_VAR_DECLARE_counter(multi_get_requests);
+ METRIC_VAR_DECLARE_counter(batch_get_requests);
+ METRIC_VAR_DECLARE_counter(scan_requests);
- ::dsn::perf_counter_wrapper _pfc_get_latency;
- ::dsn::perf_counter_wrapper _pfc_multi_get_latency;
- ::dsn::perf_counter_wrapper _pfc_batch_get_latency;
- ::dsn::perf_counter_wrapper _pfc_scan_latency;
+ METRIC_VAR_DECLARE_percentile_int64(get_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(multi_get_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(batch_get_latency_ns);
+ METRIC_VAR_DECLARE_percentile_int64(scan_latency_ns);
- ::dsn::perf_counter_wrapper _pfc_recent_expire_count;
- ::dsn::perf_counter_wrapper _pfc_recent_filter_count;
- ::dsn::perf_counter_wrapper _pfc_recent_abnormal_count;
+ METRIC_VAR_DECLARE_counter(read_expired_values);
+ METRIC_VAR_DECLARE_counter(read_filtered_values);
+ METRIC_VAR_DECLARE_counter(abnormal_read_requests);
// rocksdb internal statistics
// server level
diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp
index 896f8ab94..27aebdfab 100644
--- a/src/server/pegasus_server_impl_init.cpp
+++ b/src/server/pegasus_server_impl_init.cpp
@@ -62,6 +62,61 @@ class replica;
} // namespace replication
} // namespace dsn
+// Replica-level read metrics of pegasus_server_impl, replacing the former app.pegasus perf counters.
+METRIC_DEFINE_counter(replica,
+ get_requests,
+ dsn::metric_unit::kRequests,
+ "The number of GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ multi_get_requests,
+ dsn::metric_unit::kRequests,
+ "The number of MULTI_GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ batch_get_requests,
+ dsn::metric_unit::kRequests,
+ "The number of BATCH_GET requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ scan_requests,
+ dsn::metric_unit::kRequests,
+ "The number of SCAN requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ get_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ multi_get_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of MULTI_GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ batch_get_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of BATCH_GET requests for each replica");
+
+METRIC_DEFINE_percentile_int64(replica,
+ scan_latency_ns,
+ dsn::metric_unit::kNanoSeconds,
+ "The latency of SCAN requests for each replica");
+
+METRIC_DEFINE_counter(replica,
+ read_expired_values,
+ dsn::metric_unit::kValues,
+ "The number of expired values read for each replica");
+
+METRIC_DEFINE_counter(replica,
+ read_filtered_values,
+ dsn::metric_unit::kValues,
+ "The number of filtered values read for each replica");
+
+METRIC_DEFINE_counter(replica,
+ abnormal_read_requests,
+ dsn::metric_unit::kRequests,
+ "The number of abnormal read requests for each replica");
namespace pegasus {
namespace server {
@@ -403,7 +458,18 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_last_durable_decree(0),
_is_checkpointing(false),
_manual_compact_svc(this),
- _partition_version(0)
+ _partition_version(0),
+ METRIC_VAR_INIT_replica(get_requests),
+ METRIC_VAR_INIT_replica(multi_get_requests),
+ METRIC_VAR_INIT_replica(batch_get_requests),
+ METRIC_VAR_INIT_replica(scan_requests),
+ METRIC_VAR_INIT_replica(get_latency_ns),
+ METRIC_VAR_INIT_replica(multi_get_latency_ns),
+ METRIC_VAR_INIT_replica(batch_get_latency_ns),
+ METRIC_VAR_INIT_replica(scan_latency_ns),
+ METRIC_VAR_INIT_replica(read_expired_values),
+ METRIC_VAR_INIT_replica(read_filtered_values),
+ METRIC_VAR_INIT_replica(abnormal_read_requests)
{
_primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
_gpid = get_gpid();
@@ -610,64 +676,6 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
char name[256];
// register the perf counters
- snprintf(name, 255, "get_qps@%s", str_gpid.c_str());
- _pfc_get_qps.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of GET request");
-
- snprintf(name, 255, "multi_get_qps@%s", str_gpid.c_str());
- _pfc_multi_get_qps.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of MULTI_GET request");
-
- snprintf(name, 255, "batch_get_qps@%s", str_gpid.c_str());
- _pfc_batch_get_qps.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of BATCH_GET request");
-
- snprintf(name, 255, "scan_qps@%s", str_gpid.c_str());
- _pfc_scan_qps.init_app_counter(
- "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of SCAN request");
-
- snprintf(name, 255, "get_latency@%s", str_gpid.c_str());
- _pfc_get_latency.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of GET request");
-
- snprintf(name, 255, "multi_get_latency@%s", str_gpid.c_str());
- _pfc_multi_get_latency.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of MULTI_GET request");
-
- snprintf(name, 255, "batch_get_latency@%s", str_gpid.c_str());
- _pfc_batch_get_latency.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of BATCH_GET request");
-
- snprintf(name, 255, "scan_latency@%s", str_gpid.c_str());
- _pfc_scan_latency.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_NUMBER_PERCENTILES,
- "statistic the latency of SCAN request");
-
- snprintf(name, 255, "recent.expire.count@%s", str_gpid.c_str());
- _pfc_recent_expire_count.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent expired value read count");
-
- snprintf(name, 255, "recent.filter.count@%s", str_gpid.c_str());
- _pfc_recent_filter_count.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent filtered value read count");
-
- snprintf(name, 255, "recent.abnormal.count@%s", str_gpid.c_str());
- _pfc_recent_abnormal_count.init_app_counter("app.pegasus",
- name,
- COUNTER_TYPE_VOLATILE_NUMBER,
- "statistic the recent abnormal read count");
-
snprintf(name, 255, "disk.storage.sst.count@%s", str_gpid.c_str());
_pfc_rdb_sst_count.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the count of sstable files");
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index b75b379a3..344c89560 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -89,8 +89,7 @@ public:
explicit impl(pegasus_server_impl *server)
: replica_base(server),
_primary_address(server->_primary_address),
- _pegasus_data_version(server->_pegasus_data_version),
- _pfc_recent_expire_count(server->_pfc_recent_expire_count)
+ _pegasus_data_version(server->_pegasus_data_version)
{
_rocksdb_wrapper = std::make_unique<rocksdb_wrapper>(server);
}
@@ -689,8 +688,6 @@ private:
const std::string _primary_address;
const uint32_t _pegasus_data_version;
- ::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
-
std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;
// for setting update_response.error after committed.
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index 64d37d9be..08fe8aeca 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -39,6 +39,8 @@
#include "utils/fmt_logging.h"
#include "utils/ports.h"
+METRIC_DECLARE_counter(read_expired_values); // defined in pegasus_server_impl_init.cpp
+
namespace pegasus {
namespace server {
@@ -54,7 +56,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
_rd_opts(server->_data_cf_rd_opts),
_meta_cf(server->_meta_cf),
_pegasus_data_version(server->_pegasus_data_version),
- _pfc_recent_expire_count(server->_pfc_recent_expire_count),
+ METRIC_VAR_INIT_replica(read_expired_values),
_default_ttl(0)
{
_write_batch = std::make_unique<rocksdb::WriteBatch>();
@@ -76,7 +78,7 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx)
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
- _pfc_recent_expire_count->increment();
+ METRIC_VAR_INCREMENT(read_expired_values);
}
return rocksdb::Status::kOk;
} else if (s.IsNotFound()) {
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index e3c713512..fef30a629 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -87,7 +87,7 @@ private:
rocksdb::ColumnFamilyHandle *_meta_cf;
const uint32_t _pegasus_data_version;
- dsn::perf_counter_wrapper &_pfc_recent_expire_count;
+ METRIC_VAR_DECLARE_counter(read_expired_values);
volatile uint32_t _default_ttl;
friend class rocksdb_wrapper_test;
diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp
index 1363d8054..075193fef 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -77,7 +77,7 @@ public:
_server->update_app_envs(envs);
// do on_get/on_multi_get operation,
- long before_count = _server->_pfc_recent_abnormal_count->get_integer_value();
+ auto before_count = _server->METRIC_VAR_VALUE(abnormal_read_requests);
if (!test.is_multi_get) {
get_rpc rpc(std::make_unique<dsn::blob>(test_key), dsn::apps::RPC_RRDB_RRDB_GET);
_server->on_get(rpc);
@@ -90,7 +90,7 @@ public:
dsn::apps::RPC_RRDB_RRDB_MULTI_GET);
_server->on_multi_get(rpc);
}
- long after_count = _server->_pfc_recent_abnormal_count->get_integer_value();
+ auto after_count = _server->METRIC_VAR_VALUE(abnormal_read_requests);
ASSERT_EQ(before_count + test.expect_perf_counter_incr, after_count);
}
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 2d6da6c0f..da1d056d8 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -165,7 +165,13 @@ class error_code;
#define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica)
// Perform increment-related operations on metrics including gauge and counter.
-#define METRIC_VAR_INCREMENT_BY(name, x) _##name->increment_by(x)
+// Skip increment_by() when `x` is 0; `x` is parenthesized and evaluated exactly once.
+#define METRIC_VAR_INCREMENT_BY(name, x) \
+ do { \
+ const auto _##name##_delta = (x); \
+ if (_##name##_delta != 0) { _##name->increment_by(_##name##_delta); } \
+ } while (0)
+
#define METRIC_VAR_INCREMENT(name) _##name->increment()
// Perform set() operations on metrics including gauge and percentile.
@@ -176,10 +182,15 @@ class error_code;
// such as percentile.
#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
+// Read the current measurement of the metric; shared by production code and tests.
+#define METRIC_VAR_VALUE(name) _##name->value()
+
// Convenient macro that is used to compute latency automatically, which is dedicated to percentile.
#define METRIC_VAR_AUTO_LATENCY(name, ...) \
dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__)
+#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns() // needs METRIC_VAR_AUTO_LATENCY(name) in scope
+
namespace dsn {
class metric; // IWYU pragma: keep
class metric_entity_prototype; // IWYU pragma: keep
@@ -598,6 +609,7 @@ enum class metric_unit : size_t
kMilliSeconds,
kSeconds,
kRequests,
+ kValues,
kInvalidUnit,
};
@@ -1409,6 +1421,8 @@ public:
}
}
+ inline uint64_t duration_ns() const { return _chrono.duration_ns(); } // elapsed ns on the internal chronograph
+
private:
percentile_ptr<int64_t> _percentile;
utils::chronograph _chrono;
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 670af36b7..9388a9a92 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -3134,8 +3134,6 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t val)
EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
}
-#define METRIC_VAR_VALUE(name) _##name->value()
-
#define TEST_METRIC_VAR_INCREMENT(name) \
do { \
ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \
diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h
index dc8d6e1ee..c3bf30e6c 100644
--- a/src/utils/time_utils.h
+++ b/src/utils/time_utils.h
@@ -143,7 +143,7 @@ public:
inline void reset_start_time() { _start_time_ns = dsn_now_ns(); }
- inline uint64_t duration_ns()
+ inline uint64_t duration_ns() const
{
auto now = dsn_now_ns();
CHECK_GE(now, _start_time_ns);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org