You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/25 10:55:27 UTC

[incubator-pegasus] 28/28: feat(new_metrics): migrate metrics for task queue (#1484)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 06d8cac01adf173bf9828e8bf76d0137c34ff3e4
Author: Dan Wang <wa...@apache.org>
AuthorDate: Thu May 25 15:00:26 2023 +0800

    feat(new_metrics): migrate metrics for task queue (#1484)
    
    https://github.com/apache/incubator-pegasus/issues/1483
    
    All 3 task-queue metrics are migrated to the new framework: the length of the
    task queue, and the accumulated numbers of tasks delayed/rejected by throttling
    before enqueue. All of these metrics are attached to the queue-level entity,
    which is also newly introduced in this PR.
    
    Furthermore, this PR solves some other problems:
    
    - The circular inclusion of header files between `common/json_helper.h` and
      `utils/metrics.h` led to a compilation error; this is resolved by dropping the
      redundant header file `runtime/tool_api.h` from the include chain.
    - The undefined reference to `http_service::register_handler` when compiling on
      macOS is resolved by linking `dsn_http` to `dsn_utils`, which is built as a
      static library on macOS.
    - GitHub Actions ran out of disk space while building releases; this is resolved by
      removing the directories and tarballs after packing the server and tools, which
      frees up space. This is also tracked by another issue:
          https://github.com/apache/incubator-pegasus/issues/1485
    - Add identifying details (e.g. table_id, partition_id) to the error messages raised
      while getting entity pointers.
---
 .github/workflows/lint_and_test_cpp.yaml        | 16 +++--
 src/base/test/CMakeLists.txt                    |  8 +--
 src/common/fs_manager.cpp                       | 11 ++--
 src/common/fs_manager.h                         |  2 +
 src/common/serialization_helper/thrift_helper.h |  1 -
 src/meta/meta_backup_service.cpp                |  8 ++-
 src/meta/meta_backup_service.h                  |  1 +
 src/meta/table_metrics.cpp                      | 13 ++--
 src/replica/replica_base.h                      |  7 ++-
 src/runtime/task/task_queue.cpp                 | 80 ++++++++++++++++---------
 src/runtime/task/task_queue.h                   | 19 +++---
 src/utils/CMakeLists.txt                        |  1 +
 src/utils/metrics.h                             | 12 +++-
 13 files changed, 117 insertions(+), 62 deletions(-)

diff --git a/.github/workflows/lint_and_test_cpp.yaml b/.github/workflows/lint_and_test_cpp.yaml
index ff6134c0b..49a136a84 100644
--- a/.github/workflows/lint_and_test_cpp.yaml
+++ b/.github/workflows/lint_and_test_cpp.yaml
@@ -147,9 +147,13 @@ jobs:
           ./run.sh build --test --skip_thirdparty -j $(nproc) -t release
           ccache -s
       - name: Pack Server
-        run: ./run.sh pack_server
+        run: |
+          ./run.sh pack_server
+          rm -rf pegasus-server-*
       - name: Pack Tools
-        run: ./run.sh pack_tools
+        run: |
+          ./run.sh pack_tools
+          rm -rf pegasus-tools-*
       - name: Tar files
         run: |
           mv thirdparty/hadoop-bin ./
@@ -541,9 +545,13 @@ jobs:
           ./run.sh build --test --skip_thirdparty -j $(nproc) -t release --use_jemalloc
           ccache -s
       - name: Pack Server
-        run: ./run.sh pack_server -j
+        run: |
+          ./run.sh pack_server -j
+          rm -rf pegasus-server-*
       - name: Pack Tools
-        run: ./run.sh pack_tools -j
+        run: |
+          ./run.sh pack_tools -j
+          rm -rf pegasus-tools-*
       - name: Tar files
         run: |
           mv thirdparty/hadoop-bin ./
diff --git a/src/base/test/CMakeLists.txt b/src/base/test/CMakeLists.txt
index 8af554bc5..6ceda0e5c 100644
--- a/src/base/test/CMakeLists.txt
+++ b/src/base/test/CMakeLists.txt
@@ -28,10 +28,10 @@ set(MY_PROJ_SRC "")
 set(MY_SRC_SEARCH_MODE "GLOB")
 
 set(MY_PROJ_LIBS
-        dsn_runtime
-        dsn_utils
-        pegasus_base
-        gtest)
+    dsn_runtime
+    dsn_utils
+    pegasus_base
+    gtest)
 
 set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)
 
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 874678bef..79dc3ba64 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -93,7 +93,9 @@ metric_entity_ptr instantiate_disk_metric_entity(const std::string &tag,
 } // anonymous namespace
 
 disk_capacity_metrics::disk_capacity_metrics(const std::string &tag, const std::string &data_dir)
-    : _disk_metric_entity(instantiate_disk_metric_entity(tag, data_dir)),
+    : _tag(tag),
+      _data_dir(data_dir),
+      _disk_metric_entity(instantiate_disk_metric_entity(tag, data_dir)),
       METRIC_VAR_INIT_disk(total_disk_capacity_mb),
       METRIC_VAR_INIT_disk(avail_disk_capacity_mb)
 {
@@ -102,9 +104,10 @@ disk_capacity_metrics::disk_capacity_metrics(const std::string &tag, const std::
 const metric_entity_ptr &disk_capacity_metrics::disk_metric_entity() const
 {
     CHECK_NOTNULL(_disk_metric_entity,
-                  "disk metric entity should has been instantiated: "
-                  "uninitialized entity cannot be used to instantiate "
-                  "metric");
+                  "disk metric entity (tag={}, data_dir={}) should have been instantiated: "
+                  "uninitialized entity cannot be used to instantiate metric",
+                  _tag,
+                  _data_dir);
     return _disk_metric_entity;
 }
 
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index fb80e2a5e..772bdc005 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -55,6 +55,8 @@ public:
     METRIC_DEFINE_SET(avail_disk_capacity_mb, int64_t)
 
 private:
+    const std::string _tag;
+    const std::string _data_dir;
     const metric_entity_ptr _disk_metric_entity;
     METRIC_VAR_DECLARE_gauge_int64(total_disk_capacity_mb);
     METRIC_VAR_DECLARE_gauge_int64(avail_disk_capacity_mb);
diff --git a/src/common/serialization_helper/thrift_helper.h b/src/common/serialization_helper/thrift_helper.h
index 4afcc4cc2..70205d41e 100644
--- a/src/common/serialization_helper/thrift_helper.h
+++ b/src/common/serialization_helper/thrift_helper.h
@@ -26,7 +26,6 @@
 
 #pragma once
 
-#include "runtime/tool_api.h"
 #include "runtime/rpc/rpc_stream.h"
 
 #include <thrift/Thrift.h>
diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp
index 832158161..a746c7bdb 100644
--- a/src/meta/meta_backup_service.cpp
+++ b/src/meta/meta_backup_service.cpp
@@ -77,7 +77,8 @@ metric_entity_ptr instantiate_backup_policy_metric_entity(const std::string &pol
 } // anonymous namespace
 
 backup_policy_metrics::backup_policy_metrics(const std::string &policy_name)
-    : _backup_policy_metric_entity(instantiate_backup_policy_metric_entity(policy_name)),
+    : _policy_name(policy_name),
+      _backup_policy_metric_entity(instantiate_backup_policy_metric_entity(policy_name)),
       METRIC_VAR_INIT_backup_policy(backup_recent_duration_ms)
 {
 }
@@ -85,8 +86,9 @@ backup_policy_metrics::backup_policy_metrics(const std::string &policy_name)
 const metric_entity_ptr &backup_policy_metrics::backup_policy_metric_entity() const
 {
     CHECK_NOTNULL(_backup_policy_metric_entity,
-                  "backup_policy metric entity should has been instantiated: "
-                  "uninitialized entity cannot be used to instantiate metric");
+                  "backup_policy metric entity (policy_name={}) should have been instantiated: "
+                  "uninitialized entity cannot be used to instantiate metric",
+                  _policy_name);
     return _backup_policy_metric_entity;
 }
 
diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h
index 14afbf7f4..1022478b1 100644
--- a/src/meta/meta_backup_service.h
+++ b/src/meta/meta_backup_service.h
@@ -187,6 +187,7 @@ public:
     METRIC_DEFINE_SET(backup_recent_duration_ms, int64_t)
 
 private:
+    const std::string _policy_name;
     const metric_entity_ptr _backup_policy_metric_entity;
     METRIC_VAR_DECLARE_gauge_int64(backup_recent_duration_ms);
 
diff --git a/src/meta/table_metrics.cpp b/src/meta/table_metrics.cpp
index c63dd1f09..73b45d69e 100644
--- a/src/meta/table_metrics.cpp
+++ b/src/meta/table_metrics.cpp
@@ -146,9 +146,10 @@ partition_metrics::partition_metrics(int32_t table_id, int32_t partition_id)
 const metric_entity_ptr &partition_metrics::partition_metric_entity() const
 {
     CHECK_NOTNULL(_partition_metric_entity,
-                  "partition metric entity should has been instantiated: "
-                  "uninitialized entity cannot be used to instantiate "
-                  "metric");
+                  "partition metric entity (table_id={}, partition_id={}) should have been "
+                  "instantiated: uninitialized entity cannot be used to instantiate metric",
+                  _table_id,
+                  _partition_id);
     return _partition_metric_entity;
 }
 
@@ -191,9 +192,9 @@ table_metrics::table_metrics(int32_t table_id, int32_t partition_count)
 const metric_entity_ptr &table_metrics::table_metric_entity() const
 {
     CHECK_NOTNULL(_table_metric_entity,
-                  "table metric entity should has been instantiated: "
-                  "uninitialized entity cannot be used to instantiate "
-                  "metric");
+                  "table metric entity (table_id={}) should have been instantiated: "
+                  "uninitialized entity cannot be used to instantiate metric",
+                  _table_id);
     return _table_metric_entity;
 }
 
diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h
index ccb39b914..51dc8f51e 100644
--- a/src/replica/replica_base.h
+++ b/src/replica/replica_base.h
@@ -57,9 +57,10 @@ struct replica_base
     const metric_entity_ptr &replica_metric_entity() const
     {
         CHECK_NOTNULL(_replica_metric_entity,
-                      "replica metric entity should has been instantiated: "
-                      "uninitialized entity cannot be used to instantiate "
-                      "metric");
+                      "replica metric entity (table_id={}, partition_id={}) should have been "
+                      "instantiated: uninitialized entity cannot be used to instantiate metric",
+                      _gpid.get_app_id(),
+                      _gpid.get_partition_index());
         return _replica_metric_entity;
     }
 
diff --git a/src/runtime/task/task_queue.cpp b/src/runtime/task/task_queue.cpp
index 452a36886..19de1954c 100644
--- a/src/runtime/task/task_queue.cpp
+++ b/src/runtime/task/task_queue.cpp
@@ -26,12 +26,10 @@
 
 #include "task_queue.h"
 
-#include <stdio.h>
-
+#include "fmt/core.h"
 #include "runtime/rpc/network.h"
 #include "runtime/rpc/rpc_engine.h"
 #include "runtime/rpc/rpc_message.h"
-#include "runtime/service_engine.h"
 #include "runtime/task/task.h"
 #include "runtime/task/task_spec.h"
 #include "task_engine.h"
@@ -39,35 +37,51 @@
 #include "utils/error_code.h"
 #include "utils/exp_delay.h"
 #include "utils/fmt_logging.h"
+#include "utils/string_view.h"
 #include "utils/threadpool_spec.h"
 
+METRIC_DEFINE_entity(queue);
+
+METRIC_DEFINE_gauge_int64(queue,
+                          queue_length,
+                          dsn::metric_unit::kTasks,
+                          "The length of task queue");
+
+METRIC_DEFINE_counter(queue,
+                      queue_delayed_tasks,
+                      dsn::metric_unit::kTasks,
+                      "The accumulative number of delayed tasks by throttling before enqueue");
+
+METRIC_DEFINE_counter(queue,
+                      queue_rejected_tasks,
+                      dsn::metric_unit::kTasks,
+                      "The accumulative number of rejected tasks by throttling before enqueue");
+
 namespace dsn {
 
+namespace {
+
+metric_entity_ptr instantiate_queue_metric_entity(const std::string &queue_name)
+{
+    auto entity_id = fmt::format("queue_{}", queue_name);
+
+    return METRIC_ENTITY_queue.instantiate(entity_id, {{"queue_name", queue_name}});
+}
+
+} // anonymous namespace
+
 task_queue::task_queue(task_worker_pool *pool, int index, task_queue *inner_provider)
-    : _pool(pool), _queue_length(0)
+    : _pool(pool),
+      _name(fmt::format("{}.{}", pool->spec().name, index)),
+      _index(index),
+      _queue_length(0),
+      _spec(const_cast<threadpool_spec *>(&pool->spec())),
+      _virtual_queue_length(0),
+      _queue_metric_entity(instantiate_queue_metric_entity(_name)),
+      METRIC_VAR_INIT_queue(queue_length),
+      METRIC_VAR_INIT_queue(queue_delayed_tasks),
+      METRIC_VAR_INIT_queue(queue_rejected_tasks)
 {
-    char num[30];
-    sprintf(num, "%u", index);
-    _index = index;
-    _name = pool->spec().name + '.';
-    _name.append(num);
-    _queue_length_counter.init_global_counter(_pool->node()->full_name(),
-                                              "engine",
-                                              (_name + ".queue.length").c_str(),
-                                              COUNTER_TYPE_NUMBER,
-                                              "task queue length");
-    _delay_task_counter.init_global_counter(_pool->node()->full_name(),
-                                            "engine",
-                                            (_name + ".queue.delay_task").c_str(),
-                                            COUNTER_TYPE_VOLATILE_NUMBER,
-                                            "delay count of tasks before enqueue");
-    _reject_task_counter.init_global_counter(_pool->node()->full_name(),
-                                             "engine",
-                                             (_name + ".queue.reject_task").c_str(),
-                                             COUNTER_TYPE_VOLATILE_NUMBER,
-                                             "reject count of tasks before enqueue");
-    _virtual_queue_length = 0;
-    _spec = (threadpool_spec *)&pool->spec();
 }
 
 task_queue::~task_queue() = default;
@@ -94,7 +108,7 @@ void task_queue::enqueue_internal(task *task)
             if (delay_ms > 0) {
                 auto rtask = static_cast<rpc_request_task *>(task);
                 if (rtask->get_request()->io_session->delay_recv(delay_ms)) {
-                    _delay_task_counter->increment();
+                    METRIC_VAR_INCREMENT(queue_delayed_tasks);
                 }
             }
         } else {
@@ -104,7 +118,7 @@ void task_queue::enqueue_internal(task *task)
                 auto rtask = static_cast<rpc_request_task *>(task);
                 auto resp = rtask->get_request()->create_response();
                 task::get_current_rpc()->reply(resp, ERR_BUSY);
-                _reject_task_counter->increment();
+                METRIC_VAR_INCREMENT(queue_rejected_tasks);
                 task->release_ref(); // added in task::enqueue(pool)
                 return;
             }
@@ -114,4 +128,14 @@ void task_queue::enqueue_internal(task *task)
     tls_dsn.last_worker_queue_size = increase_count();
     enqueue(task);
 }
+
+const metric_entity_ptr &task_queue::queue_metric_entity() const
+{
+    CHECK_NOTNULL(_queue_metric_entity,
+                  "queue metric entity (queue_name={}) should have been instantiated: "
+                  "uninitialized entity cannot be used to instantiate metric",
+                  _name);
+    return _queue_metric_entity;
+}
+
 } // namespace dsn
diff --git a/src/runtime/task/task_queue.h b/src/runtime/task/task_queue.h
index c06cd6c50..99036ab18 100644
--- a/src/runtime/task/task_queue.h
+++ b/src/runtime/task/task_queue.h
@@ -26,12 +26,11 @@
 
 #pragma once
 
-#include <stdint.h>
 #include <atomic>
 #include <string>
 
-#include "perf_counter/perf_counter.h"
-#include "perf_counter/perf_counter_wrapper.h"
+#include "utils/autoref_ptr.h"
+#include "utils/metrics.h"
 
 namespace dsn {
 
@@ -71,12 +70,12 @@ public:
     int count() const { return _queue_length.load(std::memory_order_relaxed); }
     int decrease_count(int count = 1)
     {
-        _queue_length_counter->add((int64_t)(-count));
+        METRIC_VAR_DECREMENT_BY(queue_length, count);
         return _queue_length.fetch_sub(count, std::memory_order_relaxed) - count;
     }
     int increase_count(int count = 1)
     {
-        _queue_length_counter->add(count);
+        METRIC_VAR_INCREMENT_BY(queue_length, count);
         return _queue_length.fetch_add(count, std::memory_order_relaxed) + count;
     }
     const std::string &get_name() { return _name; }
@@ -88,16 +87,20 @@ private:
     friend class task_worker_pool;
     void enqueue_internal(task *task);
 
+    const metric_entity_ptr &queue_metric_entity() const;
+
 private:
     task_worker_pool *_pool;
     std::string _name;
     int _index;
     std::atomic<int> _queue_length;
-    dsn::perf_counter_wrapper _queue_length_counter;
-    dsn::perf_counter_wrapper _delay_task_counter;
-    dsn::perf_counter_wrapper _reject_task_counter;
     threadpool_spec *_spec;
     volatile int _virtual_queue_length;
+
+    const metric_entity_ptr _queue_metric_entity;
+    METRIC_VAR_DECLARE_gauge_int64(queue_length);
+    METRIC_VAR_DECLARE_counter(queue_delayed_tasks);
+    METRIC_VAR_DECLARE_counter(queue_rejected_tasks);
 };
 /*@}*/
 } // namespace dsn
diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt
index e17fef900..2f03e75eb 100644
--- a/src/utils/CMakeLists.txt
+++ b/src/utils/CMakeLists.txt
@@ -38,6 +38,7 @@ set(MY_BINPLACES "")
 
 if (APPLE)
     dsn_add_static_library()
+    target_link_libraries(${MY_PROJ_NAME} PRIVATE dsn_http)
 else()
     dsn_add_shared_library()
 endif()
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 7a7d64bd0..789a5bac2 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -170,8 +170,9 @@ class error_code;
 #define METRIC_VAR_INIT_table(name, ...) METRIC_VAR_INIT(name, table, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, backup_policy, ##__VA_ARGS__)
+#define METRIC_VAR_INIT_queue(name, ...) METRIC_VAR_INIT(name, queue, ##__VA_ARGS__)
 
-// Perform increment-related operations on gauges and counters.
+// Perform increment_by() operations on gauges and counters.
 #define METRIC_VAR_INCREMENT_BY(name, x)                                                           \
     do {                                                                                           \
         const auto v = (x);                                                                        \
@@ -183,6 +184,15 @@ class error_code;
 // Perform increment() operations on gauges and counters.
 #define METRIC_VAR_INCREMENT(name) METRIC_VAR_NAME(name)->increment()
 
+// Perform decrement_by() operations on gauges; nothing is done when the amount is 0.
+#define METRIC_VAR_DECREMENT_BY(name, x)                                                           \
+    do {                                                                                           \
+        const auto v = (x);                                                                        \
+        if (v != 0) {                                                                              \
+            METRIC_VAR_NAME(name)->decrement_by(v);                                                \
+        }                                                                                          \
+    } while (0)
+
 // Perform decrement() operations on gauges.
 #define METRIC_VAR_DECREMENT(name) METRIC_VAR_NAME(name)->decrement()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org