You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/05 07:28:13 UTC

[incubator-pegasus] 18/23: feat(new_metrics): migrate replica-level metrics for pegasus_manual_compact_service (#1443)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 142c44c0896e391e2240e8e8b0f8c7a2cb9e5ebf
Author: Dan Wang <wa...@apache.org>
AuthorDate: Mon Apr 17 21:38:23 2023 +0800

    feat(new_metrics): migrate replica-level metrics for pegasus_manual_compact_service (#1443)
    
    https://github.com/apache/incubator-pegasus/issues/1441
    
    In perf counters, both metrics of `pegasus_manual_compact_service` track
    the number of rocksdb manual compaction tasks: one is the number of
    currently queued tasks, and the other is the number of currently running
    tasks. Both metrics are server-level.
    
    They become replica-level after migrating to the new metrics, from which
    the server-level ones can still be derived. A convenience class
    `auto_count` is also provided: it increments a gauge and decrements it
    automatically at the end of the scope.
---
 src/server/pegasus_manual_compact_service.cpp | 37 ++++++++---------
 src/server/pegasus_manual_compact_service.h   |  6 +--
 src/utils/metrics.h                           | 57 ++++++++++++++++++++++-----
 src/utils/test/metrics_test.cpp               | 39 ++++++++++++++++++
 4 files changed, 108 insertions(+), 31 deletions(-)

diff --git a/src/server/pegasus_manual_compact_service.cpp b/src/server/pegasus_manual_compact_service.cpp
index 22a40afcb..146db8f72 100644
--- a/src/server/pegasus_manual_compact_service.cpp
+++ b/src/server/pegasus_manual_compact_service.cpp
@@ -29,16 +29,27 @@
 #include "base/pegasus_const.h"
 #include "common/replication.codes.h"
 #include "pegasus_server_impl.h"
-#include "perf_counter/perf_counter.h"
 #include "runtime/api_layer1.h"
 #include "runtime/task/async_calls.h"
 #include "runtime/task/task_code.h"
+#include "utils/autoref_ptr.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/string_conv.h"
+#include "utils/string_view.h"
 #include "utils/strings.h"
 #include "utils/time_utils.h"
 
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_manual_compact_queued_tasks,
+                          dsn::metric_unit::kTasks,
+                          "The number of current queued tasks of rocksdb manual compaction");
+
+METRIC_DEFINE_gauge_int64(replica,
+                          rdb_manual_compact_running_tasks,
+                          dsn::metric_unit::kTasks,
+                          "The number of current running tasks of rocksdb manual compaction");
+
 namespace pegasus {
 namespace server {
 
@@ -58,17 +69,10 @@ pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_im
       _manual_compact_enqueue_time_ms(0),
       _manual_compact_start_running_time_ms(0),
       _manual_compact_last_finish_time_ms(0),
-      _manual_compact_last_time_used_ms(0)
+      _manual_compact_last_time_used_ms(0),
+      METRIC_VAR_INIT_replica(rdb_manual_compact_queued_tasks),
+      METRIC_VAR_INIT_replica(rdb_manual_compact_running_tasks)
 {
-    _pfc_manual_compact_enqueue_count.init_app_counter("app.pegasus",
-                                                       "manual.compact.enqueue.count",
-                                                       COUNTER_TYPE_NUMBER,
-                                                       "current manual compact in queue count");
-
-    _pfc_manual_compact_running_count.init_app_counter("app.pegasus",
-                                                       "manual.compact.running.count",
-                                                       COUNTER_TYPE_NUMBER,
-                                                       "current manual compact running count");
 }
 
 void pegasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_finish_time_ms)
@@ -106,9 +110,9 @@ void pegasus_manual_compact_service::start_manual_compact_if_needed(
         rocksdb::CompactRangeOptions options;
         extract_manual_compact_opts(envs, compact_rule, options);
 
-        _pfc_manual_compact_enqueue_count->increment();
+        METRIC_VAR_INCREMENT(rdb_manual_compact_queued_tasks);
         dsn::tasking::enqueue(LPC_MANUAL_COMPACT, &_app->_tracker, [this, options]() {
-            _pfc_manual_compact_enqueue_count->decrement();
+            METRIC_VAR_DECREMENT(rdb_manual_compact_queued_tasks);
             manual_compact(options);
         });
     } else {
@@ -295,9 +299,8 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
     }
 
     // if current running count exceeds the limit, it would not to be started.
-    _pfc_manual_compact_running_count->increment();
-    if (_pfc_manual_compact_running_count->get_integer_value() > _max_concurrent_running_count) {
-        _pfc_manual_compact_running_count->decrement();
+    METRIC_VAR_AUTO_COUNT(rdb_manual_compact_running_tasks);
+    if (METRIC_VAR_VALUE(rdb_manual_compact_running_tasks) > _max_concurrent_running_count) {
         LOG_INFO_PREFIX("ignored compact because exceed max_concurrent_running_count({})",
                         _max_concurrent_running_count.load());
         _manual_compact_enqueue_time_ms.store(0);
@@ -307,8 +310,6 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
     uint64_t start = begin_manual_compact();
     uint64_t finish = _app->do_manual_compact(options);
     end_manual_compact(start, finish);
-
-    _pfc_manual_compact_running_count->decrement();
 }
 
 uint64_t pegasus_manual_compact_service::begin_manual_compact()
diff --git a/src/server/pegasus_manual_compact_service.h b/src/server/pegasus_manual_compact_service.h
index c47bbb5ec..d57343de2 100644
--- a/src/server/pegasus_manual_compact_service.h
+++ b/src/server/pegasus_manual_compact_service.h
@@ -25,8 +25,8 @@
 #include <string>
 
 #include "metadata_types.h"
-#include "perf_counter/perf_counter_wrapper.h"
 #include "replica/replica_base.h"
+#include "utils/metrics.h"
 
 namespace rocksdb {
 struct CompactRangeOptions;
@@ -97,8 +97,8 @@ private:
     std::atomic<uint64_t> _manual_compact_last_finish_time_ms;
     std::atomic<uint64_t> _manual_compact_last_time_used_ms;
 
-    ::dsn::perf_counter_wrapper _pfc_manual_compact_enqueue_count;
-    ::dsn::perf_counter_wrapper _pfc_manual_compact_running_count;
+    METRIC_VAR_DECLARE_gauge_int64(rdb_manual_compact_queued_tasks);
+    METRIC_VAR_DECLARE_gauge_int64(rdb_manual_compact_running_tasks);
 };
 
 } // namespace server
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index bdc0ccd50..8b9d396a9 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -170,7 +170,7 @@ class error_code;
 #define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, backup_policy, ##__VA_ARGS__)
 
-// Perform increment-related operations on metrics including gauge and counter.
+// Perform increment-related operations on gauges and counters.
 #define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
     do {                                                                                           \
         const auto v = (x);                                                                        \
@@ -179,9 +179,13 @@ class error_code;
         }                                                                                          \
     } while (0)
 
+// Perform increment() operations on gauges and counters.
 #define METRIC_VAR_INCREMENT(name) _##name->increment()
 
-// Perform set() operations on metrics including gauge and percentile.
+// Perform decrement() operations on gauges.
+#define METRIC_VAR_DECREMENT(name) _##name->decrement()
+
+// Perform set() operations on gauges and percentiles.
 //
 // There are 2 kinds of invocations of set() for a metric:
 // * set(val): set a single value for a metric, such as gauge, percentile;
@@ -189,7 +193,7 @@ class error_code;
 // such as percentile.
 #define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
 
-// Read the current measurement of the metric.
+// Read the current measurement of gauges and counters.
 #define METRIC_VAR_VALUE(name) _##name->value()
 
 // Convenient macro that is used to compute latency automatically, which is dedicated to percentile.
@@ -198,6 +202,10 @@ class error_code;
 
 #define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns()
 
+// Convenient macro that is used to increment/decrement gauge automatically in current scope.
+#define METRIC_VAR_AUTO_COUNT(name, ...)                                                           \
+    dsn::auto_count __##name##_auto_count(_##name, ##__VA_ARGS__)
+
 #define METRIC_DEFINE_INCREMENT_BY(name)                                                           \
     void increment_##name##_by(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); }
 
@@ -650,6 +658,7 @@ enum class metric_unit : size_t
     kWrites,
     kChanges,
     kOperations,
+    kTasks,
     kDisconnections,
     kServers,
     kInvalidUnit,
@@ -1433,22 +1442,22 @@ using floating_percentile_prototype =
 class auto_latency
 {
 public:
-    auto_latency(const percentile_ptr<int64_t> &percentile) : _percentile(percentile) {}
+    auto_latency(const percentile_ptr<int64_t> &p) : _percentile(p) {}
 
-    auto_latency(const percentile_ptr<int64_t> &percentile, std::function<void(uint64_t)> callback)
-        : _percentile(percentile), _callback(std::move(callback))
+    auto_latency(const percentile_ptr<int64_t> &p, std::function<void(uint64_t)> callback)
+        : _percentile(p), _callback(std::move(callback))
     {
     }
 
-    auto_latency(const percentile_ptr<int64_t> &percentile, uint64_t start_time_ns)
-        : _percentile(percentile), _chrono(start_time_ns)
+    auto_latency(const percentile_ptr<int64_t> &p, uint64_t start_time_ns)
+        : _percentile(p), _chrono(start_time_ns)
     {
     }
 
-    auto_latency(const percentile_ptr<int64_t> &percentile,
+    auto_latency(const percentile_ptr<int64_t> &p,
                  uint64_t start_time_ns,
                  std::function<void(uint64_t)> callback)
-        : _percentile(percentile), _chrono(start_time_ns), _callback(std::move(callback))
+        : _percentile(p), _chrono(start_time_ns), _callback(std::move(callback))
     {
     }
 
@@ -1473,6 +1482,34 @@ private:
     DISALLOW_COPY_AND_ASSIGN(auto_latency);
 };
 
+// Increment gauge and decrement it automatically at the end of the scope.
+class auto_count
+{
+public:
+    auto_count(const gauge_ptr<int64_t> &g) : _gauge(g) { _gauge->increment(); }
+
+    auto_count(const gauge_ptr<int64_t> &g, std::function<void()> callback)
+        : _gauge(g), _callback(std::move(callback))
+    {
+        _gauge->increment();
+    }
+
+    ~auto_count()
+    {
+        if (_callback) {
+            _callback();
+        }
+
+        _gauge->decrement();
+    }
+
+private:
+    gauge_ptr<int64_t> _gauge;
+    std::function<void()> _callback;
+
+    DISALLOW_COPY_AND_ASSIGN(auto_count);
+};
+
 } // namespace dsn
 
 // Since server_metric_entity() will be called in macros such as METRIC_VAR_INIT_server(), its
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 9388a9a92..5197ec4b6 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -3096,6 +3096,8 @@ protected:
     void test_set_percentile(const std::vector<int64_t> &expected_samples);
     void test_set_percentile(size_t n, int64_t val);
 
+    void test_auto_count();
+
     const metric_entity_ptr _my_replica_metric_entity;
     METRIC_VAR_DECLARE_gauge_int64(test_replica_gauge_int64);
     METRIC_VAR_DECLARE_counter(test_replica_counter);
@@ -3134,6 +3136,19 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t val)
     EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
 }
 
+void MetricVarTest::test_auto_count()
+{
+    ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
+
+    {
+        METRIC_VAR_AUTO_COUNT(test_replica_gauge_int64, [this]() {
+            ASSERT_EQ(1, METRIC_VAR_VALUE(test_replica_gauge_int64));
+        });
+    }
+
+    ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
+}
+
 #define TEST_METRIC_VAR_INCREMENT(name)                                                            \
     do {                                                                                           \
         ASSERT_EQ(0, METRIC_VAR_VALUE(name));                                                      \
@@ -3155,6 +3170,28 @@ TEST_F(MetricVarTest, IncrementGauge) { TEST_METRIC_VAR_INCREMENT(test_replica_g
 
 TEST_F(MetricVarTest, IncrementCounter) { TEST_METRIC_VAR_INCREMENT(test_replica_counter); }
 
+#define TEST_METRIC_VAR_DECREMENT(name)                                                            \
+    do {                                                                                           \
+        ASSERT_EQ(0, METRIC_VAR_VALUE(name));                                                      \
+                                                                                                   \
+        METRIC_VAR_INCREMENT_BY(name, 11);                                                         \
+        ASSERT_EQ(11, METRIC_VAR_VALUE(name));                                                     \
+                                                                                                   \
+        METRIC_VAR_DECREMENT(name);                                                                \
+        ASSERT_EQ(10, METRIC_VAR_VALUE(name));                                                     \
+                                                                                                   \
+        METRIC_VAR_DECREMENT(name);                                                                \
+        ASSERT_EQ(9, METRIC_VAR_VALUE(name));                                                      \
+                                                                                                   \
+        METRIC_VAR_INCREMENT(name);                                                                \
+        ASSERT_EQ(10, METRIC_VAR_VALUE(name));                                                     \
+                                                                                                   \
+        METRIC_VAR_DECREMENT(name);                                                                \
+        ASSERT_EQ(9, METRIC_VAR_VALUE(name));                                                      \
+    } while (0);
+
+TEST_F(MetricVarTest, DecrementGauge) { TEST_METRIC_VAR_DECREMENT(test_replica_gauge_int64); }
+
 TEST_F(MetricVarTest, SetGauge)
 {
     ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
@@ -3195,4 +3232,6 @@ TEST_F(MetricVarTest, AutoLatencyMilliSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ms
 
 TEST_F(MetricVarTest, AutoLatencySeconds) { TEST_METRIC_VAR_AUTO_LATENCY(s, 1000 * 1000 * 1000); }
 
+TEST_F(MetricVarTest, AutoCount) { ASSERT_NO_FATAL_FAILURE(test_auto_count()); }
+
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org