You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2022/07/28 03:52:06 UTC

[incubator-pegasus] branch master updated: feat: support to close percentile to prevent from heap-use-after-free error (#1074)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 50d8451ae feat: support to close percentile to prevent from heap-use-after-free error (#1074)
50d8451ae is described below

commit 50d8451ae90464875f2ce5b85b67c1c3269545af
Author: Dan Wang <em...@126.com>
AuthorDate: Thu Jul 28 11:52:01 2022 +0800

    feat: support to close percentile to prevent from heap-use-after-free error (#1074)
---
 src/rdsn/include/dsn/utility/metrics.h   | 146 ++++++++++++++++++++++++++-----
 src/rdsn/src/utils/metrics.cpp           | 120 +++++++++++++++++++++++--
 src/rdsn/src/utils/test/metrics_test.cpp |   3 +-
 3 files changed, 242 insertions(+), 27 deletions(-)

diff --git a/src/rdsn/include/dsn/utility/metrics.h b/src/rdsn/include/dsn/utility/metrics.h
index 85112c186..1e48ef34e 100644
--- a/src/rdsn/include/dsn/utility/metrics.h
+++ b/src/rdsn/include/dsn/utility/metrics.h
@@ -43,6 +43,7 @@
 #include <dsn/utility/ports.h>
 #include <dsn/utility/singleton.h>
 #include <dsn/utility/string_view.h>
+#include <dsn/utility/synchronize.h>
 
 // A metric library (for details pls see https://github.com/apache/incubator-pegasus/issues/922)
 // inspired by Kudu metrics (https://github.com/apache/kudu/blob/master/src/kudu/util/metrics.h).
@@ -78,9 +79,11 @@
 // Convenient macros are provided to define entity types and metric prototypes.
 #define METRIC_DEFINE_entity(name) ::dsn::metric_entity_prototype METRIC_ENTITY_##name(#name)
 #define METRIC_DEFINE_gauge_int64(entity_type, name, unit, desc, ...)                              \
-    ::dsn::gauge_prototype<int64_t> METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__})
+    ::dsn::gauge_prototype<int64_t> METRIC_##name(                                                 \
+        {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__})
 #define METRIC_DEFINE_gauge_double(entity_type, name, unit, desc, ...)                             \
-    ::dsn::gauge_prototype<double> METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__})
+    ::dsn::gauge_prototype<double> METRIC_##name(                                                  \
+        {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__})
 // There are 2 kinds of counters:
 // - `counter` is the general type of counter that is implemented by striped_long_adder, which can
 //   achieve high performance while consuming less memory if it's not updated very frequently.
@@ -89,24 +92,24 @@
 // See also include/dsn/utility/long_adder.h for details.
 #define METRIC_DEFINE_counter(entity_type, name, unit, desc, ...)                                  \
     dsn::counter_prototype<dsn::striped_long_adder, false> METRIC_##name(                          \
-        {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+        {#entity_type, dsn::metric_type::kCounter, #name, unit, desc, ##__VA_ARGS__})
 #define METRIC_DEFINE_concurrent_counter(entity_type, name, unit, desc, ...)                       \
     dsn::counter_prototype<dsn::concurrent_long_adder, false> METRIC_##name(                       \
-        {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+        {#entity_type, dsn::metric_type::kCounter, #name, unit, desc, ##__VA_ARGS__})
 #define METRIC_DEFINE_volatile_counter(entity_type, name, unit, desc, ...)                         \
     dsn::counter_prototype<dsn::striped_long_adder, true> METRIC_##name(                           \
-        {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+        {#entity_type, dsn::metric_type::kVolatileCounter, #name, unit, desc, ##__VA_ARGS__})
 #define METRIC_DEFINE_concurrent_volatile_counter(entity_type, name, unit, desc, ...)              \
     dsn::counter_prototype<dsn::concurrent_long_adder, true> METRIC_##name(                        \
-        {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+        {#entity_type, dsn::metric_type::kVolatileCounter, #name, unit, desc, ##__VA_ARGS__})
 
 // The percentile supports both integral and floating types.
 #define METRIC_DEFINE_percentile_int64(entity_type, name, unit, desc, ...)                         \
     dsn::percentile_prototype<int64_t> METRIC_##name(                                              \
-        {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+        {#entity_type, dsn::metric_type::kPercentile, #name, unit, desc, ##__VA_ARGS__})
 #define METRIC_DEFINE_percentile_double(entity_type, name, unit, desc, ...)                        \
     dsn::floating_percentile_prototype<double> METRIC_##name(                                      \
-        {#entity_type, #name, unit, desc, ##__VA_ARGS__})
+        {#entity_type, dsn::metric_type::kPercentile, #name, unit, desc, ##__VA_ARGS__})
 
 // The following macros act as forward declarations for entity types and metric prototypes.
 #define METRIC_DECLARE_entity(name) extern ::dsn::metric_entity_prototype METRIC_ENTITY_##name
@@ -168,6 +171,19 @@ private:
 
     ~metric_entity();
 
+    // Close all "closeable" metrics owned by this entity.
+    //
+    // `option` is used to control how the close operations are performed:
+    // * kWait:     close() will be blocked until all of the close operations are finished.
+    // * kNoWait:   once the close requests are issued, close() will return immediately without
+    //              waiting for any close operation to be finished.
+    enum class close_option : int
+    {
+        kWait,
+        kNoWait,
+    };
+    void close(close_option option);
+
     void set_attributes(attr_map &&attrs);
 
     const std::string _id;
@@ -221,6 +237,30 @@ private:
     DISALLOW_COPY_AND_ASSIGN(metric_registry);
 };
 
+// metric_type is needed when metrics are collected into monitoring systems. Generally
+// each monitoring system has its own types of metrics: first we should know which
+// type our metric belongs to; then we can know how to "translate" it to the specific
+// monitoring system.
+//
+// It is also needed when some special operation should be performed for a given
+// metric type. For example, a percentile should be closed once it's no longer
+// used.
+enum class metric_type
+{
+    kGauge,
+    kCounter,
+    kVolatileCounter,
+    kPercentile,
+    kInvalidUnit,
+};
+
+ENUM_BEGIN(metric_type, metric_type::kInvalidUnit)
+ENUM_REG(metric_type::kGauge)
+ENUM_REG(metric_type::kCounter)
+ENUM_REG(metric_type::kVolatileCounter)
+ENUM_REG(metric_type::kPercentile)
+ENUM_END(metric_type)
+
 enum class metric_unit
 {
     kNanoSeconds,
@@ -244,6 +284,7 @@ public:
     struct ctor_args
     {
         const string_view entity_type;
+        const metric_type type;
         const string_view name;
         const metric_unit unit;
         const string_view desc;
@@ -251,6 +292,8 @@ public:
 
     string_view entity_type() const { return _args.entity_type; }
 
+    metric_type type() const { return _args.type; }
+
     string_view name() const { return _args.name; }
 
     metric_unit unit() const { return _args.unit; }
@@ -311,6 +354,27 @@ private:
     DISALLOW_COPY_AND_ASSIGN(metric);
 };
 
+// closeable_metric is a metric that implements a close() method to execute some necessary close
+// operations before its destructor is invoked. close() returns immediately without waiting for
+// any close operation to finish, while wait() blocks until all of the close operations have
+// finished.
+//
+// It's guaranteed that close() will be called for each metric before it is destructed. Generally
+// both close() and wait() are invoked by the metric's manager, namely metric_entity.
+class closeable_metric : public metric
+{
+public:
+    virtual void close() = 0;
+    virtual void wait() = 0;
+
+protected:
+    explicit closeable_metric(const metric_prototype *prototype);
+    virtual ~closeable_metric() = default;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(closeable_metric);
+};
+
 // A gauge is a metric that represents a single numerical value that can arbitrarily go up and
 // down. Usually there are 2 scenarios for a guage.
 //
@@ -522,12 +586,21 @@ const std::set<kth_percentile_type> kAllKthPercentileTypes = get_all_kth_percent
 class percentile_timer
 {
 public:
-    using exec_fn = std::function<void()>;
+    enum class state : int
+    {
+        kRunning,
+        kClosing,
+        kClosed,
+    };
+
+    using on_exec_fn = std::function<void()>;
+    using on_close_fn = std::function<void()>;
 
-    percentile_timer(uint64_t interval_ms, exec_fn exec);
+    percentile_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close);
     ~percentile_timer() = default;
 
-    void cancel() { _timer->cancel(); }
+    void close();
+    void wait();
 
     // Get the initial delay that is randomly generated by `generate_initial_delay_ms()`.
     uint64_t get_initial_delay_ms() const { return _initial_delay_ms; }
@@ -537,11 +610,16 @@ private:
     // same time.
     static uint64_t generate_initial_delay_ms(uint64_t interval_ms);
 
+    void on_close();
+
     void on_timer(const boost::system::error_code &ec);
 
     const uint64_t _initial_delay_ms;
     const uint64_t _interval_ms;
-    const exec_fn _exec;
+    const on_exec_fn _on_exec;
+    const on_close_fn _on_close;
+    std::atomic<state> _state;
+    utils::notify_event _completed;
     std::unique_ptr<boost::asio::deadline_timer> _timer;
 };
 
@@ -561,7 +639,7 @@ private:
 template <typename T,
           typename NthElementFinder = stl_nth_element_finder<T>,
           typename = typename std::enable_if<std::is_arithmetic<T>::value>::type>
-class percentile : public metric
+class percentile : public closeable_metric
 {
 public:
     using value_type = T;
@@ -601,7 +679,7 @@ protected:
                uint64_t interval_ms = 10000,
                const std::set<kth_percentile_type> &kth_percentiles = kAllKthPercentileTypes,
                size_type sample_size = kDefaultSampleSize)
-        : metric(prototype),
+        : closeable_metric(prototype),
           _sample_size(sample_size),
           _last_real_sample_size(0),
           _samples(cacheline_aligned_alloc_array<value_type>(sample_size, value_type{})),
@@ -633,17 +711,22 @@ protected:
         dcheck_gt(interval_ms, 0);
 #endif
 
+        // Increment the ref count of the percentile, since it will be referenced by the timer.
+        // This extends the lifetime of the percentile and prevents heap-use-after-free
+        // errors.
+        //
+        // The ref count will be decremented once the percentile will never again be used
+        // by the timer, which means the percentile can then be destructed safely. See
+        // on_close(), which is registered in the timer and called back after close() is
+        // invoked and the timer has actually stopped.
+        add_ref();
         _timer.reset(new percentile_timer(
             interval_ms,
-            std::bind(&percentile<value_type, NthElementFinder>::find_nth_elements, this)));
+            std::bind(&percentile<value_type, NthElementFinder>::find_nth_elements, this),
+            std::bind(&percentile<value_type, NthElementFinder>::on_close, this)));
     }
 
-    virtual ~percentile()
-    {
-        if (_timer) {
-            _timer->cancel();
-        }
-    }
+    virtual ~percentile() = default;
 
 private:
     using nth_container_type = typename NthElementFinder::nth_container_type;
@@ -651,6 +734,27 @@ private:
     friend class metric_entity;
     friend class ref_ptr<percentile<value_type, NthElementFinder>>;
 
+    virtual void close() override
+    {
+        if (_timer) {
+            _timer->close();
+        }
+    }
+
+    virtual void wait() override
+    {
+        if (_timer) {
+            _timer->wait();
+        }
+    }
+
+    void on_close()
+    {
+        // This will be called back after timer is closed, which means the percentile is
+        // no longer needed by timer and can be destructed safely.
+        release_ref();
+    }
+
     void find_nth_elements()
     {
         size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size);
diff --git a/src/rdsn/src/utils/metrics.cpp b/src/rdsn/src/utils/metrics.cpp
index 60749c4fd..d1cad6748 100644
--- a/src/rdsn/src/utils/metrics.cpp
+++ b/src/rdsn/src/utils/metrics.cpp
@@ -38,7 +38,45 @@ metric_entity::metric_entity(const std::string &id, attr_map &&attrs)
 {
 }
 
-metric_entity::~metric_entity() {}
+metric_entity::~metric_entity()
+{
+    // We have to wait for all of the close operations to finish here. Waiting for them to finish
+    // in the destructor of each metric instead may lead to a memory leak being reported by the
+    // ASAN test for dsn_utils_test, since the percentile is also referenced by shared_io_service,
+    // which is still alive (not yet destructed) when the ASAN test for dsn_utils_test finishes.
+    close(close_option::kWait);
+}
+
+void metric_entity::close(close_option option)
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    // The reason why each metric is closed in the entity rather than in the destructor of each
+    // metric is that close() for the metric will return immediately without waiting for any close
+    // operation to be finished.
+    //
+    // Thus, to close all metrics owned by an entity, it's more efficient to first issue a close
+    // request for all metrics, and then just wait for all of the close operations to be finished.
+    // It's inefficient to wait for each metric to be closed one by one.
+    for (auto &m : _metrics) {
+        if (m.second->prototype()->type() == metric_type::kPercentile) {
+            auto p = down_cast<closeable_metric *>(m.second.get());
+            p->close();
+        }
+    }
+
+    if (option == close_option::kNoWait) {
+        return;
+    }
+
+    // Wait for all of the close operations to be finished.
+    for (auto &m : _metrics) {
+        if (m.second->prototype()->type() == metric_type::kPercentile) {
+            auto p = down_cast<closeable_metric *>(m.second.get());
+            p->wait();
+        }
+    }
+}
 
 metric_entity::attr_map metric_entity::attributes() const
 {
@@ -86,7 +124,25 @@ metric_registry::metric_registry()
     tools::shared_io_service::instance();
 }
 
-metric_registry::~metric_registry() {}
+metric_registry::~metric_registry()
+{
+    std::lock_guard<std::mutex> guard(_mtx);
+
+    // Once the registry is being destructed, none of the entities and metrics owned by it will be
+    // needed any longer.
+    //
+    // The reason why each entity is closed in the registry rather than in the destructor of each
+    // entity is that close(kNoWait) for the entity will return immediately without waiting for any
+    // close operation to be finished.
+    //
+    // Thus, to close all entities owned by a registry, it's more efficient to first issue a
+    // close request for all entities, and then just wait for all of the close operations to be
+    // finished in the destructor of each entity. It's inefficient to wait for each entity to be
+    // closed one by one.
+    for (auto &entity : _entities) {
+        entity.second->close(metric_entity::close_option::kNoWait);
+    }
+}
 
 metric_registry::entity_map metric_registry::entities() const
 {
@@ -120,6 +176,8 @@ metric_prototype::~metric_prototype() {}
 
 metric::metric(const metric_prototype *prototype) : _prototype(prototype) {}
 
+closeable_metric::closeable_metric(const metric_prototype *prototype) : metric(prototype) {}
+
 uint64_t percentile_timer::generate_initial_delay_ms(uint64_t interval_ms)
 {
     dcheck_gt(interval_ms, 0);
@@ -132,29 +190,81 @@ uint64_t percentile_timer::generate_initial_delay_ms(uint64_t interval_ms)
     return (rand::next_u64() % interval_seconds + 1) * 1000 + rand::next_u64() % 1000;
 }
 
-percentile_timer::percentile_timer(uint64_t interval_ms, exec_fn exec)
+percentile_timer::percentile_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close)
     : _initial_delay_ms(generate_initial_delay_ms(interval_ms)),
       _interval_ms(interval_ms),
-      _exec(exec),
+      _on_exec(on_exec),
+      _on_close(on_close),
+      _state(state::kRunning),
+      _completed(),
       _timer(new boost::asio::deadline_timer(tools::shared_io_service::instance().ios))
 {
     _timer->expires_from_now(boost::posix_time::milliseconds(_initial_delay_ms));
     _timer->async_wait(std::bind(&percentile_timer::on_timer, this, std::placeholders::_1));
 }
 
+void percentile_timer::close()
+{
+    // If the timer has already expired when cancel() is called, then the handlers for asynchronous
+    // wait operations will:
+    // * have already been invoked; or
+    // * have been queued for invocation in the near future.
+    //
+    // These handlers can no longer be cancelled, and therefore are passed an error code that
+    // indicates the successful completion of the wait operation. Thus set the state of timer to
+    // kClosing to tell on_timer() that the timer should be closed even if it is not called with
+    // operation_canceled.
+    auto expected_state = state::kRunning;
+    if (_state.compare_exchange_strong(expected_state, state::kClosing)) {
+        _timer->cancel();
+    }
+}
+
+void percentile_timer::wait() { _completed.wait(); }
+
+void percentile_timer::on_close()
+{
+    _on_close();
+    _completed.notify();
+}
+
 void percentile_timer::on_timer(const boost::system::error_code &ec)
 {
+// This macro handles the case where handlers for asynchronous wait operations can no longer be
+// cancelled. It atomically checks whether the internal state is kClosing (since close() may be
+// called concurrently); if so, it moves the state to kClosed, invokes on_close() and returns, so
+// that no further handler is scheduled.
+#define TRY_PROCESS_TIMER_CLOSING()                                                                \
+    do {                                                                                           \
+        auto expected_state = state::kClosing;                                                     \
+        if (_state.compare_exchange_strong(expected_state, state::kClosed)) {                      \
+            on_close();                                                                            \
+            return;                                                                                \
+        }                                                                                          \
+    } while (0)
+
     if (dsn_unlikely(!!ec)) {
         dassert_f(ec == boost::system::errc::operation_canceled,
                   "failed to exec on_timer with an error that cannot be handled: {}",
                   ec.message());
+
+        // Cancel can only be launched by close().
+        auto expected_state = state::kClosing;
+        dassert_f(_state.compare_exchange_strong(expected_state, state::kClosed),
+                  "wrong state for percentile_timer: {}, while expecting closing state",
+                  static_cast<int>(expected_state));
+        on_close();
+
         return;
     }
 
-    _exec();
+    TRY_PROCESS_TIMER_CLOSING();
+    _on_exec();
 
+    TRY_PROCESS_TIMER_CLOSING();
     _timer->expires_from_now(boost::posix_time::milliseconds(_interval_ms));
     _timer->async_wait(std::bind(&percentile_timer::on_timer, this, std::placeholders::_1));
+#undef TRY_PROCESS_TIMER_CLOSING
 }
 
 template <>
diff --git a/src/rdsn/src/utils/test/metrics_test.cpp b/src/rdsn/src/utils/test/metrics_test.cpp
index 5af311732..777cfa0b7 100644
--- a/src/rdsn/src/utils/test/metrics_test.cpp
+++ b/src/rdsn/src/utils/test/metrics_test.cpp
@@ -55,7 +55,8 @@ using my_gauge_ptr = ref_ptr<my_gauge>;
 } // namespace dsn
 
 #define METRIC_DEFINE_my_gauge(entity_type, name, unit, desc, ...)                                 \
-    ::dsn::my_gauge_prototype METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__})
+    ::dsn::my_gauge_prototype METRIC_##name(                                                       \
+        {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__})
 
 METRIC_DEFINE_entity(my_server);
 METRIC_DEFINE_entity(my_table);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org