You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2022/11/02 11:38:28 UTC
[incubator-pegasus] branch master updated: feat(new_metrics): take snapshot of each metric as json format (#1219)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 20aa6e842 feat(new_metrics): take snapshot of each metric as json format (#1219)
20aa6e842 is described below
commit 20aa6e8420003b95eb690b17b75ebfe805ad2a7d
Author: Dan Wang <wa...@apache.org>
AuthorDate: Wed Nov 2 19:38:23 2022 +0800
feat(new_metrics): take snapshot of each metric as json format (#1219)
---
src/utils/metrics.h | 102 +++++++++++++++-
src/utils/test/metrics_test.cpp | 255 +++++++++++++++++++++++++++++++++++++++-
2 files changed, 352 insertions(+), 5 deletions(-)
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 8ce7478a7..c2082befe 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -33,11 +33,12 @@
#include <boost/asio/deadline_timer.hpp>
#include "api_utilities.h"
-#include "fmt_logging.h"
#include "alloc.h"
#include "autoref_ptr.h"
#include "casts.h"
+#include "common/json_helper.h"
#include "enum_helper.h"
+#include "fmt_logging.h"
#include "long_adder.h"
#include "nth_element.h"
#include "ports.h"
@@ -344,6 +345,9 @@ class metric : public ref_counter
public:
const metric_prototype *prototype() const { return _prototype; }
+ // Take a snapshot of this metric, writing its current value(s) into `writer` in JSON
+ // format. The gauge, counter and percentile implementations in this header each write
+ // one JSON object holding the metric's "name" followed by its value field(s).
+ virtual void take_snapshot(dsn::json::JsonWriter &writer) = 0;
+
protected:
explicit metric(const metric_prototype *prototype);
virtual ~metric() = default;
@@ -393,6 +397,27 @@ public:
value_type value() const { return _value.load(std::memory_order_relaxed); }
+    // Write this gauge into `writer` as a single JSON object:
+    //
+    //   {
+    //     "name": "<metric_name>",
+    //     "value": <value()>
+    //   }
+    //
+    // "name" is the gauge's name as a string. "value" is what `value()` returns at the time of
+    // the call, encoded as a JSON number: integral or floating-point depending on `value_type`.
+    void take_snapshot(json::JsonWriter &writer) override
+    {
+        const auto current_value = value();
+
+        writer.StartObject();
+        writer.Key("name");
+        json::json_encode(writer, prototype()->name().data());
+        writer.Key("value");
+        json::json_encode(writer, current_value);
+        writer.EndObject();
+    }
+
void set(const value_type &val) { _value.store(val, std::memory_order_relaxed); }
template <typename Int = value_type,
@@ -493,6 +518,26 @@ public:
return _adder.fetch_and_reset();
}
+    // Write this counter into `writer` as a single JSON object:
+    //
+    //   {
+    //     "name": "<metric_name>",
+    //     "value": <value()>
+    //   }
+    //
+    // "name" is the counter's name as a string; "value" is what `value()` returns at the time
+    // of the call, encoded as an integral JSON number (int64_t).
+    void take_snapshot(json::JsonWriter &writer) override
+    {
+        const auto current_value = value();
+
+        writer.StartObject();
+        writer.Key("name");
+        json::json_encode(writer, prototype()->name().data());
+        writer.Key("value");
+        json::json_encode(writer, current_value);
+        writer.EndObject();
+    }
+
// NOTICE: x MUST be a non-negative integer.
void increment_by(int64_t x)
{
@@ -558,11 +603,18 @@ ENUM_REG(kth_percentile_type::P99)
ENUM_REG(kth_percentile_type::P999)
ENUM_END(kth_percentile_type)
-const std::vector<double> kKthDecimals = {0.5, 0.9, 0.95, 0.99, 0.999};
+// One supported kth percentile.
+//   name:    the key it is reported under in percentile snapshots, e.g. "p99".
+//   decimal: k expressed as a fraction, e.g. 0.99.
+struct kth_percentile
+{
+    std::string name;
+    double decimal;
+};
+
+// Table of all supported kth percentiles. It is indexed by the integer value of a
+// `kth_percentile_type` (via static_cast<size_t>), so the order of the entries must follow the
+// values of that enum: P50, P90, P95, P99, P999.
+const std::vector<kth_percentile> kAllKthPercentiles = {
+    {"p50", 0.5},
+    {"p90", 0.9},
+    {"p95", 0.95},
+    {"p99", 0.99},
+    {"p999", 0.999},
+};
inline size_t kth_percentile_to_nth_index(size_t size, size_t kth_index)
{
- auto decimal = kKthDecimals[kth_index];
+ auto decimal = kAllKthPercentiles[kth_index].decimal;
// Since the kth percentile is the value that is greater than k percent of the data values after
// ranking them (https://people.richland.edu/james/ictcm/2001/descriptive/helpposition.html),
// compute the nth index by size * decimal rather than size * decimal - 1.
@@ -577,6 +629,13 @@ inline size_t kth_percentile_to_nth_index(size_t size, kth_percentile_type type)
std::set<kth_percentile_type> get_all_kth_percentile_types();
const std::set<kth_percentile_type> kAllKthPercentileTypes = get_all_kth_percentile_types();
+// Return the name of the kth percentile `type` (e.g. "p50", "p999"), i.e. the key under which it
+// is reported in percentile snapshots.
+//
+// The result refers to an entry of the namespace-scope `kAllKthPercentiles` table, which has
+// static storage duration, so no string is copied per call and the reference stays valid.
+// The index derived from `type` is range-checked against the table: an out-of-range type
+// fails the CHECK instead of reading past the end.
+inline const std::string &kth_percentile_to_name(kth_percentile_type type)
+{
+    const auto index = static_cast<size_t>(type);
+    CHECK_LT(index, kAllKthPercentiles.size());
+    return kAllKthPercentiles[index].name;
+}
+
// `percentile_timer` is a timer class that encapsulates the details how each percentile is
// computed periodically.
//
@@ -662,10 +721,40 @@ public:
const auto index = static_cast<size_t>(type);
CHECK_LT(index, static_cast<size_t>(kth_percentile_type::COUNT));
- val = _full_nth_elements[index].load(std::memory_order_relaxed);
+ val = value(index);
return _kth_percentile_bitset.test(index);
}
+    // Write this percentile into `writer` as a single JSON object:
+    //
+    //   {
+    //     "name": "<metric_name>",
+    //     "p50": ...,
+    //     "p90": ...,
+    //     "p95": ...,
+    //     ...
+    //   }
+    //
+    // "name" is the percentile's name as a string. It is followed by one field per kth
+    // percentile whose bit is set in `_kth_percentile_bitset`, keyed by that percentile's name
+    // ("p50", "p90", "p95", ...). Percentiles that are not enabled are omitted. Every value
+    // is a JSON number, integral or floating-point depending on `value_type`.
+    void take_snapshot(json::JsonWriter &writer) override
+    {
+        writer.StartObject();
+
+        writer.Key("name");
+        json::json_encode(writer, prototype()->name().data());
+
+        for (size_t i = 0; i < static_cast<size_t>(kth_percentile_type::COUNT); ++i) {
+            if (!_kth_percentile_bitset.test(i)) {
+                continue;
+            }
+
+            // The loop bound (kth_percentile_type::COUNT) and the size of kAllKthPercentiles
+            // are defined separately, so resolve the key through the bounds-checked helper
+            // instead of indexing the table directly.
+            const std::string &key = kth_percentile_to_name(static_cast<kth_percentile_type>(i));
+            writer.Key(key.c_str());
+            json::json_encode(writer, value(i));
+        }
+
+        writer.EndObject();
+    }
+
bool timer_enabled() const { return !!_timer; }
uint64_t get_initial_delay_ms() const
@@ -759,6 +848,11 @@ private:
release_ref();
}
+    // Relaxed load of the kth percentile value stored in `_full_nth_elements[index]`
+    // (presumably written by the periodic computation in find_nth_elements; confirm).
+    // `index` is not range-checked here: the public getter checks it against
+    // kth_percentile_type::COUNT, and take_snapshot only iterates below that bound.
+    value_type value(size_t index) const
+    {
+        const auto &slot = _full_nth_elements[index];
+        return slot.load(std::memory_order_relaxed);
+    }
+
void find_nth_elements()
{
size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size);
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 2237083a7..ec3a080cd 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -16,15 +16,16 @@
// under the License.
#include "utils/metrics.h"
-#include "utils/rand.h"
#include <chrono>
+#include <sstream>
#include <thread>
#include <vector>
#include <gtest/gtest.h>
#include "percentile_utils.h"
+#include "utils/rand.h"
namespace dsn {
@@ -33,6 +34,8 @@ class my_gauge : public metric
public:
int64_t value() { return _value; }
+    // This test-only gauge writes nothing into `writer`; it exists only to satisfy the pure
+    // virtual interface of `metric`. (`override` already implies `virtual`.)
+    void take_snapshot(json::JsonWriter &) override {}
+
protected:
explicit my_gauge(const metric_prototype *prototype) : metric(prototype), _value(0) {}
@@ -903,4 +906,254 @@ TEST(metrics_test, percentile_double)
floating_checker<value_type>>(METRIC_test_percentile_double);
}
+// Take a snapshot of `m` and return the JSON text it wrote.
+std::string take_snapshot_and_get_json_string(metric *m)
+{
+    std::ostringstream json_stream;
+    rapidjson::OStreamWrapper stream_wrapper(json_stream);
+    json::JsonWriter json_writer(stream_wrapper);
+    m->take_snapshot(json_writer);
+    return json_stream.str();
+}
+
+// Map from a snapshot's JSON field name (such as "value" or "p99") to its numeric value.
+// The unnamed default template parameter restricts T to arithmetic types.
+template <typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
+using metric_value_map = std::map<std::string, T>;
+
+// Parse `json_string`, a snapshot written by `metric::take_snapshot`, and collect its numeric
+// fields into `value_map`, keyed by field name.
+//
+// The snapshot must be a JSON object that:
+//   - contains a "name" field equal to `metric_name`, which is its only string-valued field;
+//   - has every other field stored as an int64 when `is_integral` is true, or as a double
+//     otherwise.
+// `is_integral` must agree with whether `T` is an integral type.
+template <typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
+void check_and_extract_metric_value_map_from_json_string(const std::string &json_string,
+                                                         const std::string &metric_name,
+                                                         const bool is_integral,
+                                                         metric_value_map<T> &value_map)
+{
+    // A mismatched flag would read each number as the wrong JSON type and then convert it
+    // implicitly into T (e.g. truncating doubles into an integral map).
+    const bool expected_is_integral = std::is_integral<T>::value;
+    ASSERT_EQ(expected_is_integral, is_integral);
+
+    rapidjson::Document doc;
+    rapidjson::ParseResult result = doc.Parse(json_string.c_str());
+    ASSERT_FALSE(result.IsError());
+
+    ASSERT_TRUE(doc.IsObject());
+
+    // Without tracking this, a snapshot that omitted "name" entirely would still pass.
+    bool name_found = false;
+    for (const auto &elem : doc.GetObject()) {
+        ASSERT_TRUE(elem.name.IsString());
+
+        if (elem.value.IsString()) {
+            ASSERT_STREQ(elem.name.GetString(), "name");
+
+            ASSERT_STREQ(elem.value.GetString(), metric_name.c_str());
+            name_found = true;
+        } else {
+            T value{};
+            if (is_integral) {
+                ASSERT_TRUE(elem.value.IsInt64());
+                value = elem.value.GetInt64();
+            } else {
+                ASSERT_TRUE(elem.value.IsDouble());
+                value = elem.value.GetDouble();
+            }
+            value_map[elem.name.GetString()] = value;
+        }
+    }
+    ASSERT_TRUE(name_found);
+}
+
+// Take a snapshot of `my_metric`, check that it carries the metric's name, and collect its
+// numeric fields into `value_map` (read as int64 when `is_integral`, otherwise as double).
+template <typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
+void generate_metric_value_map(metric *my_metric,
+                               const bool is_integral,
+                               metric_value_map<T> &value_map)
+{
+    const std::string snapshot_json = take_snapshot_and_get_json_string(my_metric);
+    const std::string metric_name(my_metric->prototype()->name().data());
+    check_and_extract_metric_value_map_from_json_string(
+        snapshot_json, metric_name, is_integral, value_map);
+}
+
+// Assert that two integral value maps are exactly equal: same keys mapped to the same values
+// (compared with std::map's operator==).
+template <typename T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
+void compare_integral_metric_value_map(const metric_value_map<T> &actual_value_map,
+ const metric_value_map<T> &expected_value_map)
+{
+ ASSERT_EQ(actual_value_map, expected_value_map);
+}
+
+// Assert that two floating-point value maps hold the same keys and that each pair of values is
+// approximately equal, as judged by ASSERT_DOUBLE_EQ.
+template <typename T, typename = typename std::enable_if<std::is_floating_point<T>::value>::type>
+void compare_floating_metric_value_map(const metric_value_map<T> &actual_value_map,
+                                       const metric_value_map<T> &expected_value_map)
+{
+    ASSERT_EQ(actual_value_map.size(), expected_value_map.size());
+
+    // Both maps are ordered by key and now known to have the same size, so walking them in
+    // lockstep pairs up the entries that should match.
+    auto expected_iter = expected_value_map.begin();
+    for (auto actual_iter = actual_value_map.begin(); actual_iter != actual_value_map.end();
+         ++actual_iter, ++expected_iter) {
+        ASSERT_EQ(actual_iter->first, expected_iter->first);
+        ASSERT_DOUBLE_EQ(actual_iter->second, expected_iter->second);
+    }
+}
+
+// Instantiate `metric_prototype` on a "my_server" entity and call `updater` on it with
+// `test.expected_value`. Then check, via `value_map_comparator`, that its snapshot holds exactly
+// {"value": test.expected_value}.
+//
+// Expects a variable named `test` (with `entity_id` and `expected_value` members) to be in
+// scope where the macro is expanded. `value_type` is the element type of the value maps, and
+// `is_integral` selects whether JSON numbers are read back as int64 or double.
+#define TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE( \
+ metric_prototype, updater, value_type, is_integral, value_map_comparator) \
+ do { \
+ auto my_server_entity = METRIC_ENTITY_my_server.instantiate(test.entity_id); \
+ auto my_metric = metric_prototype.instantiate(my_server_entity); \
+ my_metric->updater(test.expected_value); \
+ \
+ const metric_value_map<value_type> expected_value_map = {{"value", test.expected_value}}; \
+ \
+ metric_value_map<value_type> actual_value_map; \
+ generate_metric_value_map(my_metric.get(), is_integral, actual_value_map); \
+ \
+ value_map_comparator(actual_value_map, expected_value_map); \
+ } while (0)
+
+// An int64 gauge snapshots as {"name": ..., "value": <the value it was set to>}.
+TEST(metrics_test, take_snapshot_gauge_int64)
+{
+    struct test_case
+    {
+        std::string entity_id;
+        int64_t expected_value;
+    };
+    const test_case tests[] = {
+        {"server_60", 5},
+    };
+
+    for (const auto &test : tests) {
+        TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE(
+            METRIC_test_gauge_int64, set, int64_t, true, compare_integral_metric_value_map);
+    }
+}
+
+// A double gauge snapshots as {"name": ..., "value": <the value it was set to>}; the value is
+// compared approximately.
+TEST(metrics_test, take_snapshot_gauge_double)
+{
+    struct test_case
+    {
+        std::string entity_id;
+        double expected_value;
+    };
+    const test_case tests[] = {
+        {"server_60", 6.789},
+    };
+
+    for (const auto &test : tests) {
+        TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE(
+            METRIC_test_gauge_double, set, double, false, compare_floating_metric_value_map);
+    }
+}
+
+// Snapshot check for a counter: increment a fresh counter by `test.expected_value` and expect
+// its snapshot to hold exactly {"value": test.expected_value} as an int64. Like
+// TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE, it needs a variable named `test` in scope.
+#define TEST_METRIC_SNAPSHOT_WITH_COUNTER(metric_prototype) \
+ do { \
+ TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE( \
+ metric_prototype, increment_by, int64_t, true, compare_integral_metric_value_map); \
+ } while (0)
+
+// Run TEST_METRIC_SNAPSHOT_WITH_COUNTER for `metric_prototype` over a fixed table of cases.
+// This macro defines the `test` variable that TEST_METRIC_SNAPSHOT_WITH_COUNTER expects.
+#define RUN_CASES_WITH_COUNTER_SNAPSHOT(metric_prototype) \
+ do { \
+ struct test_case \
+ { \
+ std::string entity_id; \
+ int64_t expected_value; \
+ } tests[]{{"server_60", 10}}; \
+ \
+ for (const auto &test : tests) { \
+ TEST_METRIC_SNAPSHOT_WITH_COUNTER(metric_prototype); \
+ } \
+ } while (0)
+
+// Each counter variant (plain, concurrent, volatile, concurrent volatile) snapshots as
+// {"name": ..., "value": <the amount it was incremented by>}.
+TEST(metrics_test, take_snapshot_counter) { RUN_CASES_WITH_COUNTER_SNAPSHOT(METRIC_test_counter); }
+
+TEST(metrics_test, take_snapshot_concurrent_counter)
+{
+ RUN_CASES_WITH_COUNTER_SNAPSHOT(METRIC_test_concurrent_counter);
+}
+
+TEST(metrics_test, take_snapshot_volatile_counter)
+{
+ RUN_CASES_WITH_COUNTER_SNAPSHOT(METRIC_test_volatile_counter);
+}
+
+TEST(metrics_test, take_snapshot_concurrent_volatile_counter)
+{
+ RUN_CASES_WITH_COUNTER_SNAPSHOT(METRIC_test_concurrent_volatile_counter);
+}
+
+// Feed the samples produced by `generator` into the percentile `my_metric`, wait for the
+// percentiles to be computed, and record the expected value of each kth percentile in
+// `value_map`, keyed by its name (e.g. "p50").
+//
+// `generator` is called as generator(samples, values). It is expected to fill `samples` with
+// the data to feed in, and `values` with one expected result per element of `kth_percentiles`,
+// in the set's iteration order.
+template <typename MetricType, typename CaseGenerator>
+void generate_metric_value_map(MetricType *my_metric,
+                               CaseGenerator &generator,
+                               const uint64_t interval_ms,
+                               const uint64_t exec_ms,
+                               const std::set<kth_percentile_type> &kth_percentiles,
+                               metric_value_map<typename MetricType::value_type> &value_map)
+{
+    using value_type = typename MetricType::value_type;
+
+    std::vector<value_type> samples;
+    std::vector<value_type> values;
+    generator(samples, values);
+    ASSERT_EQ(kth_percentiles.size(), values.size());
+
+    for (const auto &sample : samples) {
+        my_metric->set(sample);
+    }
+
+    // Wait out the initial delay, one interval and `exec_ms` more, so that computations for
+    // all percentiles can be finished.
+    const auto wait_ms = my_metric->get_initial_delay_ms() + interval_ms + exec_ms;
+    std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
+
+    size_t value_index = 0;
+    for (const auto &type : kth_percentiles) {
+        value_map[kth_percentile_to_name(type)] = values[value_index++];
+    }
+}
+
+// Snapshot check for a percentile:
+//   1. instantiate `metric_prototype` with the parameters of `test`;
+//   2. feed it the samples from a `case_generator`, building the expected value map;
+//   3. take a snapshot and compare it with `value_map_comparator`, which requires exactly one
+//      field per element of `test.kth_percentiles`.
+//
+// Expects a variable named `test` (with entity_id, interval_ms, kth_percentiles, sample_size,
+// data_size and exec_ms members) to be in scope where the macro is expanded.
+#define TEST_METRIC_SNAPSHOT_WITH_PERCENTILE( \
+ metric_prototype, case_generator, is_integral, value_map_comparator) \
+ do { \
+ using value_type = typename case_generator::value_type; \
+ \
+ auto my_server_entity = METRIC_ENTITY_my_server.instantiate(test.entity_id); \
+ auto my_metric = metric_prototype.instantiate( \
+ my_server_entity, test.interval_ms, test.kth_percentiles, test.sample_size); \
+ \
+ case_generator generator(test.data_size, \
+ value_type() /* initial_value */, \
+ 5 /* range_size */, \
+ test.kth_percentiles); \
+ \
+ metric_value_map<value_type> expected_value_map; \
+ generate_metric_value_map(my_metric.get(), \
+ generator, \
+ test.interval_ms, \
+ test.exec_ms, \
+ test.kth_percentiles, \
+ expected_value_map); \
+ \
+ metric_value_map<value_type> actual_value_map; \
+ generate_metric_value_map(my_metric.get(), is_integral, actual_value_map); \
+ \
+ value_map_comparator(actual_value_map, expected_value_map); \
+ } while (0)
+
+// Run TEST_METRIC_SNAPSHOT_WITH_PERCENTILE over a fixed table of cases. This macro defines the
+// `test` variable that TEST_METRIC_SNAPSHOT_WITH_PERCENTILE expects. The single case has:
+//   - a 50 ms interval, 10 ms of extra wait (exec_ms), and all kth percentiles
+//     (kAllKthPercentileTypes) enabled;
+//   - a sample size and data size of 4096.
+#define RUN_CASES_WITH_PERCENTILE_SNAPSHOT( \
+ metric_prototype, case_generator, is_integral, value_map_comparator) \
+ do { \
+ struct test_case \
+ { \
+ std::string entity_id; \
+ uint64_t interval_ms; \
+ std::set<kth_percentile_type> kth_percentiles; \
+ size_t sample_size; \
+ size_t data_size; \
+ uint64_t exec_ms; \
+ } tests[]{{"server_60", 50, kAllKthPercentileTypes, 4096, 4096, 10}}; \
+ \
+ for (const auto &test : tests) { \
+ TEST_METRIC_SNAPSHOT_WITH_PERCENTILE( \
+ metric_prototype, case_generator, is_integral, value_map_comparator); \
+ } \
+ } while (0)
+
+// An int64 percentile snapshots one integral field per enabled kth percentile, compared exactly.
+TEST(metrics_test, take_snapshot_percentile_int64)
+{
+ RUN_CASES_WITH_PERCENTILE_SNAPSHOT(METRIC_test_percentile_int64,
+ integral_percentile_case_generator<int64_t>,
+ true,
+ compare_integral_metric_value_map);
+}
+
+// A double percentile snapshots one floating-point field per enabled kth percentile, compared
+// approximately.
+TEST(metrics_test, take_snapshot_percentile_double)
+{
+ RUN_CASES_WITH_PERCENTILE_SNAPSHOT(METRIC_test_percentile_double,
+ floating_percentile_case_generator<double>,
+ false,
+ compare_floating_metric_value_map);
+}
+
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org