You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/05 07:28:13 UTC
[incubator-pegasus] 18/23: feat(new_metrics): migrate replica-level metrics for pegasus_manual_compact_service (#1443)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 142c44c0896e391e2240e8e8b0f8c7a2cb9e5ebf
Author: Dan Wang <wa...@apache.org>
AuthorDate: Mon Apr 17 21:38:23 2023 +0800
feat(new_metrics): migrate replica-level metrics for pegasus_manual_compact_service (#1443)
https://github.com/apache/incubator-pegasus/issues/1441
In perf counters, all of the 2 metrics of `pegasus_manual_compact_service`
are about the numbers of tasks of rocksdb manual compaction: one is the
number of current queued tasks, while another is the number of current
running tasks. Both metrics are server-level.
They would become replica-level after migrating to the new metrics, based
on which server-level ones could also be achieved. A convenient class
`auto_count` is also provided to increment gauge that will be decremented
automatically at the end of the scope.
---
src/server/pegasus_manual_compact_service.cpp | 37 ++++++++---------
src/server/pegasus_manual_compact_service.h | 6 +--
src/utils/metrics.h | 57 ++++++++++++++++++++++-----
src/utils/test/metrics_test.cpp | 39 ++++++++++++++++++
4 files changed, 108 insertions(+), 31 deletions(-)
diff --git a/src/server/pegasus_manual_compact_service.cpp b/src/server/pegasus_manual_compact_service.cpp
index 22a40afcb..146db8f72 100644
--- a/src/server/pegasus_manual_compact_service.cpp
+++ b/src/server/pegasus_manual_compact_service.cpp
@@ -29,16 +29,27 @@
#include "base/pegasus_const.h"
#include "common/replication.codes.h"
#include "pegasus_server_impl.h"
-#include "perf_counter/perf_counter.h"
#include "runtime/api_layer1.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task_code.h"
+#include "utils/autoref_ptr.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
+#include "utils/string_view.h"
#include "utils/strings.h"
#include "utils/time_utils.h"
+METRIC_DEFINE_gauge_int64(replica,
+ rdb_manual_compact_queued_tasks,
+ dsn::metric_unit::kTasks,
+ "The number of current queued tasks of rocksdb manual compaction");
+
+METRIC_DEFINE_gauge_int64(replica,
+ rdb_manual_compact_running_tasks,
+ dsn::metric_unit::kTasks,
+ "The number of current running tasks of rocksdb manual compaction");
+
namespace pegasus {
namespace server {
@@ -58,17 +69,10 @@ pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_im
_manual_compact_enqueue_time_ms(0),
_manual_compact_start_running_time_ms(0),
_manual_compact_last_finish_time_ms(0),
- _manual_compact_last_time_used_ms(0)
+ _manual_compact_last_time_used_ms(0),
+ METRIC_VAR_INIT_replica(rdb_manual_compact_queued_tasks),
+ METRIC_VAR_INIT_replica(rdb_manual_compact_running_tasks)
{
- _pfc_manual_compact_enqueue_count.init_app_counter("app.pegasus",
- "manual.compact.enqueue.count",
- COUNTER_TYPE_NUMBER,
- "current manual compact in queue count");
-
- _pfc_manual_compact_running_count.init_app_counter("app.pegasus",
- "manual.compact.running.count",
- COUNTER_TYPE_NUMBER,
- "current manual compact running count");
}
void pegasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_finish_time_ms)
@@ -106,9 +110,9 @@ void pegasus_manual_compact_service::start_manual_compact_if_needed(
rocksdb::CompactRangeOptions options;
extract_manual_compact_opts(envs, compact_rule, options);
- _pfc_manual_compact_enqueue_count->increment();
+ METRIC_VAR_INCREMENT(rdb_manual_compact_queued_tasks);
dsn::tasking::enqueue(LPC_MANUAL_COMPACT, &_app->_tracker, [this, options]() {
- _pfc_manual_compact_enqueue_count->decrement();
+ METRIC_VAR_DECREMENT(rdb_manual_compact_queued_tasks);
manual_compact(options);
});
} else {
@@ -295,9 +299,8 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
}
// if current running count exceeds the limit, it would not to be started.
- _pfc_manual_compact_running_count->increment();
- if (_pfc_manual_compact_running_count->get_integer_value() > _max_concurrent_running_count) {
- _pfc_manual_compact_running_count->decrement();
+ METRIC_VAR_AUTO_COUNT(rdb_manual_compact_running_tasks);
+ if (METRIC_VAR_VALUE(rdb_manual_compact_running_tasks) > _max_concurrent_running_count) {
LOG_INFO_PREFIX("ignored compact because exceed max_concurrent_running_count({})",
_max_concurrent_running_count.load());
_manual_compact_enqueue_time_ms.store(0);
@@ -307,8 +310,6 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
uint64_t start = begin_manual_compact();
uint64_t finish = _app->do_manual_compact(options);
end_manual_compact(start, finish);
-
- _pfc_manual_compact_running_count->decrement();
}
uint64_t pegasus_manual_compact_service::begin_manual_compact()
diff --git a/src/server/pegasus_manual_compact_service.h b/src/server/pegasus_manual_compact_service.h
index c47bbb5ec..d57343de2 100644
--- a/src/server/pegasus_manual_compact_service.h
+++ b/src/server/pegasus_manual_compact_service.h
@@ -25,8 +25,8 @@
#include <string>
#include "metadata_types.h"
-#include "perf_counter/perf_counter_wrapper.h"
#include "replica/replica_base.h"
+#include "utils/metrics.h"
namespace rocksdb {
struct CompactRangeOptions;
@@ -97,8 +97,8 @@ private:
std::atomic<uint64_t> _manual_compact_last_finish_time_ms;
std::atomic<uint64_t> _manual_compact_last_time_used_ms;
- ::dsn::perf_counter_wrapper _pfc_manual_compact_enqueue_count;
- ::dsn::perf_counter_wrapper _pfc_manual_compact_running_count;
+ METRIC_VAR_DECLARE_gauge_int64(rdb_manual_compact_queued_tasks);
+ METRIC_VAR_DECLARE_gauge_int64(rdb_manual_compact_running_tasks);
};
} // namespace server
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index bdc0ccd50..8b9d396a9 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -170,7 +170,7 @@ class error_code;
#define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, ##__VA_ARGS__)
#define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, backup_policy, ##__VA_ARGS__)
-// Perform increment-related operations on metrics including gauge and counter.
+// Perform increment-related operations on gauges and counters.
#define METRIC_VAR_INCREMENT_BY(name, x) \
do { \
const auto v = (x); \
@@ -179,9 +179,13 @@ class error_code;
} \
} while (0)
+// Perform increment() operations on gauges and counters.
#define METRIC_VAR_INCREMENT(name) _##name->increment()
-// Perform set() operations on metrics including gauge and percentile.
+// Perform decrement() operations on gauges.
+#define METRIC_VAR_DECREMENT(name) _##name->decrement()
+
+// Perform set() operations on gauges and percentiles.
//
// There are 2 kinds of invocations of set() for a metric:
// * set(val): set a single value for a metric, such as gauge, percentile;
@@ -189,7 +193,7 @@ class error_code;
// such as percentile.
#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)
-// Read the current measurement of the metric.
+// Read the current measurement of gauges and counters.
#define METRIC_VAR_VALUE(name) _##name->value()
// Convenient macro that is used to compute latency automatically, which is dedicated to percentile.
@@ -198,6 +202,10 @@ class error_code;
#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns()
+// Convenient macro that is used to increment/decrement gauge automatically in current scope.
+#define METRIC_VAR_AUTO_COUNT(name, ...) \
+ dsn::auto_count __##name##_auto_count(_##name, ##__VA_ARGS__)
+
#define METRIC_DEFINE_INCREMENT_BY(name) \
void increment_##name##_by(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); }
@@ -650,6 +658,7 @@ enum class metric_unit : size_t
kWrites,
kChanges,
kOperations,
+ kTasks,
kDisconnections,
kServers,
kInvalidUnit,
@@ -1433,22 +1442,22 @@ using floating_percentile_prototype =
class auto_latency
{
public:
- auto_latency(const percentile_ptr<int64_t> &percentile) : _percentile(percentile) {}
+ auto_latency(const percentile_ptr<int64_t> &p) : _percentile(p) {}
- auto_latency(const percentile_ptr<int64_t> &percentile, std::function<void(uint64_t)> callback)
- : _percentile(percentile), _callback(std::move(callback))
+ auto_latency(const percentile_ptr<int64_t> &p, std::function<void(uint64_t)> callback)
+ : _percentile(p), _callback(std::move(callback))
{
}
- auto_latency(const percentile_ptr<int64_t> &percentile, uint64_t start_time_ns)
- : _percentile(percentile), _chrono(start_time_ns)
+ auto_latency(const percentile_ptr<int64_t> &p, uint64_t start_time_ns)
+ : _percentile(p), _chrono(start_time_ns)
{
}
- auto_latency(const percentile_ptr<int64_t> &percentile,
+ auto_latency(const percentile_ptr<int64_t> &p,
uint64_t start_time_ns,
std::function<void(uint64_t)> callback)
- : _percentile(percentile), _chrono(start_time_ns), _callback(std::move(callback))
+ : _percentile(p), _chrono(start_time_ns), _callback(std::move(callback))
{
}
@@ -1473,6 +1482,34 @@ private:
DISALLOW_COPY_AND_ASSIGN(auto_latency);
};
+// Increment gauge and decrement it automatically at the end of the scope.
+class auto_count
+{
+public:
+ auto_count(const gauge_ptr<int64_t> &g) : _gauge(g) { _gauge->increment(); }
+
+ auto_count(const gauge_ptr<int64_t> &g, std::function<void()> callback)
+ : _gauge(g), _callback(std::move(callback))
+ {
+ _gauge->increment();
+ }
+
+ ~auto_count()
+ {
+ if (_callback) {
+ _callback();
+ }
+
+ _gauge->decrement();
+ }
+
+private:
+ gauge_ptr<int64_t> _gauge;
+ std::function<void()> _callback;
+
+ DISALLOW_COPY_AND_ASSIGN(auto_count);
+};
+
} // namespace dsn
// Since server_metric_entity() will be called in macros such as METRIC_VAR_INIT_server(), its
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index 9388a9a92..5197ec4b6 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -3096,6 +3096,8 @@ protected:
void test_set_percentile(const std::vector<int64_t> &expected_samples);
void test_set_percentile(size_t n, int64_t val);
+ void test_auto_count();
+
const metric_entity_ptr _my_replica_metric_entity;
METRIC_VAR_DECLARE_gauge_int64(test_replica_gauge_int64);
METRIC_VAR_DECLARE_counter(test_replica_counter);
@@ -3134,6 +3136,19 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t val)
EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
}
+void MetricVarTest::test_auto_count()
+{
+ ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
+
+ {
+ METRIC_VAR_AUTO_COUNT(test_replica_gauge_int64, [this]() {
+ ASSERT_EQ(1, METRIC_VAR_VALUE(test_replica_gauge_int64));
+ });
+ }
+
+ ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
+}
+
#define TEST_METRIC_VAR_INCREMENT(name) \
do { \
ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \
@@ -3155,6 +3170,28 @@ TEST_F(MetricVarTest, IncrementGauge) { TEST_METRIC_VAR_INCREMENT(test_replica_g
TEST_F(MetricVarTest, IncrementCounter) { TEST_METRIC_VAR_INCREMENT(test_replica_counter); }
+#define TEST_METRIC_VAR_DECREMENT(name) \
+ do { \
+ ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_INCREMENT_BY(name, 11); \
+ ASSERT_EQ(11, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_DECREMENT(name); \
+ ASSERT_EQ(10, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_DECREMENT(name); \
+ ASSERT_EQ(9, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_INCREMENT(name); \
+ ASSERT_EQ(10, METRIC_VAR_VALUE(name)); \
+ \
+ METRIC_VAR_DECREMENT(name); \
+ ASSERT_EQ(9, METRIC_VAR_VALUE(name)); \
+ } while (0);
+
+TEST_F(MetricVarTest, DecrementGauge) { TEST_METRIC_VAR_DECREMENT(test_replica_gauge_int64); }
+
TEST_F(MetricVarTest, SetGauge)
{
ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
@@ -3195,4 +3232,6 @@ TEST_F(MetricVarTest, AutoLatencyMilliSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ms
TEST_F(MetricVarTest, AutoLatencySeconds) { TEST_METRIC_VAR_AUTO_LATENCY(s, 1000 * 1000 * 1000); }
+TEST_F(MetricVarTest, AutoCount) { ASSERT_NO_FATAL_FAILURE(test_auto_count()); }
+
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org