You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2022/11/02 11:38:28 UTC

[incubator-pegasus] branch master updated: feat(new_metrics): take snapshot of each metric as json format (#1219)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 20aa6e842 feat(new_metrics): take snapshot of each metric as json format (#1219)
20aa6e842 is described below

commit 20aa6e8420003b95eb690b17b75ebfe805ad2a7d
Author: Dan Wang <wa...@apache.org>
AuthorDate: Wed Nov 2 19:38:23 2022 +0800

    feat(new_metrics): take snapshot of each metric as json format (#1219)
---
 src/utils/metrics.h             | 102 +++++++++++++++-
 src/utils/test/metrics_test.cpp | 255 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 352 insertions(+), 5 deletions(-)

diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 8ce7478a7..c2082befe 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -33,11 +33,12 @@
 #include <boost/asio/deadline_timer.hpp>
 
 #include "api_utilities.h"
-#include "fmt_logging.h"
 #include "alloc.h"
 #include "autoref_ptr.h"
 #include "casts.h"
+#include "common/json_helper.h"
 #include "enum_helper.h"
+#include "fmt_logging.h"
 #include "long_adder.h"
 #include "nth_element.h"
 #include "ports.h"
@@ -344,6 +345,9 @@ class metric : public ref_counter
 public:
     const metric_prototype *prototype() const { return _prototype; }
 
+    // Take a snapshot of this metric: write its current value(s) to `writer` in JSON format.
+    virtual void take_snapshot(dsn::json::JsonWriter &writer) = 0;
+
 protected:
     explicit metric(const metric_prototype *prototype);
     virtual ~metric() = default;
@@ -393,6 +397,27 @@ public:
 
     value_type value() const { return _value.load(std::memory_order_relaxed); }
 
+    // Writes a snapshot of this gauge to `writer` as a JSON object of the following form:
+    // {
+    //     "name": "<metric_name>",
+    //     "value": ...
+    // }
+    // "name" is the gauge's name as a string; "value" is the current value returned by `value()`,
+    // encoded as a number (integral or floating-point, as determined by `value_type`).
+    // NOTE(review): `name().data()` presumably yields a NUL-terminated string -- confirm.
+    void take_snapshot(json::JsonWriter &writer) override
+    {
+        writer.StartObject();
+
+        writer.Key("name");
+        json::json_encode(writer, prototype()->name().data());
+
+        writer.Key("value");
+        json::json_encode(writer, value());
+
+        writer.EndObject();
+    }
+
     void set(const value_type &val) { _value.store(val, std::memory_order_relaxed); }
 
     template <typename Int = value_type,
@@ -493,6 +518,26 @@ public:
         return _adder.fetch_and_reset();
     }
 
+    // Writes a snapshot of this counter to `writer` as a JSON object of the following form:
+    // {
+    //     "name": "<metric_name>",
+    //     "value": ...
+    // }
+    // "name" is the counter's name as a string; "value" is the current value returned by
+    // `value()`, encoded as an integer (namely int64_t).
+    void take_snapshot(json::JsonWriter &writer) override
+    {
+        writer.StartObject();
+
+        writer.Key("name");
+        json::json_encode(writer, prototype()->name().data());
+
+        writer.Key("value");
+        json::json_encode(writer, value());
+
+        writer.EndObject();
+    }
+
     // NOTICE: x MUST be a non-negative integer.
     void increment_by(int64_t x)
     {
@@ -558,11 +603,18 @@ ENUM_REG(kth_percentile_type::P99)
 ENUM_REG(kth_percentile_type::P999)
 ENUM_END(kth_percentile_type)
 
-const std::vector<double> kKthDecimals = {0.5, 0.9, 0.95, 0.99, 0.999};
+struct kth_percentile
+{
+    std::string name; // label of the percentile, e.g. "p50"; used as its key in JSON snapshots
+    double decimal;   // the percentile expressed as a fraction, e.g. 0.5 for "p50"
+};
+
+const std::vector<kth_percentile> kAllKthPercentiles = { // indexed by kth_percentile_type value
+    {"p50", 0.5}, {"p90", 0.9}, {"p95", 0.95}, {"p99", 0.99}, {"p999", 0.999}};
 
 inline size_t kth_percentile_to_nth_index(size_t size, size_t kth_index)
 {
-    auto decimal = kKthDecimals[kth_index];
+    auto decimal = kAllKthPercentiles[kth_index].decimal;
     // Since the kth percentile is the value that is greater than k percent of the data values after
     // ranking them (https://people.richland.edu/james/ictcm/2001/descriptive/helpposition.html),
     // compute the nth index by size * decimal rather than size * decimal - 1.
@@ -577,6 +629,13 @@ inline size_t kth_percentile_to_nth_index(size_t size, kth_percentile_type type)
 std::set<kth_percentile_type> get_all_kth_percentile_types();
 const std::set<kth_percentile_type> kAllKthPercentileTypes = get_all_kth_percentile_types();
 
+inline std::string kth_percentile_to_name(const kth_percentile_type &type)
+{
+    auto index = static_cast<size_t>(type); // the enum value doubles as the table index
+    CHECK_LT(index, kAllKthPercentiles.size()); // guard against indexing past the table's end
+    return kAllKthPercentiles[index].name;
+}
+
 // `percentile_timer` is a timer class that encapsulates the details how each percentile is
 // computed periodically.
 //
@@ -662,10 +721,40 @@ public:
         const auto index = static_cast<size_t>(type);
         CHECK_LT(index, static_cast<size_t>(kth_percentile_type::COUNT));
 
-        val = _full_nth_elements[index].load(std::memory_order_relaxed);
+        val = value(index);
         return _kth_percentile_bitset.test(index);
     }
 
+    // Writes a snapshot of this percentile to `writer` as a JSON object of the following form:
+    // {
+    //     "name": "<metric_name>",
+    //     "p50": ...,
+    //     "p90": ...,
+    //     "p95": ...,
+    //     ...
+    // }
+    // "name" is the percentile's name as a string. It is followed by one member per configured
+    // kth percentile ("p50", "p90", "p95", etc.), each encoded as a number (integral or
+    // floating-point, as determined by `value_type`).
+    void take_snapshot(json::JsonWriter &writer) override
+    {
+        writer.StartObject();
+
+        writer.Key("name");
+        json::json_encode(writer, prototype()->name().data());
+
+        for (size_t i = 0; i < static_cast<size_t>(kth_percentile_type::COUNT); ++i) {
+            if (!_kth_percentile_bitset.test(i)) { // skip percentiles not set in the bitset
+                continue;
+            }
+
+            writer.Key(kAllKthPercentiles[i].name.c_str());
+            json::json_encode(writer, value(i));
+        }
+
+        writer.EndObject();
+    }
+
     bool timer_enabled() const { return !!_timer; }
 
     uint64_t get_initial_delay_ms() const
@@ -759,6 +848,11 @@ private:
         release_ref();
     }
 
+    value_type value(size_t index) const // `index` is not validated in this function
+    {
+        return _full_nth_elements[index].load(std::memory_order_relaxed);
+    }
+
     void find_nth_elements()
     {
         size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size);
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 2237083a7..ec3a080cd 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -16,15 +16,16 @@
 // under the License.
 
 #include "utils/metrics.h"
-#include "utils/rand.h"
 
 #include <chrono>
+#include <sstream>
 #include <thread>
 #include <vector>
 
 #include <gtest/gtest.h>
 
 #include "percentile_utils.h"
+#include "utils/rand.h"
 
 namespace dsn {
 
@@ -33,6 +34,8 @@ class my_gauge : public metric
 public:
     int64_t value() { return _value; }
 
+    virtual void take_snapshot(json::JsonWriter &) override {}
+
 protected:
     explicit my_gauge(const metric_prototype *prototype) : metric(prototype), _value(0) {}
 
@@ -903,4 +906,254 @@ TEST(metrics_test, percentile_double)
                          floating_checker<value_type>>(METRIC_test_percentile_double);
 }
 
+std::string take_snapshot_and_get_json_string(metric *m)
+{
+    std::stringstream out; // receives the serialized JSON text
+    rapidjson::OStreamWrapper wrapper(out);
+    json::JsonWriter writer(wrapper);
+
+    m->take_snapshot(writer);
+
+    return out.str(); // the snapshot of `m` as a JSON string
+}
+
+template <typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
+using metric_value_map = std::map<std::string, T>;
+
+template <typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
+void check_and_extract_metric_value_map_from_json_string(const std::string &json_string,
+                                                         const std::string &metric_name,
+                                                         const bool is_integral,
+                                                         metric_value_map<T> &value_map)
+{ // NOTE(review): a failing ASSERT_* presumably returns from this helper only -- confirm
+    rapidjson::Document doc;
+    rapidjson::ParseResult result = doc.Parse(json_string.c_str());
+    ASSERT_FALSE(result.IsError());
+
+    ASSERT_TRUE(doc.IsObject());
+    for (const auto &elem : doc.GetObject()) {
+        ASSERT_TRUE(elem.name.IsString());
+
+        if (elem.value.IsString()) { // the only string-valued member must be "name"
+            ASSERT_STREQ(elem.name.GetString(), "name");
+
+            ASSERT_STREQ(elem.value.GetString(), metric_name.c_str());
+        } else { // every other member is collected into `value_map` as a metric value
+            T value;
+            if (is_integral) {
+                ASSERT_TRUE(elem.value.IsInt64());
+                value = elem.value.GetInt64();
+            } else {
+                ASSERT_TRUE(elem.value.IsDouble());
+                value = elem.value.GetDouble();
+            }
+            value_map[elem.name.GetString()] = value;
+        }
+    }
+}
+
+template <typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
+void generate_metric_value_map(metric *my_metric,
+                               const bool is_integral,
+                               metric_value_map<T> &value_map)
+{
+    auto json_string = take_snapshot_and_get_json_string(my_metric);
+    check_and_extract_metric_value_map_from_json_string(
+        json_string, my_metric->prototype()->name().data(), is_integral, value_map);
+}
+
+template <typename T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
+void compare_integral_metric_value_map(const metric_value_map<T> &actual_value_map,
+                                       const metric_value_map<T> &expected_value_map)
+{
+    ASSERT_EQ(actual_value_map, expected_value_map);
+}
+
+template <typename T, typename = typename std::enable_if<std::is_floating_point<T>::value>::type>
+void compare_floating_metric_value_map(const metric_value_map<T> &actual_value_map,
+                                       const metric_value_map<T> &expected_value_map)
+{
+    ASSERT_EQ(actual_value_map.size(), expected_value_map.size());
+
+    auto actual_iter = actual_value_map.begin();
+    auto expected_iter = expected_value_map.begin();
+    for (; actual_iter != actual_value_map.end() && expected_iter != expected_value_map.end();
+         ++actual_iter, ++expected_iter) { // both maps are key-ordered: walk them in lockstep
+        ASSERT_EQ(actual_iter->first, expected_iter->first);
+        ASSERT_DOUBLE_EQ(actual_iter->second, expected_iter->second); // not exact `==`
+    }
+}
+
+#define TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE(                                                    \
+    metric_prototype, updater, value_type, is_integral, value_map_comparator)                      \
+    do {                                                                                           \
+        auto my_server_entity = METRIC_ENTITY_my_server.instantiate(test.entity_id);               \
+        auto my_metric = metric_prototype.instantiate(my_server_entity);                           \
+        my_metric->updater(test.expected_value);                                                   \
+                                                                                                   \
+        const metric_value_map<value_type> expected_value_map = {{"value", test.expected_value}};  \
+                                                                                                   \
+        metric_value_map<value_type> actual_value_map;                                             \
+        generate_metric_value_map(my_metric.get(), is_integral, actual_value_map);                 \
+                                                                                                   \
+        value_map_comparator(actual_value_map, expected_value_map);                                \
+    } while (0)
+
+TEST(metrics_test, take_snapshot_gauge_int64)
+{
+    struct test_case
+    {
+        std::string entity_id;
+        int64_t expected_value;
+    } tests[]{{"server_60", 5}};
+
+    for (const auto &test : tests) {
+        TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE(
+            METRIC_test_gauge_int64, set, int64_t, true, compare_integral_metric_value_map);
+    }
+}
+
+TEST(metrics_test, take_snapshot_gauge_double)
+{
+    struct test_case
+    {
+        std::string entity_id;
+        double expected_value;
+    } tests[]{{"server_60", 6.789}};
+
+    for (const auto &test : tests) {
+        TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE(
+            METRIC_test_gauge_double, set, double, false, compare_floating_metric_value_map);
+    }
+}
+
+#define TEST_METRIC_SNAPSHOT_WITH_COUNTER(metric_prototype)                                        \
+    do {                                                                                           \
+        TEST_METRIC_SNAPSHOT_WITH_SINGLE_VALUE(                                                    \
+            metric_prototype, increment_by, int64_t, true, compare_integral_metric_value_map);     \
+    } while (0)
+
+#define RUN_CASES_WITH_COUNTER_SNAPSHOT(metric_prototype)                                          \
+    do {                                                                                           \
+        struct test_case                                                                           \
+        {                                                                                          \
+            std::string entity_id;                                                                 \
+            int64_t expected_value;                                                                \
+        } tests[]{{"server_60", 10}};                                                              \
+                                                                                                   \
+        for (const auto &test : tests) {                                                           \
+            TEST_METRIC_SNAPSHOT_WITH_COUNTER(metric_prototype);                                   \
+        }                                                                                          \
+    } while (0)
+
+TEST(metrics_test, take_snapshot_counter) { RUN_CASES_WITH_COUNTER_SNAPSHOT(METRIC_test_counter); }
+
+TEST(metrics_test, take_snapshot_concurrent_counter)
+{
+    RUN_CASES_WITH_COUNTER_SNAPSHOT(METRIC_test_concurrent_counter);
+}
+
+TEST(metrics_test, take_snapshot_volatile_counter)
+{
+    RUN_CASES_WITH_COUNTER_SNAPSHOT(METRIC_test_volatile_counter);
+}
+
+TEST(metrics_test, take_snapshot_concurrent_volatile_counter)
+{
+    RUN_CASES_WITH_COUNTER_SNAPSHOT(METRIC_test_concurrent_volatile_counter);
+}
+
+template <typename MetricType, typename CaseGenerator>
+void generate_metric_value_map(MetricType *my_metric,
+                               CaseGenerator &generator,
+                               const uint64_t interval_ms,
+                               const uint64_t exec_ms,
+                               const std::set<kth_percentile_type> &kth_percentiles,
+                               metric_value_map<typename MetricType::value_type> &value_map)
+{
+    using value_type = typename MetricType::value_type;
+
+    std::vector<value_type> data;   // samples to feed into the metric
+    std::vector<value_type> values; // one generated value per requested percentile
+    generator(data, values);
+    ASSERT_EQ(kth_percentiles.size(), values.size());
+
+    for (const auto &elem : data) {
+        my_metric->set(elem);
+    }
+
+    // Wait long enough for the computations of all percentiles to finish.
+    std::this_thread::sleep_for(
+        std::chrono::milliseconds(my_metric->get_initial_delay_ms() + interval_ms + exec_ms));
+
+    auto value = values.begin();
+    for (const auto &type : kth_percentiles) { // pair each type with its value, in set order
+        auto name = kth_percentile_to_name(type);
+        value_map[name] = *value++;
+    }
+}
+
+#define TEST_METRIC_SNAPSHOT_WITH_PERCENTILE(                                                      \
+    metric_prototype, case_generator, is_integral, value_map_comparator)                           \
+    do {                                                                                           \
+        using value_type = typename case_generator::value_type;                                    \
+                                                                                                   \
+        auto my_server_entity = METRIC_ENTITY_my_server.instantiate(test.entity_id);               \
+        auto my_metric = metric_prototype.instantiate(                                             \
+            my_server_entity, test.interval_ms, test.kth_percentiles, test.sample_size);           \
+                                                                                                   \
+        case_generator generator(test.data_size,                                                   \
+                                 value_type() /* initial_value */,                                 \
+                                 5 /* range_size */,                                               \
+                                 test.kth_percentiles);                                            \
+                                                                                                   \
+        metric_value_map<value_type> expected_value_map;                                           \
+        generate_metric_value_map(my_metric.get(),                                                 \
+                                  generator,                                                       \
+                                  test.interval_ms,                                                \
+                                  test.exec_ms,                                                    \
+                                  test.kth_percentiles,                                            \
+                                  expected_value_map);                                             \
+                                                                                                   \
+        metric_value_map<value_type> actual_value_map;                                             \
+        generate_metric_value_map(my_metric.get(), is_integral, actual_value_map);                 \
+                                                                                                   \
+        value_map_comparator(actual_value_map, expected_value_map);                                \
+    } while (0)
+
+#define RUN_CASES_WITH_PERCENTILE_SNAPSHOT(                                                        \
+    metric_prototype, case_generator, is_integral, value_map_comparator)                           \
+    do {                                                                                           \
+        struct test_case                                                                           \
+        {                                                                                          \
+            std::string entity_id;                                                                 \
+            uint64_t interval_ms;                                                                  \
+            std::set<kth_percentile_type> kth_percentiles;                                         \
+            size_t sample_size;                                                                    \
+            size_t data_size;                                                                      \
+            uint64_t exec_ms;                                                                      \
+        } tests[]{{"server_60", 50, kAllKthPercentileTypes, 4096, 4096, 10}};                      \
+                                                                                                   \
+        for (const auto &test : tests) {                                                           \
+            TEST_METRIC_SNAPSHOT_WITH_PERCENTILE(                                                  \
+                metric_prototype, case_generator, is_integral, value_map_comparator);              \
+        }                                                                                          \
+    } while (0)
+
+TEST(metrics_test, take_snapshot_percentile_int64)
+{
+    RUN_CASES_WITH_PERCENTILE_SNAPSHOT(METRIC_test_percentile_int64,
+                                       integral_percentile_case_generator<int64_t>,
+                                       true,
+                                       compare_integral_metric_value_map);
+}
+
+TEST(metrics_test, take_snapshot_percentile_double)
+{
+    RUN_CASES_WITH_PERCENTILE_SNAPSHOT(METRIC_test_percentile_double,
+                                       floating_percentile_case_generator<double>,
+                                       false,
+                                       compare_floating_metric_value_map);
+}
+
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org