You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2022/07/28 03:52:06 UTC
[incubator-pegasus] branch master updated: feat: support to close percentile to prevent from heap-use-after-free error (#1074)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 50d8451ae feat: support to close percentile to prevent from heap-use-after-free error (#1074)
50d8451ae is described below
commit 50d8451ae90464875f2ce5b85b67c1c3269545af
Author: Dan Wang <em...@126.com>
AuthorDate: Thu Jul 28 11:52:01 2022 +0800
feat: support to close percentile to prevent from heap-use-after-free error (#1074)
---
src/rdsn/include/dsn/utility/metrics.h | 146 ++++++++++++++++++++++++++-----
src/rdsn/src/utils/metrics.cpp | 120 +++++++++++++++++++++++--
src/rdsn/src/utils/test/metrics_test.cpp | 3 +-
3 files changed, 242 insertions(+), 27 deletions(-)
diff --git a/src/rdsn/include/dsn/utility/metrics.h b/src/rdsn/include/dsn/utility/metrics.h
index 85112c186..1e48ef34e 100644
--- a/src/rdsn/include/dsn/utility/metrics.h
+++ b/src/rdsn/include/dsn/utility/metrics.h
@@ -43,6 +43,7 @@
#include <dsn/utility/ports.h>
#include <dsn/utility/singleton.h>
#include <dsn/utility/string_view.h>
+#include <dsn/utility/synchronize.h>
// A metric library (for details pls see https://github.com/apache/incubator-pegasus/issues/922)
// inspired by Kudu metrics (https://github.com/apache/kudu/blob/master/src/kudu/util/metrics.h).
@@ -78,9 +79,11 @@
// Convenient macros are provided to define entity types and metric prototypes.
#define METRIC_DEFINE_entity(name) ::dsn::metric_entity_prototype METRIC_ENTITY_##name(#name)
#define METRIC_DEFINE_gauge_int64(entity_type, name, unit, desc, ...) \
- ::dsn::gauge_prototype<int64_t> METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ ::dsn::gauge_prototype<int64_t> METRIC_##name( \
+ {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__})
#define METRIC_DEFINE_gauge_double(entity_type, name, unit, desc, ...) \
- ::dsn::gauge_prototype<double> METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ ::dsn::gauge_prototype<double> METRIC_##name( \
+ {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__})
// There are 2 kinds of counters:
// - `counter` is the general type of counter that is implemented by striped_long_adder, which can
// achieve high performance while consuming less memory if it's not updated very frequently.
@@ -89,24 +92,24 @@
// See also include/dsn/utility/long_adder.h for details.
#define METRIC_DEFINE_counter(entity_type, name, unit, desc, ...) \
dsn::counter_prototype<dsn::striped_long_adder, false> METRIC_##name( \
- {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ {#entity_type, dsn::metric_type::kCounter, #name, unit, desc, ##__VA_ARGS__})
#define METRIC_DEFINE_concurrent_counter(entity_type, name, unit, desc, ...) \
dsn::counter_prototype<dsn::concurrent_long_adder, false> METRIC_##name( \
- {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ {#entity_type, dsn::metric_type::kCounter, #name, unit, desc, ##__VA_ARGS__})
#define METRIC_DEFINE_volatile_counter(entity_type, name, unit, desc, ...) \
dsn::counter_prototype<dsn::striped_long_adder, true> METRIC_##name( \
- {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ {#entity_type, dsn::metric_type::kVolatileCounter, #name, unit, desc, ##__VA_ARGS__})
#define METRIC_DEFINE_concurrent_volatile_counter(entity_type, name, unit, desc, ...) \
dsn::counter_prototype<dsn::concurrent_long_adder, true> METRIC_##name( \
- {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ {#entity_type, dsn::metric_type::kVolatileCounter, #name, unit, desc, ##__VA_ARGS__})
// The percentile supports both integral and floating types.
#define METRIC_DEFINE_percentile_int64(entity_type, name, unit, desc, ...) \
dsn::percentile_prototype<int64_t> METRIC_##name( \
- {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ {#entity_type, dsn::metric_type::kPercentile, #name, unit, desc, ##__VA_ARGS__})
#define METRIC_DEFINE_percentile_double(entity_type, name, unit, desc, ...) \
dsn::floating_percentile_prototype<double> METRIC_##name( \
- {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ {#entity_type, dsn::metric_type::kPercentile, #name, unit, desc, ##__VA_ARGS__})
// The following macros act as forward declarations for entity types and metric prototypes.
#define METRIC_DECLARE_entity(name) extern ::dsn::metric_entity_prototype METRIC_ENTITY_##name
@@ -168,6 +171,19 @@ private:
~metric_entity();
+ // Close all "closeable" metrics owned by this entity.
+ //
+ // `option` is used to control how the close operations are performed:
+ // * kWait: close() will be blocked until all of the close operations are finished.
+ // * kNoWait: once the close requests are issued, close() will return immediately without
+ // waiting for any close operation to be finished.
+ enum class close_option : int
+ {
+ kWait,
+ kNoWait,
+ };
+ void close(close_option option);
+
void set_attributes(attr_map &&attrs);
const std::string _id;
@@ -221,6 +237,30 @@ private:
DISALLOW_COPY_AND_ASSIGN(metric_registry);
};
+// metric_type is needed while metrics are collected to monitoring systems. Generally
+// each monitoring system has its own types of metrics: firstly we should know which
+// type our metric belongs to; then we can know how to "translate" it to the specific
+// monitoring system.
+//
+// On the other hand, it is also needed when some special operation should be done
+// for a metric type. For example, percentile should be closed when it's no longer
+// used.
+enum class metric_type
+{
+ kGauge,
+ kCounter,
+ kVolatileCounter,
+ kPercentile,
+ kInvalidUnit,
+};
+
+ENUM_BEGIN(metric_type, metric_type::kInvalidUnit)
+ENUM_REG(metric_type::kGauge)
+ENUM_REG(metric_type::kCounter)
+ENUM_REG(metric_type::kVolatileCounter)
+ENUM_REG(metric_type::kPercentile)
+ENUM_END(metric_type)
+
enum class metric_unit
{
kNanoSeconds,
@@ -244,6 +284,7 @@ public:
struct ctor_args
{
const string_view entity_type;
+ const metric_type type;
const string_view name;
const metric_unit unit;
const string_view desc;
@@ -251,6 +292,8 @@ public:
string_view entity_type() const { return _args.entity_type; }
+ metric_type type() const { return _args.type; }
+
string_view name() const { return _args.name; }
metric_unit unit() const { return _args.unit; }
@@ -311,6 +354,27 @@ private:
DISALLOW_COPY_AND_ASSIGN(metric);
};
+// closeable_metric is a metric that implements close() method to execute some necessary close
+// operations before the destructor is invoked. close() will return immediately without waiting
+// for any close operation to be finished, while wait() is used to wait for all of the close
+// operations to be finished.
+//
+// It's guaranteed that close() for each metric will be called before it is destructed. Generally
+// both of close() and wait() are invoked by its manager, namely metric_entity.
+class closeable_metric : public metric
+{
+public:
+ virtual void close() = 0;
+ virtual void wait() = 0;
+
+protected:
+ explicit closeable_metric(const metric_prototype *prototype);
+ virtual ~closeable_metric() = default;
+
+private:
+ DISALLOW_COPY_AND_ASSIGN(closeable_metric);
+};
+
// A gauge is a metric that represents a single numerical value that can arbitrarily go up and
// down. Usually there are 2 scenarios for a gauge.
//
@@ -522,12 +586,21 @@ const std::set<kth_percentile_type> kAllKthPercentileTypes = get_all_kth_percent
class percentile_timer
{
public:
- using exec_fn = std::function<void()>;
+ enum class state : int
+ {
+ kRunning,
+ kClosing,
+ kClosed,
+ };
+
+ using on_exec_fn = std::function<void()>;
+ using on_close_fn = std::function<void()>;
- percentile_timer(uint64_t interval_ms, exec_fn exec);
+ percentile_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close);
~percentile_timer() = default;
- void cancel() { _timer->cancel(); }
+ void close();
+ void wait();
// Get the initial delay that is randomly generated by `generate_initial_delay_ms()`.
uint64_t get_initial_delay_ms() const { return _initial_delay_ms; }
@@ -537,11 +610,16 @@ private:
// same time.
static uint64_t generate_initial_delay_ms(uint64_t interval_ms);
+ void on_close();
+
void on_timer(const boost::system::error_code &ec);
const uint64_t _initial_delay_ms;
const uint64_t _interval_ms;
- const exec_fn _exec;
+ const on_exec_fn _on_exec;
+ const on_close_fn _on_close;
+ std::atomic<state> _state;
+ utils::notify_event _completed;
std::unique_ptr<boost::asio::deadline_timer> _timer;
};
@@ -561,7 +639,7 @@ private:
template <typename T,
typename NthElementFinder = stl_nth_element_finder<T>,
typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
-class percentile : public metric
+class percentile : public closeable_metric
{
public:
using value_type = T;
@@ -601,7 +679,7 @@ protected:
uint64_t interval_ms = 10000,
const std::set<kth_percentile_type> &kth_percentiles = kAllKthPercentileTypes,
size_type sample_size = kDefaultSampleSize)
- : metric(prototype),
+ : closeable_metric(prototype),
_sample_size(sample_size),
_last_real_sample_size(0),
_samples(cacheline_aligned_alloc_array<value_type>(sample_size, value_type{})),
@@ -633,17 +711,22 @@ protected:
dcheck_gt(interval_ms, 0);
#endif
+ // Increment ref count of percentile, since it will be referenced by timer.
+ // This will extend the lifetime of percentile and prevent heap-use-after-free
+ // errors.
+ //
+ // The ref count will be decremented at the moment when the percentile will
+ // never be used by timer, which means the percentile can be destructed safely.
+ // See on_close() for details which is registered in timer and will be called
+ // back once close() is invoked.
+ add_ref();
_timer.reset(new percentile_timer(
interval_ms,
- std::bind(&percentile<value_type, NthElementFinder>::find_nth_elements, this)));
+ std::bind(&percentile<value_type, NthElementFinder>::find_nth_elements, this),
+ std::bind(&percentile<value_type, NthElementFinder>::on_close, this)));
}
- virtual ~percentile()
- {
- if (_timer) {
- _timer->cancel();
- }
- }
+ virtual ~percentile() = default;
private:
using nth_container_type = typename NthElementFinder::nth_container_type;
@@ -651,6 +734,27 @@ private:
friend class metric_entity;
friend class ref_ptr<percentile<value_type, NthElementFinder>>;
+ virtual void close() override
+ {
+ if (_timer) {
+ _timer->close();
+ }
+ }
+
+ virtual void wait() override
+ {
+ if (_timer) {
+ _timer->wait();
+ }
+ }
+
+ void on_close()
+ {
+ // This will be called back after timer is closed, which means the percentile is
+ // no longer needed by timer and can be destructed safely.
+ release_ref();
+ }
+
void find_nth_elements()
{
size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size);
diff --git a/src/rdsn/src/utils/metrics.cpp b/src/rdsn/src/utils/metrics.cpp
index 60749c4fd..d1cad6748 100644
--- a/src/rdsn/src/utils/metrics.cpp
+++ b/src/rdsn/src/utils/metrics.cpp
@@ -38,7 +38,45 @@ metric_entity::metric_entity(const std::string &id, attr_map &&attrs)
{
}
-metric_entity::~metric_entity() {}
+metric_entity::~metric_entity()
+{
+ // We have to wait for all of the close operations to be finished. Waiting for close operations to
+ // be finished in the destructor of each metric may lead to memory leak detected in ASAN test
+ // for dsn_utils_test, since the percentile is also referenced by shared_io_service which is
+ // still alive without being destructed after ASAN test for dsn_utils_test is finished.
+ close(close_option::kWait);
+}
+
+void metric_entity::close(close_option option)
+{
+ std::lock_guard<std::mutex> guard(_mtx);
+
+ // The reason why each metric is closed in the entity rather than in the destructor of each
+ // metric is that close() for the metric will return immediately without waiting for any close
+ // operation to be finished.
+ //
+ // Thus, to close all metrics owned by an entity, it's more efficient to firstly issue a close
+ // request for all metrics; then, just wait for all of the close operations to be finished.
+ // It's inefficient to wait for each metric to be closed one by one.
+ for (auto &m : _metrics) {
+ if (m.second->prototype()->type() == metric_type::kPercentile) {
+ auto p = down_cast<closeable_metric *>(m.second.get());
+ p->close();
+ }
+ }
+
+ if (option == close_option::kNoWait) {
+ return;
+ }
+
+ // Wait for all of the close operations to be finished.
+ for (auto &m : _metrics) {
+ if (m.second->prototype()->type() == metric_type::kPercentile) {
+ auto p = down_cast<closeable_metric *>(m.second.get());
+ p->wait();
+ }
+ }
+}
metric_entity::attr_map metric_entity::attributes() const
{
@@ -86,7 +124,25 @@ metric_registry::metric_registry()
tools::shared_io_service::instance();
}
-metric_registry::~metric_registry() {}
+metric_registry::~metric_registry()
+{
+ std::lock_guard<std::mutex> guard(_mtx);
+
+ // Once the registry is chosen to be destructed, all of the entities and metrics owned by it
+ // will no longer be needed.
+ //
+ // The reason why each entity is closed in the registry rather than in the destructor of each
+ // entity is that close(kNoWait) for the entity will return immediately without waiting for any
+ // close operation to be finished.
+ //
+ // Thus, to close all entities owned by a registry, it's more efficient to firstly issue a
+ // close request for all entities; then, just wait for all of the close operations to be
+ // finished in the destructor of each entity. It's inefficient to wait for each entity to be
+ // closed one by one.
+ for (auto &entity : _entities) {
+ entity.second->close(metric_entity::close_option::kNoWait);
+ }
+}
metric_registry::entity_map metric_registry::entities() const
{
@@ -120,6 +176,8 @@ metric_prototype::~metric_prototype() {}
metric::metric(const metric_prototype *prototype) : _prototype(prototype) {}
+closeable_metric::closeable_metric(const metric_prototype *prototype) : metric(prototype) {}
+
uint64_t percentile_timer::generate_initial_delay_ms(uint64_t interval_ms)
{
dcheck_gt(interval_ms, 0);
@@ -132,29 +190,81 @@ uint64_t percentile_timer::generate_initial_delay_ms(uint64_t interval_ms)
return (rand::next_u64() % interval_seconds + 1) * 1000 + rand::next_u64() % 1000;
}
-percentile_timer::percentile_timer(uint64_t interval_ms, exec_fn exec)
+percentile_timer::percentile_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close)
: _initial_delay_ms(generate_initial_delay_ms(interval_ms)),
_interval_ms(interval_ms),
- _exec(exec),
+ _on_exec(on_exec),
+ _on_close(on_close),
+ _state(state::kRunning),
+ _completed(),
_timer(new boost::asio::deadline_timer(tools::shared_io_service::instance().ios))
{
_timer->expires_from_now(boost::posix_time::milliseconds(_initial_delay_ms));
_timer->async_wait(std::bind(&percentile_timer::on_timer, this, std::placeholders::_1));
}
+void percentile_timer::close()
+{
+ // If the timer has already expired when cancel() is called, then the handlers for asynchronous
+ // wait operations will:
+ // * have already been invoked; or
+ // * have been queued for invocation in the near future.
+ //
+ // These handlers can no longer be cancelled, and therefore are passed an error code that
+ // indicates the successful completion of the wait operation. Thus set the state of timer to
+ // kClosing to tell on_timer() that the timer should be closed even if it is not called with
+ // operation_canceled.
+ auto expected_state = state::kRunning;
+ if (_state.compare_exchange_strong(expected_state, state::kClosing)) {
+ _timer->cancel();
+ }
+}
+
+void percentile_timer::wait() { _completed.wait(); }
+
+void percentile_timer::on_close()
+{
+ _on_close();
+ _completed.notify();
+}
+
void percentile_timer::on_timer(const boost::system::error_code &ec)
{
+// This macro is defined for the case that handlers for asynchronous wait operations can no
+// longer be cancelled. It just checks the internal state atomically (since close() can also be
+// called simultaneously) for kClosing; once it's matched, it will stop the timer by not executing
+// any future handler.
+#define TRY_PROCESS_TIMER_CLOSING() \
+ do { \
+ auto expected_state = state::kClosing; \
+ if (_state.compare_exchange_strong(expected_state, state::kClosed)) { \
+ on_close(); \
+ return; \
+ } \
+ } while (0)
+
if (dsn_unlikely(!!ec)) {
dassert_f(ec == boost::system::errc::operation_canceled,
"failed to exec on_timer with an error that cannot be handled: {}",
ec.message());
+
+ // Cancel can only be launched by close().
+ auto expected_state = state::kClosing;
+ dassert_f(_state.compare_exchange_strong(expected_state, state::kClosed),
+ "wrong state for percentile_timer: {}, while expecting closing state",
+ static_cast<int>(expected_state));
+ on_close();
+
return;
}
- _exec();
+ TRY_PROCESS_TIMER_CLOSING();
+ _on_exec();
+ TRY_PROCESS_TIMER_CLOSING();
_timer->expires_from_now(boost::posix_time::milliseconds(_interval_ms));
_timer->async_wait(std::bind(&percentile_timer::on_timer, this, std::placeholders::_1));
+#undef TRY_PROCESS_TIMER_CLOSING
}
template <>
diff --git a/src/rdsn/src/utils/test/metrics_test.cpp b/src/rdsn/src/utils/test/metrics_test.cpp
index 5af311732..777cfa0b7 100644
--- a/src/rdsn/src/utils/test/metrics_test.cpp
+++ b/src/rdsn/src/utils/test/metrics_test.cpp
@@ -55,7 +55,8 @@ using my_gauge_ptr = ref_ptr<my_gauge>;
} // namespace dsn
#define METRIC_DEFINE_my_gauge(entity_type, name, unit, desc, ...) \
- ::dsn::my_gauge_prototype METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__})
+ ::dsn::my_gauge_prototype METRIC_##name( \
+ {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__})
METRIC_DEFINE_entity(my_server);
METRIC_DEFINE_entity(my_table);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org