You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2020/08/08 03:23:16 UTC
[incubator-doris] branch master updated: [metrics] Redesign metrics to 3 layers (#4115)
This is an automated email from the ASF dual-hosted git repository.
lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e711521 [metrics] Redesign metrics to 3 layers (#4115)
e711521 is described below
commit e71152132c460d3524a3b4b898b5fa10c5888523
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Sat Aug 8 11:23:01 2020 +0800
[metrics] Redesign metrics to 3 layers (#4115)
Redesign metrics to 3 layers:
MetricRegistry - MetricEntity - Metric
MetricRegistry: the registration center.
MetricEntity: an entity registered on a MetricRegistry. Generally, several MetricEntities are registered on one
MetricRegistry, and each MetricEntity is an independent entity, such as the server, disk devices, data directories,
thrift clients and servers, and so on.
Metric: a metric of an entity, such as fragment_requests_total on the server entity, disk_bytes_read on a disk_device
entity, or thrift_opened_clients on a thrift_client entity.
MetricPrototype: the type of a metric. A MetricPrototype is a global variable that can be shared by the same metric
across different MetricEntities.
---
be/src/agent/heartbeat_server.cpp | 1 -
be/src/agent/task_worker_pool.cpp | 9 -
be/src/common/daemon.cpp | 4 +-
be/src/exec/tablet_sink.cpp | 4 +-
be/src/http/action/metrics_action.cpp | 183 +---------
be/src/http/action/metrics_action.h | 4 +-
be/src/http/action/stream_load.cpp | 22 +-
be/src/http/action/stream_load.h | 6 +
be/src/olap/data_dir.cpp | 22 +-
be/src/olap/data_dir.h | 9 +
be/src/olap/rowset/unique_rowset_id_generator.cpp | 8 +-
be/src/olap/rowset/unique_rowset_id_generator.h | 2 +-
be/src/olap/storage_engine.cpp | 17 +-
be/src/runtime/CMakeLists.txt | 1 +
be/src/runtime/broker_mgr.cpp | 5 +-
be/src/runtime/client_cache.cpp | 30 +-
be/src/runtime/client_cache.h | 17 +-
be/src/runtime/data_stream_mgr.cpp | 11 +-
be/src/runtime/data_stream_mgr.h | 1 +
be/src/runtime/exec_env.h | 3 -
be/src/runtime/exec_env_init.cpp | 12 +-
be/src/runtime/external_scan_context_mgr.cpp | 10 +-
be/src/runtime/external_scan_context_mgr.h | 5 +-
be/src/runtime/fragment_mgr.cpp | 5 +-
be/src/runtime/load_channel_mgr.cpp | 5 +-
be/src/runtime/memory/chunk_allocator.cpp | 55 ++-
be/src/runtime/memory/chunk_allocator.h | 3 +
be/src/runtime/result_buffer_mgr.cpp | 5 +-
be/src/runtime/result_queue_mgr.cpp | 5 +-
.../routine_load/routine_load_task_executor.cpp | 29 ++
.../routine_load/routine_load_task_executor.h | 28 +-
be/src/runtime/small_file_mgr.cpp | 5 +-
.../stream_load/load_stream_mgr.cpp} | 29 +-
be/src/runtime/stream_load/load_stream_mgr.h | 11 +-
be/src/runtime/tablets_channel.cpp | 4 +-
be/src/runtime/test_env.cc | 16 +-
be/src/runtime/test_env.h | 5 -
be/src/runtime/tmp_file_mgr.cc | 43 +--
be/src/runtime/tmp_file_mgr.h | 17 +-
be/src/service/backend_service.cpp | 1 -
be/src/service/http_service.cpp | 2 +-
be/src/util/CMakeLists.txt | 1 +
.../metrics_action.h => util/brpc_stub_cache.cpp} | 31 +-
be/src/util/brpc_stub_cache.h | 14 +-
be/src/util/doris_metrics.cpp | 370 +++++++++++----------
be/src/util/doris_metrics.h | 272 +++++++--------
be/src/util/metrics.cpp | 254 +++++++++-----
be/src/util/metrics.h | 362 +++++++-------------
be/src/util/system_metrics.cpp | 368 +++++++++++---------
be/src/util/system_metrics.h | 27 +-
be/src/util/thrift_server.cpp | 33 +-
be/src/util/thrift_server.h | 18 +-
be/test/http/metrics_action_test.cpp | 45 ++-
.../rowset/unique_rowset_id_generator_test.cpp | 61 ++--
be/test/runtime/tmp_file_mgr_test.cpp | 33 +-
be/test/util/doris_metrics_test.cpp | 121 ++-----
be/test/util/new_metrics_test.cpp | 334 +++++++++++--------
be/test/util/system_metrics_test.cpp | 164 +++------
58 files changed, 1476 insertions(+), 1686 deletions(-)
diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp
index 08915b5..6725c1e 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -185,7 +185,6 @@ AgentStatus create_heartbeat_server(
server_name,
server_processor,
server_port,
- exec_env->metrics(),
worker_thread_num);
return DORIS_SUCCESS;
}
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 7b97bf8..ac51aed 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1103,15 +1103,6 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
disk.__set_disk_available_capacity(static_cast<double>(root_path_info.available));
disk.__set_used(root_path_info.is_used);
disks[root_path_info.path] = disk;
-
- DorisMetrics::instance()->disks_total_capacity.set_metric(root_path_info.path,
- root_path_info.disk_capacity);
- DorisMetrics::instance()->disks_avail_capacity.set_metric(root_path_info.path,
- root_path_info.available);
- DorisMetrics::instance()->disks_data_used_capacity.set_metric(root_path_info.path,
- root_path_info.data_used_capacity);
- DorisMetrics::instance()->disks_state.set_metric(root_path_info.path,
- root_path_info.is_used ? 1L : 0L);
}
request.__set_disks(disks);
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index d226053..f3516f3 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -133,7 +133,7 @@ void* calculate_metrics(void* dummy) {
std::map<std::string, int64_t> lst_net_receive_bytes;
while (true) {
- DorisMetrics::instance()->metrics()->trigger_hook();
+ DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true);
if (last_ts == -1L) {
last_ts = GetCurrentTimeMicros() / 1000;
@@ -204,7 +204,7 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
}
}
DorisMetrics::instance()->initialize(
- paths, init_system_metrics, disk_devices, network_interfaces);
+ init_system_metrics, disk_devices, network_interfaces);
if (config::enable_metric_calculator) {
pthread_t calculator_pid;
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index b43829e..2845127 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -604,8 +604,8 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(input_batch->num_rows());
state->update_num_bytes_load_total(input_batch->total_byte_size());
- DorisMetrics::instance()->load_rows_total.increment(input_batch->num_rows());
- DorisMetrics::instance()->load_bytes_total.increment(input_batch->total_byte_size());
+ DorisMetrics::instance()->load_rows.increment(input_batch->num_rows());
+ DorisMetrics::instance()->load_bytes.increment(input_batch->total_byte_size());
RowBatch* batch = input_batch;
if (!_output_expr_ctxs.empty()) {
SCOPED_RAW_TIMER(&_convert_batch_ns);
diff --git a/be/src/http/action/metrics_action.cpp b/be/src/http/action/metrics_action.cpp
index a84efea..2ffca2f 100644
--- a/be/src/http/action/metrics_action.cpp
+++ b/be/src/http/action/metrics_action.cpp
@@ -32,192 +32,15 @@
namespace doris {
-class PrometheusMetricsVisitor : public MetricsVisitor {
-public:
- virtual ~PrometheusMetricsVisitor() {}
- void visit(const std::string& prefix, const std::string& name,
- MetricCollector* collector) override;
- std::string to_string() const { return _ss.str(); }
-private:
- void _visit_simple_metric(
- const std::string& name, const MetricLabels& labels, Metric* metric);
-private:
- std::stringstream _ss;
-};
-
-// eg:
-// palo_be_process_fd_num_used LONG 43
-// palo_be_process_thread_num LONG 240
-class SimpleCoreMetricsVisitor : public MetricsVisitor {
-public:
- virtual ~SimpleCoreMetricsVisitor() {}
- void visit(const std::string& prefix, const std::string& name,
- MetricCollector* collector) override;
- std::string to_string() const { return _ss.str(); }
-
-private:
- std::stringstream _ss;
- static const std::string PROCESS_FD_NUM_USED;
- static const std::string PROCESS_THREAD_NUM;
- static const std::string PUSH_REQUEST_WRITE_BYTES_PER_SECOND;
- static const std::string QUERY_SCAN_BYTES_PER_SECOND;
- static const std::string MAX_DISK_IO_UTIL_PERCENT;
- static const std::string MAX_NETWORK_SEND_BYTES_RATE;
- static const std::string MAX_NETWORK_RECEIVE_BYTES_RATE;
-};
-
-const std::string SimpleCoreMetricsVisitor::PROCESS_FD_NUM_USED = "process_fd_num_used";
-const std::string SimpleCoreMetricsVisitor::PROCESS_THREAD_NUM = "process_thread_num";
-const std::string SimpleCoreMetricsVisitor::PUSH_REQUEST_WRITE_BYTES_PER_SECOND = "push_request_write_bytes_per_second";
-const std::string SimpleCoreMetricsVisitor::QUERY_SCAN_BYTES_PER_SECOND = "query_scan_bytes_per_second";
-const std::string SimpleCoreMetricsVisitor::MAX_DISK_IO_UTIL_PERCENT = "max_disk_io_util_percent";
-const std::string SimpleCoreMetricsVisitor::MAX_NETWORK_SEND_BYTES_RATE= "max_network_send_bytes_rate";
-const std::string SimpleCoreMetricsVisitor::MAX_NETWORK_RECEIVE_BYTES_RATE= "max_network_receive_bytes_rate";
-
-void PrometheusMetricsVisitor::visit(const std::string& prefix,
- const std::string& name,
- MetricCollector* collector) {
- if (collector->empty() || name.empty()) {
- return;
- }
- std::string metric_name;
- if (prefix.empty()) {
- metric_name = name;
- } else {
- metric_name = prefix + "_" + name;
- }
- // Output metric type
- _ss << "# TYPE " << metric_name << " " << collector->type() << "\n";
- switch (collector->type()) {
- case MetricType::COUNTER:
- case MetricType::GAUGE:
- for (auto& it : collector->metrics()) {
- _visit_simple_metric(metric_name, it.first, (Metric*) it.second);
- }
- break;
- default:
- break;
- }
-}
-
-void PrometheusMetricsVisitor::_visit_simple_metric(
- const std::string& name, const MetricLabels& labels, Metric* metric) {
- _ss << name;
- // labels
- if (!labels.empty()) {
- _ss << "{";
- int i = 0;
- for (auto& label : labels.labels) {
- if (i++ > 0) {
- _ss << ",";
- }
- _ss << label.name << "=\"" << label.value << "\"";
- }
- _ss << "}";
- }
- _ss << " " << metric->to_string() << "\n";
-}
-
-void SimpleCoreMetricsVisitor::visit(const std::string& prefix,
- const std::string& name,
- MetricCollector* collector) {
- if (collector->empty() || name.empty()) {
- return;
- }
-
- if (name != PROCESS_FD_NUM_USED && name != PROCESS_THREAD_NUM
- && name != PUSH_REQUEST_WRITE_BYTES_PER_SECOND
- && name != QUERY_SCAN_BYTES_PER_SECOND
- && name != MAX_DISK_IO_UTIL_PERCENT
- && name != MAX_NETWORK_SEND_BYTES_RATE
- && name != MAX_NETWORK_RECEIVE_BYTES_RATE) {
- return;
- }
-
- std::string metric_name;
- if (prefix.empty()) {
- metric_name = name;
- } else {
- metric_name = prefix + "_" + name;
- }
-
- for (auto& it : collector->metrics()) {
- _ss << metric_name << " LONG " << ((Metric*) it.second)->to_string()
- << "\n";
- }
-}
-
-class JsonMetricsVisitor : public MetricsVisitor {
-public:
- JsonMetricsVisitor() {
- }
- virtual ~JsonMetricsVisitor() {}
- void visit(const std::string& prefix, const std::string& name,
- MetricCollector* collector) override;
- std::string to_string() {
- rapidjson::StringBuffer strBuf;
- rapidjson::Writer<rapidjson::StringBuffer> writer(strBuf);
- doc.Accept(writer);
- return strBuf.GetString();
- }
-
-private:
- rapidjson::Document doc{rapidjson::kArrayType};
-};
-
-void JsonMetricsVisitor::visit(const std::string& prefix,
- const std::string& name,
- MetricCollector* collector) {
- if (collector->empty() || name.empty()) {
- return;
- }
-
- rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
- switch (collector->type()) {
- case MetricType::COUNTER:
- case MetricType::GAUGE:
- for (auto& it : collector->metrics()) {
- const MetricLabels& labels = it.first;
- Metric* metric = reinterpret_cast<Metric*>(it.second);
- rapidjson::Value metric_obj(rapidjson::kObjectType);
- rapidjson::Value tag_obj(rapidjson::kObjectType);
- tag_obj.AddMember("metric", rapidjson::Value(name.c_str(), allocator), allocator);
- // labels
- if (!labels.empty()) {
- for (auto& label : labels.labels) {
- tag_obj.AddMember(
- rapidjson::Value(label.name.c_str(), allocator),
- rapidjson::Value(label.value.c_str(), allocator),
- allocator);
- }
- }
- metric_obj.AddMember("tags", tag_obj, allocator);
- rapidjson::Value unit_val(unit_name(metric->unit()), allocator);
- metric_obj.AddMember("unit", unit_val, allocator);
- metric->write_value(metric_obj, allocator);
- doc.PushBack(metric_obj, allocator);
- }
- break;
- default:
- break;
- }
-}
-
void MetricsAction::handle(HttpRequest* req) {
const std::string& type = req->param("type");
std::string str;
if (type == "core") {
- SimpleCoreMetricsVisitor visitor;
- _metrics->collect(&visitor);
- str.assign(visitor.to_string());
+ str = _metric_registry->to_core_string();
} else if (type == "json") {
- JsonMetricsVisitor visitor;
- _metrics->collect(&visitor);
- str.assign(visitor.to_string());
+ str = _metric_registry->to_json();
} else {
- PrometheusMetricsVisitor visitor;
- _metrics->collect(&visitor);
- str.assign(visitor.to_string());
+ str = _metric_registry->to_prometheus();
}
req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
diff --git a/be/src/http/action/metrics_action.h b/be/src/http/action/metrics_action.h
index 1707d44..3cbc2f0 100644
--- a/be/src/http/action/metrics_action.h
+++ b/be/src/http/action/metrics_action.h
@@ -28,12 +28,12 @@ class MetricRegistry;
class MetricsAction : public HttpHandler {
public:
- MetricsAction(MetricRegistry* metrics) :_metrics(metrics) { }
+ MetricsAction(MetricRegistry* metric_registry) :_metric_registry(metric_registry) { }
virtual ~MetricsAction() { }
void handle(HttpRequest *req) override;
private:
- MetricRegistry* _metrics;
+ MetricRegistry* _metric_registry;
};
}
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 294ea16..f807972 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -59,10 +59,10 @@
namespace doris {
-METRIC_DEFINE_INT_COUNTER(streaming_load_requests_total, MetricUnit::REQUESTS);
-METRIC_DEFINE_INT_COUNTER(streaming_load_bytes, MetricUnit::BYTES);
-METRIC_DEFINE_INT_COUNTER(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
-METRIC_DEFINE_INT_GAUGE(streaming_load_current_processing, MetricUnit::REQUESTS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::REQUESTS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_bytes, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
#ifdef BE_TEST
TStreamLoadPutResult k_stream_load_put_result;
@@ -88,17 +88,15 @@ static bool is_format_support_streaming(TFileFormatType::type format) {
}
StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
- DorisMetrics::instance()->metrics()->register_metric("streaming_load_requests_total",
- &streaming_load_requests_total);
- DorisMetrics::instance()->metrics()->register_metric("streaming_load_bytes",
- &streaming_load_bytes);
- DorisMetrics::instance()->metrics()->register_metric("streaming_load_duration_ms",
- &streaming_load_duration_ms);
- DorisMetrics::instance()->metrics()->register_metric("streaming_load_current_processing",
- &streaming_load_current_processing);
+ _stream_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("stream_load", {});
+ METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total);
+ METRIC_REGISTER(_stream_load_entity, streaming_load_bytes);
+ METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms);
+ METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing);
}
StreamLoadAction::~StreamLoadAction() {
+ DorisMetrics::instance()->metric_registry()->deregister_entity("stream_load");
}
void StreamLoadAction::handle(HttpRequest* req) {
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index 34ee2b6..1555981 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -53,6 +53,12 @@ private:
private:
ExecEnv* _exec_env;
+
+ MetricEntity* _stream_load_entity;
+ IntCounter streaming_load_requests_total;
+ IntCounter streaming_load_bytes;
+ IntCounter streaming_load_duration_ms;
+ IntGauge streaming_load_current_processing;
};
}
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index bd7dd6a..1ecd24e 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -55,6 +55,11 @@ using strings::Substitute;
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_total_capacity, MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_avail_capacity, MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_data_used_capacity, MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(disks_state, MetricUnit::BYTES);
+
static const char* const kMtabPath = "/etc/mtab";
static const char* const kTestFilePath = "/.testfile";
@@ -72,9 +77,16 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
_cluster_id(-1),
_to_be_deleted(false),
_current_shard(0),
- _meta(nullptr) {}
+ _meta(nullptr) {
+ _data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(std::string("data_dir.") + path, {{"path", path}});
+ METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity);
+ METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity);
+ METRIC_REGISTER(_data_dir_metric_entity, disks_data_used_capacity);
+ METRIC_REGISTER(_data_dir_metric_entity, disks_state);
+}
DataDir::~DataDir() {
+ DorisMetrics::instance()->metric_registry()->deregister_entity(std::string("data_dir.") + _path);
delete _id_generator;
delete _meta;
}
@@ -308,6 +320,7 @@ void DataDir::health_check() {
}
}
}
+ disks_state.set_value(_is_used ? 1 : 0);
}
OLAPStatus DataDir::_read_and_write_test_file() {
@@ -890,12 +903,19 @@ Status DataDir::update_capacity() {
Substitute("get path $0 available capacity failed, error=$1", _path, e.what())),
"boost::filesystem::space failed");
}
+
+ disks_total_capacity.set_value(_disk_capacity_bytes);
+ disks_avail_capacity.set_value(_available_bytes);
LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes
<< ", available capacity: " << _available_bytes;
return Status::OK();
}
+void DataDir::update_user_data_size(int64_t size) {
+ disks_data_used_capacity.set_value(size);
+}
+
bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
double used_pct = (_disk_capacity_bytes - _available_bytes + incoming_data_size) /
(double)_disk_capacity_bytes;
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index 58992ed..e700832 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -28,6 +28,7 @@
#include "gen_cpp/olap_file.pb.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_id_generator.h"
+#include "util/metrics.h"
#include "util/mutex.h"
namespace doris {
@@ -121,6 +122,8 @@ public:
Status update_capacity();
+ void update_user_data_size(int64_t size);
+
private:
std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
Status _init_cluster_id();
@@ -185,6 +188,12 @@ private:
// used in convert process
bool _convert_old_data_success;
+
+ MetricEntity* _data_dir_metric_entity;
+ IntGauge disks_total_capacity;
+ IntGauge disks_avail_capacity;
+ IntGauge disks_data_used_capacity;
+ IntGauge disks_state;
};
} // namespace doris
diff --git a/be/src/olap/rowset/unique_rowset_id_generator.cpp b/be/src/olap/rowset/unique_rowset_id_generator.cpp
index 30a26f6..fca648d 100644
--- a/be/src/olap/rowset/unique_rowset_id_generator.cpp
+++ b/be/src/olap/rowset/unique_rowset_id_generator.cpp
@@ -23,14 +23,20 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(rowset_count_generated_and_in_use, MetricUnit::ROWSETS);
+
UniqueRowsetIdGenerator::UniqueRowsetIdGenerator(const UniqueId& backend_uid)
: _backend_uid(backend_uid), _inc_id(0) {
- REGISTER_GAUGE_DORIS_METRIC(rowset_count_generated_and_in_use, [this]() {
+ REGISTER_HOOK_METRIC(rowset_count_generated_and_in_use, [this]() {
std::lock_guard<SpinLock> l(_lock);
return _valid_rowset_id_hi.size();
});
}
+UniqueRowsetIdGenerator::~UniqueRowsetIdGenerator() {
+ DEREGISTER_HOOK_METRIC(rowset_count_generated_and_in_use);
+}
+
// generate a unique rowset id and save it in a set to check whether it is valid in the future
RowsetId UniqueRowsetIdGenerator::next_id() {
RowsetId rowset_id;
diff --git a/be/src/olap/rowset/unique_rowset_id_generator.h b/be/src/olap/rowset/unique_rowset_id_generator.h
index 4aee87b..0793af0 100644
--- a/be/src/olap/rowset/unique_rowset_id_generator.h
+++ b/be/src/olap/rowset/unique_rowset_id_generator.h
@@ -26,7 +26,7 @@ namespace doris {
class UniqueRowsetIdGenerator : public RowsetIdGenerator {
public:
UniqueRowsetIdGenerator(const UniqueId& backend_uid);
- ~UniqueRowsetIdGenerator() {}
+ ~UniqueRowsetIdGenerator();
RowsetId next_id() override;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 3472e9e..b65652b 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -83,6 +83,9 @@ using strings::Substitute;
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(unused_rowsets_count, MetricUnit::ROWSETS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(compaction_mem_current_consumption, MetricUnit::BYTES);
+
StorageEngine* StorageEngine::_s_instance = nullptr;
static Status _validate_options(const EngineOptions& options) {
@@ -120,11 +123,11 @@ StorageEngine::StorageEngine(const EngineOptions& options)
if (_s_instance == nullptr) {
_s_instance = this;
}
- REGISTER_GAUGE_DORIS_METRIC(unused_rowsets_count, [this]() {
+ REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() {
MutexLock lock(&_gc_mutex);
return _unused_rowsets.size();
});
- REGISTER_GAUGE_DORIS_METRIC(compaction_mem_current_consumption, [this]() {
+ REGISTER_HOOK_METRIC(compaction_mem_current_consumption, [this]() {
return _compaction_mem_tracker->consumption();
// We can get each compaction's detail usage
// LOG(INFO) << _compaction_mem_tracker=>LogUsage(2);
@@ -132,6 +135,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
}
StorageEngine::~StorageEngine() {
+ DEREGISTER_HOOK_METRIC(unused_rowsets_count);
+ DEREGISTER_HOOK_METRIC(compaction_mem_current_consumption);
_clear();
}
@@ -314,6 +319,14 @@ OLAPStatus StorageEngine::get_all_data_dir_info(vector<DataDirInfo>* data_dir_in
size_t tablet_count = 0;
_tablet_manager->update_root_path_info(&path_map, &tablet_count);
+ // 3. update metrics in DataDir
+ for (auto& path : path_map) {
+ std::lock_guard<std::mutex> l(_store_lock);
+ auto data_dir = _store_map.find(path.first);
+ DCHECK(data_dir != _store_map.end());
+ data_dir->second->update_user_data_size(path.second.data_used_capacity);
+ }
+
// add path info to data_dir_infos
for (auto& entry : path_map) {
data_dir_infos->emplace_back(entry.second);
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 1df18a3..e2ea260 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -92,6 +92,7 @@ set(RUNTIME_FILES
message_body_sink.cpp
stream_load/stream_load_context.cpp
stream_load/stream_load_executor.cpp
+ stream_load/load_stream_mgr.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_group.cpp
routine_load/data_consumer_pool.cpp
diff --git a/be/src/runtime/broker_mgr.cpp b/be/src/runtime/broker_mgr.cpp
index 35657e1..8dead28 100644
--- a/be/src/runtime/broker_mgr.cpp
+++ b/be/src/runtime/broker_mgr.cpp
@@ -30,15 +30,18 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(broker_count, MetricUnit::NOUNIT);
+
BrokerMgr::BrokerMgr(ExecEnv* exec_env) :
_exec_env(exec_env), _thread_stop(false), _ping_thread(&BrokerMgr::ping_worker, this) {
- REGISTER_GAUGE_DORIS_METRIC(broker_count, [this]() {
+ REGISTER_HOOK_METRIC(broker_count, [this]() {
std::lock_guard<std::mutex> l(_mutex);
return _broker_set.size();
});
}
BrokerMgr::~BrokerMgr() {
+ DEREGISTER_HOOK_METRIC(broker_count);
_thread_stop = true;
_ping_thread.join();
}
diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp
index 832678d..cc5bc75 100644
--- a/be/src/runtime/client_cache.cpp
+++ b/be/src/runtime/client_cache.cpp
@@ -28,12 +28,16 @@
#include "common/logging.h"
#include "util/container_util.hpp"
+#include "util/doris_metrics.h"
#include "util/network_util.h"
#include "util/thrift_util.h"
#include "gen_cpp/FrontendService.h"
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_used_clients, MetricUnit::NOUNIT, "Number of clients 'checked-out' from the cache");
+DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_opened_clients, MetricUnit::NOUNIT, "Total clients in the cache, including those in use");
+
ClientCacheHelper::~ClientCacheHelper() {
for (auto& it : _client_map) {
delete it.second;
@@ -67,7 +71,7 @@ Status ClientCacheHelper::get_client(
_client_map[*client_key]->set_recv_timeout(timeout_ms);
if (_metrics_enabled) {
- _used_clients->increment(1);
+ thrift_used_clients.increment(1);
}
return Status::OK();
@@ -92,7 +96,7 @@ Status ClientCacheHelper::reopen_client(client_factory factory_method, void** cl
*client_key = NULL;
if (_metrics_enabled) {
- _opened_clients->increment(-1);
+ thrift_opened_clients.increment(-1);
}
RETURN_IF_ERROR(create_client(make_network_address(
@@ -123,7 +127,7 @@ Status ClientCacheHelper::create_client(
_client_map[*client_key] = client_impl.release();
if (_metrics_enabled) {
- _opened_clients->increment(1);
+ thrift_opened_clients.increment(1);
}
return Status::OK();
@@ -145,14 +149,14 @@ void ClientCacheHelper::release_client(void** client_key) {
delete info;
if (_metrics_enabled) {
- _opened_clients->increment(-1);
+ thrift_opened_clients.increment(-1);
}
} else {
j->second.push_back(*client_key);
}
if (_metrics_enabled) {
- _used_clients->increment(-1);
+ thrift_used_clients.increment(-1);
}
*client_key = NULL;
@@ -210,21 +214,17 @@ void ClientCacheHelper::test_shutdown() {
}
}
-void ClientCacheHelper::init_metrics(MetricRegistry* metrics, const std::string& key_prefix) {
- DCHECK(metrics != NULL);
+void ClientCacheHelper::init_metrics(const std::string& name) {
// Not strictly needed if init_metrics is called before any cache
// usage, but ensures that _metrics_enabled is published.
boost::lock_guard<boost::mutex> lock(_lock);
- _used_clients.reset(new IntGauge(MetricUnit::NOUNIT));
- metrics->register_metric("thrift_used_clients",
- MetricLabels().add("name", key_prefix),
- _used_clients.get());
+ _thrift_client_metric_entity =
+ DorisMetrics::instance()->metric_registry()->register_entity(
+ std::string("thrift_client.") + name, {{"name", name}});
+ METRIC_REGISTER(_thrift_client_metric_entity, thrift_used_clients);
+ METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients);
- _opened_clients.reset(new IntGauge(MetricUnit::NOUNIT));
- metrics->register_metric("thrift_opened_clients",
- MetricLabels().add("name", key_prefix),
- _opened_clients.get());
_metrics_enabled = true;
}
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index 90caeb6..1a57c2c 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -84,7 +84,7 @@ public:
void test_shutdown();
- void init_metrics(MetricRegistry* metrics, const std::string& key_prefix);
+ void init_metrics(const std::string& name);
private:
template <class T> friend class ClientCache;
@@ -109,17 +109,18 @@ private:
typedef boost::unordered_map<void*, ThriftClientImpl*> ClientMap;
ClientMap _client_map;
- // MetricRegistry
bool _metrics_enabled;
// max connections per host in this cache, -1 means unlimited
int _max_cache_size_per_host;
+ MetricEntity* _thrift_client_metric_entity;
+
// Number of clients 'checked-out' from the cache
- std::unique_ptr<IntGauge> _used_clients;
+ IntGauge thrift_used_clients;
// Total clients in the cache, including those in use
- std::unique_ptr<IntGauge> _opened_clients;
+ IntGauge thrift_opened_clients;
// Create a new client for specific host/port in 'client' and put it in _client_map
Status create_client(const TNetworkAddress& hostport, client_factory factory_method,
@@ -226,12 +227,12 @@ public:
return _client_cache_helper.test_shutdown();
}
- // Adds metrics for this cache to the supplied MetricRegistry instance. The
- // metrics have keys that are prefixed by the key_prefix argument
+ // Adds metrics for this cache.
+ // The metrics have an identification by the 'name' argument
// (which should not end in a period).
// Must be called before the cache is used, otherwise the metrics might be wrong
- void init_metrics(MetricRegistry* metrics, const std::string& key_prefix) {
- _client_cache_helper.init_metrics(metrics, key_prefix);
+ void init_metrics(const std::string& name) {
+ _client_cache_helper.init_metrics(name);
}
private:
diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp
index 5fbc65a..3152ded 100644
--- a/be/src/runtime/data_stream_mgr.cpp
+++ b/be/src/runtime/data_stream_mgr.cpp
@@ -35,6 +35,9 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(data_stream_receiver_count, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(fragment_endpoint_count, MetricUnit::NOUNIT);
+
using boost::mutex;
using boost::shared_ptr;
using boost::unique_lock;
@@ -42,16 +45,20 @@ using boost::try_mutex;
using boost::lock_guard;
DataStreamMgr::DataStreamMgr() {
- REGISTER_GAUGE_DORIS_METRIC(data_stream_receiver_count, [this]() {
+ REGISTER_HOOK_METRIC(data_stream_receiver_count, [this]() {
lock_guard<mutex> l(_lock);
return _receiver_map.size();
});
- REGISTER_GAUGE_DORIS_METRIC(fragment_endpoint_count, [this]() {
+ REGISTER_HOOK_METRIC(fragment_endpoint_count, [this]() {
lock_guard<mutex> l(_lock);
return _fragment_stream_set.size();
});
}
+DataStreamMgr::~DataStreamMgr() {
+ DEREGISTER_HOOK_METRIC(data_stream_receiver_count);
+ DEREGISTER_HOOK_METRIC(fragment_endpoint_count);
+}
inline uint32_t DataStreamMgr::get_hash_value(
const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
uint32_t value = RawValue::get_hash_value(&fragment_instance_id.lo, TYPE_BIGINT, 0);
diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h
index a634b15..dbe0904 100644
--- a/be/src/runtime/data_stream_mgr.h
+++ b/be/src/runtime/data_stream_mgr.h
@@ -67,6 +67,7 @@ class PUniqueId;
class DataStreamMgr {
public:
DataStreamMgr();
+ ~DataStreamMgr();
// Create a receiver for a specific fragment_instance_id/node_id destination;
// If is_merging is true, the receiver maintains a separate queue of incoming row
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f13dc3b..a880419 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -37,7 +37,6 @@ class FragmentMgr;
class LoadPathMgr;
class LoadStreamMgr;
class MemTracker;
-class MetricRegistry;
class StorageEngine;
class PoolMemTrackerRegistry;
class PriorityThreadPool;
@@ -91,7 +90,6 @@ public:
const std::string& token() const;
ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; }
- MetricRegistry* metrics() const { return _metrics; }
DataStreamMgr* stream_mgr() { return _stream_mgr; }
ResultBufferMgr* result_mgr() { return _result_mgr; }
ResultQueueMgr* result_queue_mgr() { return _result_queue_mgr; }
@@ -152,7 +150,6 @@ private:
std::vector<StorePath> _store_paths;
// Leave protected so that subclasses can override
ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
- MetricRegistry* _metrics = nullptr;
DataStreamMgr* _stream_mgr = nullptr;
ResultBufferMgr* _result_mgr = nullptr;
ResultQueueMgr* _result_queue_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 7325a8a..a429cc6 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -72,7 +72,6 @@ Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_store_paths = store_paths;
_external_scan_context_mgr = new ExternalScanContextMgr(this);
- _metrics = DorisMetrics::instance()->metrics();
_stream_mgr = new DataStreamMgr();
_result_mgr = new ResultBufferMgr();
_result_queue_mgr = new ResultQueueMgr();
@@ -105,10 +104,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_plugin_mgr = new PluginMgr();
- _backend_client_cache->init_metrics(DorisMetrics::instance()->metrics(), "backend");
- _frontend_client_cache->init_metrics(DorisMetrics::instance()->metrics(), "frontend");
- _broker_client_cache->init_metrics(DorisMetrics::instance()->metrics(), "broker");
- _extdatasource_client_cache->init_metrics(DorisMetrics::instance()->metrics(), "extdatasource");
+ // Client caches no longer take a MetricRegistry; the string presumably identifies each cache's metrics.
+ _backend_client_cache->init_metrics("backend");
+ _frontend_client_cache->init_metrics("frontend");
+ _broker_client_cache->init_metrics("broker");
+ _extdatasource_client_cache->init_metrics("extdatasource");
_result_mgr->init();
_cgroups_mgr->init_cgroups();
_etl_job_mgr->init();
@@ -182,7 +181,7 @@ Status ExecEnv::_init_mem_tracker() {
LOG(INFO) << "Using global memory limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
RETURN_IF_ERROR(_disk_io_mgr->init(_mem_tracker));
- RETURN_IF_ERROR(_tmp_file_mgr->init(DorisMetrics::instance()->metrics()));
+ // TmpFileMgr registers its active_scratch_dirs gauge on the global registry in its constructor.
+ RETURN_IF_ERROR(_tmp_file_mgr->init());
int64_t storage_cache_limit = ParseUtil::parse_mem_spec(
config::storage_page_cache_limit, &is_percent);
@@ -235,7 +234,6 @@ void ExecEnv::_destory() {
delete _routine_load_task_executor;
delete _external_scan_context_mgr;
delete _heartbeat_flags;
- _metrics = nullptr;
}
void ExecEnv::destroy(ExecEnv* env) {
diff --git a/be/src/runtime/external_scan_context_mgr.cpp b/be/src/runtime/external_scan_context_mgr.cpp
index 500dc0c..03e1fee 100644
--- a/be/src/runtime/external_scan_context_mgr.cpp
+++ b/be/src/runtime/external_scan_context_mgr.cpp
@@ -28,17 +28,25 @@
namespace doris {
+// Prototype of the gauge fed by the hook registered in the constructor.
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(active_scan_context_count, MetricUnit::NOUNIT);
+
ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env) : _exec_env(exec_env), _is_stop(false) {
// start the reaper thread for gc the expired context
_keep_alive_reaper.reset(
new std::thread(
std::bind<void>(std::mem_fn(&ExternalScanContextMgr::gc_expired_context), this)));
- REGISTER_GAUGE_DORIS_METRIC(active_scan_context_count, [this]() {
+ REGISTER_HOOK_METRIC(active_scan_context_count, [this]() {
std::lock_guard<std::mutex> l(_lock);
return _active_contexts.size();
});
}
+// Deregister the hook first: its lambda captures `this` and reads _active_contexts under _lock.
+ExternalScanContextMgr::~ExternalScanContextMgr() {
+ DEREGISTER_HOOK_METRIC(active_scan_context_count);
+ _is_stop = true;
+ _keep_alive_reaper->join();
+}
+
Status ExternalScanContextMgr::create_scan_context(std::shared_ptr<ScanContext>* p_context) {
std::string context_id = generate_uuid_string();
std::shared_ptr<ScanContext> context(new ScanContext(context_id));
diff --git a/be/src/runtime/external_scan_context_mgr.h b/be/src/runtime/external_scan_context_mgr.h
index c26ba71..6aa2893 100644
--- a/be/src/runtime/external_scan_context_mgr.h
+++ b/be/src/runtime/external_scan_context_mgr.h
@@ -50,10 +50,7 @@ public:
ExternalScanContextMgr(ExecEnv* exec_env);
- ~ExternalScanContextMgr() {
- _is_stop = true;
- _keep_alive_reaper->join();
- }
+ // Deregisters the active_scan_context_count hook, then stops and joins the reaper thread.
+ ~ExternalScanContextMgr();
Status create_scan_context(std::shared_ptr<ScanContext>* p_context);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9413f82..c3bf83b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -52,6 +52,8 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(plan_fragment_count, MetricUnit::NOUNIT);
+
std::string to_load_error_http_path(const std::string& file_name) {
if (file_name.empty()) {
return "";
@@ -380,7 +382,7 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
_fragment_map(),
_stop(false),
_cancel_thread(std::bind<void>(&FragmentMgr::cancel_worker, this)) {
- REGISTER_GAUGE_DORIS_METRIC(plan_fragment_count, [this]() {
+ REGISTER_HOOK_METRIC(plan_fragment_count, [this]() {
std::lock_guard<std::mutex> lock(_lock);
return _fragment_map.size();
});
@@ -394,6 +396,7 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
}
FragmentMgr::~FragmentMgr() {
+ // Drop the plan_fragment_count hook first: its lambda captures `this` and reads _fragment_map.
+ DEREGISTER_HOOK_METRIC(plan_fragment_count);
// stop thread
_stop = true;
_cancel_thread.join();
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index f0237ad..b02cb14 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -27,6 +27,8 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_count, MetricUnit::NOUNIT);
+
// Calculate the total memory limit of all load tasks on this BE
static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
if (process_mem_limit == -1) {
@@ -62,7 +64,7 @@ static int64_t calc_job_timeout_s(int64_t timeout_in_req_s) {
}
LoadChannelMgr::LoadChannelMgr() : _is_stopped(false) {
- REGISTER_GAUGE_DORIS_METRIC(load_channel_count, [this]() {
+ REGISTER_HOOK_METRIC(load_channel_count, [this]() {
std::lock_guard<std::mutex> l(_lock);
return _load_channels.size();
});
@@ -70,6 +72,7 @@ LoadChannelMgr::LoadChannelMgr() : _is_stopped(false) {
}
LoadChannelMgr::~LoadChannelMgr() {
+ // Drop the load_channel_count hook first: its lambda captures `this` and reads _load_channels.
+ DEREGISTER_HOOK_METRIC(load_channel_count);
_is_stopped.store(true);
if (_load_channels_clean_thread.joinable()) {
_load_channels_clean_thread.join();
diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp
index 1ccbc5c..5fbc431 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -34,12 +34,19 @@ namespace doris {
ChunkAllocator* ChunkAllocator::_s_instance = nullptr;
-static IntCounter local_core_alloc_count(MetricUnit::NOUNIT);
-static IntCounter other_core_alloc_count(MetricUnit::NOUNIT);
-static IntCounter system_alloc_count(MetricUnit::NOUNIT);
-static IntCounter system_free_count(MetricUnit::NOUNIT);
-static IntCounter system_alloc_cost_ns(MetricUnit::NANOSECONDS);
-static IntCounter system_free_cost_ns(MetricUnit::NANOSECONDS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_local_core_alloc_count, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_other_core_alloc_count, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_alloc_count, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_free_count, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_alloc_cost_ns, MetricUnit::NANOSECONDS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_free_cost_ns, MetricUnit::NANOSECONDS);
+
+// Counter instances backing the prototypes above; attached to the "chunk_allocator" entity
+// in the ChunkAllocator constructor.
+static IntCounter chunk_pool_local_core_alloc_count;
+static IntCounter chunk_pool_other_core_alloc_count;
+static IntCounter chunk_pool_system_alloc_count;
+static IntCounter chunk_pool_system_free_count;
+static IntCounter chunk_pool_system_alloc_cost_ns;
+static IntCounter chunk_pool_system_free_cost_ns;
#ifdef BE_TEST
static std::mutex s_mutex;
@@ -101,22 +108,6 @@ private:
void ChunkAllocator::init_instance(size_t reserve_limit) {
if (_s_instance != nullptr) return;
_s_instance = new ChunkAllocator(reserve_limit);
-
-#define REGISTER_METIRC_WITH_NAME(name, metric) \
- DorisMetrics::instance()->metrics()->register_metric(#name, &metric)
-
-#define REGISTER_METIRC_WITH_PREFIX(prefix, name) \
- REGISTER_METIRC_WITH_NAME(prefix##name, name)
-
-#define REGISTER_METIRC(name) \
- REGISTER_METIRC_WITH_PREFIX(chunk_pool_, name)
-
- REGISTER_METIRC(local_core_alloc_count);
- REGISTER_METIRC(other_core_alloc_count);
- REGISTER_METIRC(system_alloc_count);
- REGISTER_METIRC(system_free_count);
- REGISTER_METIRC(system_alloc_cost_ns);
- REGISTER_METIRC(system_free_cost_ns);
}
ChunkAllocator::ChunkAllocator(size_t reserve_limit)
@@ -126,6 +117,14 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit)
for (int i = 0; i < _arenas.size(); ++i) {
_arenas[i].reset(new ChunkArena());
}
+
+ // Attach the chunk-pool counters to a dedicated "chunk_allocator" entity
+ // (the second argument is presumably an empty label set).
+ _chunk_allocator_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator", {});
+ METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_local_core_alloc_count);
+ METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_other_core_alloc_count);
+ METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_count);
+ METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_count);
+ METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_cost_ns);
+ METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_cost_ns);
}
bool ChunkAllocator::allocate(size_t size, Chunk* chunk) {
@@ -136,7 +135,7 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) {
if (_arenas[core_id]->pop_free_chunk(size, &chunk->data)) {
_reserved_bytes.fetch_sub(size);
- local_core_alloc_count.increment(1);
+ chunk_pool_local_core_alloc_count.increment(1);
return true;
}
if (_reserved_bytes > size) {
@@ -145,7 +144,7 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) {
for (int i = 1; i < _arenas.size(); ++i, ++core_id) {
if (_arenas[core_id % _arenas.size()]->pop_free_chunk(size, &chunk->data)) {
_reserved_bytes.fetch_sub(size);
- other_core_alloc_count.increment(1);
+ chunk_pool_other_core_alloc_count.increment(1);
// reset chunk's core_id to other
chunk->core_id = core_id % _arenas.size();
return true;
@@ -159,8 +158,8 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) {
// allocate from system allocator
chunk->data = SystemAllocator::allocate(size);
}
- system_alloc_count.increment(1);
- system_alloc_cost_ns.increment(cost_ns);
+ chunk_pool_system_alloc_count.increment(1);
+ chunk_pool_system_alloc_cost_ns.increment(cost_ns);
if (chunk->data == nullptr) {
return false;
}
@@ -178,8 +177,8 @@ void ChunkAllocator::free(const Chunk& chunk) {
SCOPED_RAW_TIMER(&cost_ns);
SystemAllocator::free(chunk.data, chunk.size);
}
- system_free_count.increment(1);
- system_free_cost_ns.increment(cost_ns);
+ chunk_pool_system_free_count.increment(1);
+ chunk_pool_system_free_cost_ns.increment(cost_ns);
return;
}
diff --git a/be/src/runtime/memory/chunk_allocator.h b/be/src/runtime/memory/chunk_allocator.h
index 124e735..b4bb7fa 100644
--- a/be/src/runtime/memory/chunk_allocator.h
+++ b/be/src/runtime/memory/chunk_allocator.h
@@ -27,6 +27,7 @@ namespace doris {
class Chunk;
class ChunkArena;
+class MetricEntity;
// Used to allocate memory with power-of-two length.
// This Allocator allocate memory from system and cache free chunks for
@@ -75,6 +76,8 @@ private:
std::atomic<int64_t> _reserved_bytes;
// each core has a ChunkArena
std::vector<std::unique_ptr<ChunkArena>> _arenas;
+
+ // Entity holding the chunk_pool_* counters; set in the constructor (presumably owned by the
+ // MetricRegistry). Default-initialized so it is never an indeterminate pointer.
+ MetricEntity* _chunk_allocator_metric_entity = nullptr;
};
}
diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp
index ea30cc4..35f00b5 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -26,6 +26,8 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(result_buffer_block_count, MetricUnit::NOUNIT);
+
//std::size_t hash_value(const TUniqueId& fragment_id) {
// uint32_t value = RawValue::get_hash_value(&fragment_id.lo, TypeDescriptor(TYPE_BIGINT), 0);
// value = RawValue::get_hash_value(&fragment_id.hi, TypeDescriptor(TYPE_BIGINT), value);
@@ -36,13 +38,14 @@ ResultBufferMgr::ResultBufferMgr()
: _is_stop(false) {
// Each BufferControlBlock has a limited queue size of 1024, it's not needed to count the
// actual size of all BufferControlBlock.
- REGISTER_GAUGE_DORIS_METRIC(result_buffer_block_count, [this]() {
+ REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() {
boost::lock_guard<boost::mutex> l(_lock);
return _buffer_map.size();
});
}
ResultBufferMgr::~ResultBufferMgr() {
+ // Drop the hook first: its lambda captures `this` and reads _buffer_map under _lock.
+ DEREGISTER_HOOK_METRIC(result_buffer_block_count);
_is_stop = true;
_cancel_thread->join();
}
diff --git a/be/src/runtime/result_queue_mgr.cpp b/be/src/runtime/result_queue_mgr.cpp
index b29bc7c..cfdd60a 100644
--- a/be/src/runtime/result_queue_mgr.cpp
+++ b/be/src/runtime/result_queue_mgr.cpp
@@ -28,16 +28,19 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(result_block_queue_count, MetricUnit::NOUNIT);
+
ResultQueueMgr::ResultQueueMgr() {
// Each BlockingQueue has a limited size (default 20, by config::max_memory_sink_batch_count),
// it's not needed to count the actual size of all BlockingQueue.
- REGISTER_GAUGE_DORIS_METRIC(result_block_queue_count, [this]() {
+ REGISTER_HOOK_METRIC(result_block_queue_count, [this]() {
std::lock_guard<std::mutex> l(_lock);
return _fragment_queue_map.size();
});
}
ResultQueueMgr::~ResultQueueMgr() {
+ // Remove the hook registered in the constructor; its lambda captures `this`.
+ DEREGISTER_HOOK_METRIC(result_block_queue_count);
}
Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, std::shared_ptr<arrow::RecordBatch>* result, bool *eos) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 0d8eadd..3438251 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -34,6 +34,35 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(routine_load_task_count, MetricUnit::NOUNIT);
+
+// Ctor/dtor were moved here from the header; they register/deregister the routine_load_task_count hook.
+RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
+ : _exec_env(exec_env),
+ _thread_pool(config::routine_load_thread_pool_size, 1),
+ _data_consumer_pool(10) {
+ REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
+ std::lock_guard<std::mutex> l(_lock);
+ return _task_map.size();
+ });
+
+ _data_consumer_pool.start_bg_worker();
+}
+
+RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
+ // Remove the hook first: it captures `this` and reads _task_map under _lock.
+ DEREGISTER_HOOK_METRIC(routine_load_task_count);
+ _thread_pool.shutdown();
+ _thread_pool.join();
+
+ LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
+ // Drop this executor's reference to each leftover context; delete it when unref() reports
+ // (presumably) that no references remain.
+ for (auto it = _task_map.begin(); it != _task_map.end(); ++it) {
+ auto ctx = it->second;
+ if (ctx->unref()) {
+ delete ctx;
+ }
+ }
+ _task_map.clear();
+}
+
Status RoutineLoadTaskExecutor::get_kafka_partition_meta(
const PKafkaMetaProxyRequest& request, std::vector<int32_t>* partition_ids) {
DCHECK(request.has_kafka_info());
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index aca731a..73f278c 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -43,31 +43,9 @@ class RoutineLoadTaskExecutor {
public:
typedef std::function<void(StreamLoadContext*)> ExecFinishCallback;
- RoutineLoadTaskExecutor(ExecEnv* exec_env)
- : _exec_env(exec_env),
- _thread_pool(config::routine_load_thread_pool_size, 1),
- _data_consumer_pool(10) {
- REGISTER_GAUGE_DORIS_METRIC(routine_load_task_count, [this]() {
- std::lock_guard<std::mutex> l(_lock);
- return _task_map.size();
- });
-
- _data_consumer_pool.start_bg_worker();
- }
-
- ~RoutineLoadTaskExecutor() {
- _thread_pool.shutdown();
- _thread_pool.join();
-
- LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup";
- for (auto it = _task_map.begin(); it != _task_map.end(); ++it) {
- auto ctx = it->second;
- if (ctx->unref()) {
- delete ctx;
- }
- }
- _task_map.clear();
- }
+ // Registers the routine_load_task_count hook and starts the data consumer pool's bg worker.
+ RoutineLoadTaskExecutor(ExecEnv* exec_env);
+
+ // Deregisters the hook, joins the thread pool and releases tasks still left in _task_map.
+ ~RoutineLoadTaskExecutor();
// submit a routine load task
Status submit_task(const TRoutineLoadTask& task);
diff --git a/be/src/runtime/small_file_mgr.cpp b/be/src/runtime/small_file_mgr.cpp
index dcac9fc..a44a315 100644
--- a/be/src/runtime/small_file_mgr.cpp
+++ b/be/src/runtime/small_file_mgr.cpp
@@ -38,18 +38,21 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(small_file_cache_count, MetricUnit::NOUNIT);
+
SmallFileMgr::SmallFileMgr(
ExecEnv* env,
const std::string& local_path) :
_exec_env(env),
_local_path(local_path) {
- REGISTER_GAUGE_DORIS_METRIC(small_file_cache_count, [this]() {
+ REGISTER_HOOK_METRIC(small_file_cache_count, [this]() {
std::lock_guard<std::mutex> l(_lock);
return _file_cache.size();
});
}
SmallFileMgr::~SmallFileMgr() {
+ // Remove the hook registered in the constructor; its lambda captures `this`.
+ DEREGISTER_HOOK_METRIC(small_file_cache_count);
}
Status SmallFileMgr::init() {
diff --git a/be/src/http/action/metrics_action.h b/be/src/runtime/stream_load/load_stream_mgr.cpp
similarity index 60%
copy from be/src/http/action/metrics_action.h
copy to be/src/runtime/stream_load/load_stream_mgr.cpp
index 1707d44..72fe114 100644
--- a/be/src/http/action/metrics_action.h
+++ b/be/src/runtime/stream_load/load_stream_mgr.cpp
@@ -15,25 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
-
-#include "http/http_handler.h"
+#include "runtime/stream_load/load_stream_mgr.h"
namespace doris {
-class Webserver;
-class ExecEnv;
-class HttpRequest;
-class MetricRegistry;
-
-class MetricsAction : public HttpHandler {
-public:
- MetricsAction(MetricRegistry* metrics) :_metrics(metrics) { }
- virtual ~MetricsAction() { }
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(stream_load_pipe_count, MetricUnit::NOUNIT);
- void handle(HttpRequest *req) override;
-private:
- MetricRegistry* _metrics;
-};
+LoadStreamMgr::LoadStreamMgr() {
+ // Each StreamLoadPipe has a limited buffer size (default 1M), it's not needed to count the
+ // actual size of all StreamLoadPipe.
+ REGISTER_HOOK_METRIC(stream_load_pipe_count, [this]() {
+ std::lock_guard<std::mutex> l(_lock);
+ return _stream_map.size();
+ });
+}
+// Remove the hook registered above; its lambda captures `this`.
+LoadStreamMgr::~LoadStreamMgr() {
+ DEREGISTER_HOOK_METRIC(stream_load_pipe_count);
+}
}
diff --git a/be/src/runtime/stream_load/load_stream_mgr.h b/be/src/runtime/stream_load/load_stream_mgr.h
index 1e08cb3..0579cb8 100644
--- a/be/src/runtime/stream_load/load_stream_mgr.h
+++ b/be/src/runtime/stream_load/load_stream_mgr.h
@@ -30,15 +30,8 @@ namespace doris {
// used to register all streams in process so that other module can get this stream
class LoadStreamMgr {
public:
- LoadStreamMgr() {
- // Each StreamLoadPipe has a limited buffer size (default 1M), it's not needed to count the
- // actual size of all StreamLoadPipe.
- REGISTER_GAUGE_DORIS_METRIC(stream_load_pipe_count, [this]() {
- std::lock_guard<std::mutex> l(_lock);
- return _stream_map.size();
- });
- }
- ~LoadStreamMgr() { }
+ // Defined in load_stream_mgr.cpp: register / deregister the stream_load_pipe_count hook.
+ LoadStreamMgr();
+ ~LoadStreamMgr();
Status put(const UniqueId& id,
std::shared_ptr<StreamLoadPipe> stream) {
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 57c33a5..01a5616 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -27,6 +27,8 @@
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(tablet_writer_count, MetricUnit::NOUNIT);
+
std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const std::shared_ptr<MemTracker>& mem_tracker):
@@ -34,7 +36,7 @@ TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const std::shared_p
_mem_tracker = MemTracker::CreateTracker(-1, "tablets channel", mem_tracker);
static std::once_flag once_flag;
std::call_once(once_flag, [] {
- REGISTER_GAUGE_DORIS_METRIC(tablet_writer_count, [&]() {
+ // Registered once per process (std::call_once) and the hook outlives this call, so it
+ // captures nothing; it only reads the static _s_tablet_writer_count.
+ REGISTER_HOOK_METRIC(tablet_writer_count, []() {
return _s_tablet_writer_count.load();
});
});
diff --git a/be/src/runtime/test_env.cc b/be/src/runtime/test_env.cc
index 1289aaa..6698c06 100644
--- a/be/src/runtime/test_env.cc
+++ b/be/src/runtime/test_env.cc
@@ -23,32 +23,21 @@ using boost::shared_ptr;
namespace doris {
-boost::scoped_ptr<MetricRegistry> TestEnv::_s_static_metrics;
-
TestEnv::TestEnv() {
- if (_s_static_metrics == NULL) {
- _s_static_metrics.reset(new MetricRegistry("test_env"));
- }
_exec_env.reset(new ExecEnv());
// _exec_env->init_for_tests();
_io_mgr_tracker.reset(new MemTracker(-1));
_block_mgr_parent_tracker.reset(new MemTracker(-1));
_exec_env->disk_io_mgr()->init(_io_mgr_tracker);
- init_metrics();
_tmp_file_mgr.reset(new TmpFileMgr());
- _tmp_file_mgr->init(_metrics.get());
-}
-
-void TestEnv::init_metrics() {
- _metrics.reset(new MetricRegistry("test_env"));
+ // NOTE(review): TmpFileMgr() is built without an ExecEnv, yet init() reads
+ // _exec_env->store_paths(); confirm this path is exercised, or switch to init_custom().
+ _tmp_file_mgr->init();
}
void TestEnv::init_tmp_file_mgr(const std::vector<std::string>& tmp_dirs,
bool one_dir_per_device) {
- // Need to recreate metrics to avoid error when registering metric twice.
- init_metrics();
+ // Destroy the old TmpFileMgr before creating its replacement: reset(new ...) would construct
+ // the new one (which registers the shared active_scratch_dirs gauge) before the old one's
+ // destructor deregisters that same gauge.
+ _tmp_file_mgr.reset();
_tmp_file_mgr.reset(new TmpFileMgr());
- _tmp_file_mgr->init_custom(tmp_dirs, one_dir_per_device, _metrics.get());
+ _tmp_file_mgr->init_custom(tmp_dirs, one_dir_per_device);
}
TestEnv::~TestEnv() {
@@ -58,7 +47,6 @@ TestEnv::~TestEnv() {
_exec_env.reset();
_io_mgr_tracker.reset();
_tmp_file_mgr.reset();
- _metrics.reset();
}
RuntimeState* TestEnv::create_runtime_state(int64_t query_id) {
diff --git a/be/src/runtime/test_env.h b/be/src/runtime/test_env.h
index 1aa937e..59f834c 100644
--- a/be/src/runtime/test_env.h
+++ b/be/src/runtime/test_env.h
@@ -62,9 +62,6 @@ public:
MemTracker* io_mgr_tracker() {
return _io_mgr_tracker.get();
}
- MetricRegistry* metrics() {
- return _metrics.get();
- }
+ // TestEnv no longer owns a MetricRegistry; TmpFileMgr registers on DorisMetrics' global registry.
TmpFileMgr* tmp_file_mgr() {
return _tmp_file_mgr.get();
}
@@ -78,11 +75,9 @@ private:
RuntimeState* create_runtime_state(int64_t query_id);
// Global state for test environment.
- static boost::scoped_ptr<MetricRegistry> _s_static_metrics;
boost::scoped_ptr<ExecEnv> _exec_env;
std::shared_ptr<MemTracker> _block_mgr_parent_tracker;
std::shared_ptr<MemTracker> _io_mgr_tracker;
- boost::scoped_ptr<MetricRegistry> _metrics;
boost::scoped_ptr<TmpFileMgr> _tmp_file_mgr;
// Per-query states with associated block managers.
diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc
index d286cea..a2c71bc 100644
--- a/be/src/runtime/tmp_file_mgr.cc
+++ b/be/src/runtime/tmp_file_mgr.cc
@@ -47,27 +47,33 @@ using std::vector;
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(active_scratch_dirs, MetricUnit::NOUNIT, "Metric to track active scratch directories");
+
const std::string _s_tmp_sub_dir_name = "doris-scratch";
const uint64_t _s_available_space_threshold_mb = 1024;
-// Metric keys
-const std::string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp_file_mgr.active_scratch_dirs";
-const std::string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST = "tmp_file_mgr.active_scratch_dirs.list";
-
TmpFileMgr::TmpFileMgr(ExecEnv* exec_env) :
- _exec_env(exec_env), _initialized(false), _dir_status_lock(), _tmp_dirs() { }
- // _num_active_scratch_dirs_metric(NULL), _active_scratch_dirs_metric(NULL) {}
+ _exec_env(exec_env), _initialized(false), _dir_status_lock(), _tmp_dirs() {
+ // Publish the scratch-dir gauge on the process-wide "server" entity; ~TmpFileMgr() removes it.
+ METRIC_REGISTER(DorisMetrics::instance()->metric_registry()->get_entity("server"), active_scratch_dirs);
+}
+
+// Delegate to the main constructor so _exec_env and _initialized are always initialized
+// (init_custom() DCHECKs !_initialized) and the gauge is registered in exactly one place.
+TmpFileMgr::TmpFileMgr() : TmpFileMgr(nullptr) {}
+
+TmpFileMgr::~TmpFileMgr() {
+ METRIC_DEREGISTER(DorisMetrics::instance()->metric_registry()->get_entity("server"), active_scratch_dirs);
+}
-Status TmpFileMgr::init(MetricRegistry* metrics) {
+Status TmpFileMgr::init() {
+ // init() derives the scratch dirs from the ExecEnv's store paths; without an ExecEnv
+ // (default constructor) callers must use init_custom().
+ DCHECK(_exec_env != nullptr) << "TmpFileMgr::init() requires an ExecEnv, use init_custom()";
vector<string> all_tmp_dirs;
for (auto& path : _exec_env->store_paths()) {
all_tmp_dirs.emplace_back(path.path);
}
- return init_custom(all_tmp_dirs, true, metrics);
+ return init_custom(all_tmp_dirs, true);
}
-Status TmpFileMgr::init_custom(
- const vector<string>& tmp_dirs, bool one_dir_per_device, MetricRegistry* metrics) {
+Status TmpFileMgr::init_custom(const vector<string>& tmp_dirs, bool one_dir_per_device) {
DCHECK(!_initialized);
if (tmp_dirs.empty()) {
LOG(WARNING) << "Running without spill to disk: no scratch directories provided.";
@@ -118,19 +124,7 @@ Status TmpFileMgr::init_custom(
}
}
- DCHECK(metrics != NULL);
- _num_active_scratch_dirs_metric.reset(new IntGauge(MetricUnit::NOUNIT));
- metrics->register_metric("active_scratch_dirs", _num_active_scratch_dirs_metric.get());
- //_active_scratch_dirs_metric = metrics->register_metric(new SetMetric<std::string>(
- // TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST,
- // std::set<std::string>()));
- // TODO(zc):
- // _active_scratch_dirs_metric = SetMetric<string>::CreateAndRegister(
- // metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, std::set<std::string>());
- _num_active_scratch_dirs_metric->set_value(_tmp_dirs.size());
- // for (int i = 0; i < _tmp_dirs.size(); ++i) {
- // _active_scratch_dirs_metric->add(_tmp_dirs[i].path());
- // }
+ active_scratch_dirs.set_value(_tmp_dirs.size());
_initialized = true;
@@ -182,8 +176,7 @@ void TmpFileMgr::blacklist_device(DeviceId device_id) {
added = _tmp_dirs[device_id].blacklist();
}
if (added) {
- _num_active_scratch_dirs_metric->increment(-1);
- // _active_scratch_dirs_metric->remove(_tmp_dirs[device_id].path());
+ active_scratch_dirs.increment(-1);
}
}
diff --git a/be/src/runtime/tmp_file_mgr.h b/be/src/runtime/tmp_file_mgr.h
index 6ba9f57..d7ad149 100644
--- a/be/src/runtime/tmp_file_mgr.h
+++ b/be/src/runtime/tmp_file_mgr.h
@@ -26,7 +26,6 @@
namespace doris {
-class MetricRegistry;
class ExecEnv;
// TmpFileMgr creates and manages temporary files and directories on the local
@@ -115,23 +114,20 @@ public:
};
TmpFileMgr(ExecEnv* exec_env);
- TmpFileMgr() { }
+ // Constructor without an ExecEnv (used e.g. by TestEnv); like TmpFileMgr(ExecEnv*), it
+ // registers active_scratch_dirs on the "server" entity. init() reads the ExecEnv's store
+ // paths, so instances built this way should be initialized with init_custom().
+ TmpFileMgr();
- ~TmpFileMgr(){
- // do nothing.
- }
+ // Deregisters active_scratch_dirs from the "server" entity.
+ ~TmpFileMgr();
// Creates the configured tmp directories. If multiple directories are specified per
// disk, only one is created and used. Must be called after DiskInfo::Init().
- Status init(MetricRegistry* metrics);
+ Status init();
// Custom initialization - initializes with the provided list of directories.
// If one_dir_per_device is true, only use one temporary directory per device.
// This interface is intended for testing purposes.
Status init_custom(
const std::vector<std::string>& tmp_dirs,
- bool one_dir_per_device,
- MetricRegistry* metrics);
+ bool one_dir_per_device);
// Return a new File handle with a unique path for a query instance. The file path
// is within the (single) tmp directory on the specified device id. The caller owns
@@ -195,9 +191,8 @@ private:
// The created tmp directories.
std::vector<Dir> _tmp_dirs;
- // MetricRegistry to track active scratch directories.
- std::unique_ptr<IntGauge> _num_active_scratch_dirs_metric;
- // SetMetric<std::string>* _active_scratch_dirs_metric;
+ // Gauge of usable scratch dirs: set in init_custom(), decremented by blacklist_device().
+ // The name presumably has to match the active_scratch_dirs prototype used by METRIC_REGISTER,
+ // hence no leading underscore.
+ IntGauge active_scratch_dirs;
};
} // end namespace doris
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index f2708f3..09f22dd 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -87,7 +87,6 @@ Status BackendService::create_service(ExecEnv* exec_env, int port, ThriftServer*
+ // ThriftServer no longer takes a MetricRegistry (ExecEnv::metrics() was removed).
*server = new ThriftServer("backend",
be_processor,
port,
- exec_env->metrics(),
config::be_service_threads);
LOG(INFO) << "DorisInternalService listening on " << port;
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index b1b30bf..8a65e30 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -90,7 +90,7 @@ Status HttpService::start() {
// register metrics
{
- auto action = new MetricsAction(DorisMetrics::instance()->metrics());
+ // GET /metrics is served from DorisMetrics' process-wide MetricRegistry.
+ auto action = new MetricsAction(DorisMetrics::instance()->metric_registry());
_ev_http_server->register_handler(HttpMethod::GET, "/metrics", action);
}
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 5107d7a..d54a4d2 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -99,6 +99,7 @@ set(UTIL_FILES
timezone_utils.cpp
easy_json.cc
mustache/mustache.cc
+ # BrpcStubCache's constructor/destructor now live out of line in this file.
+ brpc_stub_cache.cpp
)
if (WITH_MYSQL)
diff --git a/be/src/http/action/metrics_action.h b/be/src/util/brpc_stub_cache.cpp
similarity index 63%
copy from be/src/http/action/metrics_action.h
copy to be/src/util/brpc_stub_cache.cpp
index 1707d44..a80cc28 100644
--- a/be/src/http/action/metrics_action.h
+++ b/be/src/util/brpc_stub_cache.cpp
@@ -15,25 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
-
-#include "http/http_handler.h"
+#include "util/brpc_stub_cache.h"
namespace doris {
-class Webserver;
-class ExecEnv;
-class HttpRequest;
-class MetricRegistry;
-
-class MetricsAction : public HttpHandler {
-public:
- MetricsAction(MetricRegistry* metrics) :_metrics(metrics) { }
- virtual ~MetricsAction() { }
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT);
- void handle(HttpRequest *req) override;
-private:
- MetricRegistry* _metrics;
-};
+BrpcStubCache::BrpcStubCache() {
+ _stub_map.init(239);
+ REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() {
+ std::lock_guard<SpinLock> l(_lock);
+ return _stub_map.size();
+ });
+}
+// Deregister the hook (it captures `this` and reads _stub_map) before deleting the cached stubs.
+BrpcStubCache::~BrpcStubCache() {
+ DEREGISTER_HOOK_METRIC(brpc_endpoint_stub_count);
+ for (auto& stub : _stub_map) {
+ delete stub.second;
+ }
+}
}
diff --git a/be/src/util/brpc_stub_cache.h b/be/src/util/brpc_stub_cache.h
index c185c1c..9610ca4 100644
--- a/be/src/util/brpc_stub_cache.h
+++ b/be/src/util/brpc_stub_cache.h
@@ -32,18 +32,8 @@ namespace doris {
// map used
class BrpcStubCache {
public:
- BrpcStubCache() {
- _stub_map.init(239);
- REGISTER_GAUGE_DORIS_METRIC(brpc_endpoint_stub_count, [this]() {
- std::lock_guard<SpinLock> l(_lock);
- return _stub_map.size();
- });
- }
- ~BrpcStubCache() {
- for (auto& stub : _stub_map) {
- delete stub.second;
- }
- }
+ // Defined in brpc_stub_cache.cpp: register/deregister the brpc_endpoint_stub_count hook;
+ // the destructor also deletes every cached stub.
+ BrpcStubCache();
+ ~BrpcStubCache();
palo::PInternalService_Stub* get_stub(const butil::EndPoint& endpoint) {
std::lock_guard<SpinLock> l(_lock);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 8264e0d..f9169c7 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -15,193 +15,229 @@
// specific language governing permissions and limitations
// under the License.
+#include "util/doris_metrics.h"
+
#include <sys/types.h>
#include <unistd.h>
-#include "util/doris_metrics.h"
-
#include "env/env.h"
-
#include "util/debug_util.h"
#include "util/file_utils.h"
#include "util/system_metrics.h"
namespace doris {
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fragment_requests_total, MetricUnit::REQUESTS, "Total fragment requests received.");  // prototypes in this file are registered on the "server" entity in DorisMetrics::DorisMetrics()
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(fragment_request_duration_us, MetricUnit::MICROSECONDS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_requests_total, MetricUnit::REQUESTS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_request_send_bytes, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_rows, MetricUnit::ROWS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(push_requests_success_total, MetricUnit::REQUESTS, "", push_requests_total, Labels({{"status", "SUCCESS"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(push_requests_fail_total, MetricUnit::REQUESTS, "", push_requests_total, Labels({{"status", "FAIL"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_duration_us, MetricUnit::MICROSECONDS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_write_bytes, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(push_request_write_rows, MetricUnit::ROWS);
+
+#define DEFINE_ENGINE_COUNTER_METRIC(name, type, status) \
+    DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(name, MetricUnit::REQUESTS, "", engine_requests_total, Labels({{"type", #type}, {"status", #status}}));  // one engine_requests_total prototype per (type, status) label pair
+
+DEFINE_ENGINE_COUNTER_METRIC(create_tablet_requests_total, create_tablet, total);
+DEFINE_ENGINE_COUNTER_METRIC(create_tablet_requests_failed, create_tablet, failed);
+DEFINE_ENGINE_COUNTER_METRIC(drop_tablet_requests_total, drop_tablet, total);
+DEFINE_ENGINE_COUNTER_METRIC(report_all_tablets_requests_total, report_all_tablets, total);
+DEFINE_ENGINE_COUNTER_METRIC(report_all_tablets_requests_failed, report_all_tablets, failed);
+DEFINE_ENGINE_COUNTER_METRIC(report_tablet_requests_total, report_tablet, total);
+DEFINE_ENGINE_COUNTER_METRIC(report_tablet_requests_failed, report_tablet, failed);
+DEFINE_ENGINE_COUNTER_METRIC(report_disk_requests_total, report_disk, total);
+DEFINE_ENGINE_COUNTER_METRIC(report_disk_requests_failed, report_disk, failed);
+DEFINE_ENGINE_COUNTER_METRIC(report_task_requests_total, report_task, total);
+DEFINE_ENGINE_COUNTER_METRIC(report_task_requests_failed, report_task, failed);
+DEFINE_ENGINE_COUNTER_METRIC(schema_change_requests_total, schema_change, total);
+DEFINE_ENGINE_COUNTER_METRIC(schema_change_requests_failed, schema_change, failed);
+DEFINE_ENGINE_COUNTER_METRIC(create_rollup_requests_total, create_rollup, total);
+DEFINE_ENGINE_COUNTER_METRIC(create_rollup_requests_failed, create_rollup, failed);
+DEFINE_ENGINE_COUNTER_METRIC(storage_migrate_requests_total, storage_migrate, total);
+DEFINE_ENGINE_COUNTER_METRIC(delete_requests_total, delete, total);
+DEFINE_ENGINE_COUNTER_METRIC(delete_requests_failed, delete, failed);
+DEFINE_ENGINE_COUNTER_METRIC(clone_requests_total, clone, total);
+DEFINE_ENGINE_COUNTER_METRIC(clone_requests_failed, clone, failed);
+DEFINE_ENGINE_COUNTER_METRIC(finish_task_requests_total, finish_task, total);
+DEFINE_ENGINE_COUNTER_METRIC(finish_task_requests_failed, finish_task, failed);
+DEFINE_ENGINE_COUNTER_METRIC(base_compaction_request_total, base_compaction, total);
+DEFINE_ENGINE_COUNTER_METRIC(base_compaction_request_failed, base_compaction, failed);
+DEFINE_ENGINE_COUNTER_METRIC(cumulative_compaction_request_total, cumulative_compaction, total);
+DEFINE_ENGINE_COUNTER_METRIC(cumulative_compaction_request_failed, cumulative_compaction, failed);
+DEFINE_ENGINE_COUNTER_METRIC(publish_task_request_total, publish, total);
+DEFINE_ENGINE_COUNTER_METRIC(publish_task_failed_total, publish, failed);
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(base_compaction_deltas_total, MetricUnit::ROWSETS, "", compaction_deltas_total, Labels({{"type", "base"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cumulative_compaction_deltas_total, MetricUnit::ROWSETS, "", compaction_deltas_total, Labels({{"type", "cumulative"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(base_compaction_bytes_total, MetricUnit::BYTES, "", compaction_bytes_total, Labels({{"type", "base"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cumulative_compaction_bytes_total, MetricUnit::BYTES, "", compaction_bytes_total, Labels({{"type", "cumulative"}}));
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(meta_write_request_total, MetricUnit::REQUESTS, "", meta_request_total, Labels({{"type", "write"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(meta_read_request_total, MetricUnit::REQUESTS, "", meta_request_total, Labels({{"type", "read"}}));
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(meta_write_request_duration_us, MetricUnit::MICROSECONDS, "", meta_request_duration, Labels({{"type", "write"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(meta_read_request_duration_us, MetricUnit::MICROSECONDS, "", meta_request_duration, Labels({{"type", "read"}}));
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(segment_read_total, MetricUnit::OPERATIONS, "(segment_v2) total number of segments read", segment_read, Labels({{"type", "segment_total_read_times"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(segment_row_total, MetricUnit::ROWS, "(segment_v2) total number of rows in queried segments (before index pruning)", segment_read, Labels({{"type", "segment_total_row_num"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(segment_rows_by_short_key, MetricUnit::ROWS, "(segment_v2) total number of rows selected by short key index", segment_read, Labels({{"type", "segment_rows_by_short_key"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(segment_rows_read_by_zone_map, MetricUnit::ROWS, "(segment_v2) total number of rows selected by zone map index", segment_read, Labels({{"type", "segment_rows_read_by_zone_map"}}));
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(txn_begin_request_total, MetricUnit::OPERATIONS, "", txn_request, Labels({{"type", "begin"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(txn_commit_request_total, MetricUnit::OPERATIONS, "", txn_request, Labels({{"type", "commit"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(txn_rollback_request_total, MetricUnit::OPERATIONS, "", txn_request, Labels({{"type", "rollback"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(txn_exec_plan_total, MetricUnit::OPERATIONS, "", txn_request, Labels({{"type", "exec"}}));
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_receive_bytes_total, MetricUnit::BYTES, "", stream_load, Labels({{"type", "receive_bytes"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_rows_total, MetricUnit::ROWS, "", stream_load, Labels({{"type", "load_rows"}}));
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_rows, MetricUnit::ROWS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total, MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MICROSECONDS);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES);  // gauges below take (name, unit); the _5ARG form needs (name, unit, desc, group, labels)
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(process_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(process_fd_num_used, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(process_fd_num_limit_soft, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(process_fd_num_limit_hard, MetricUnit::NOUNIT);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_cumulative_max_compaction_score, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_base_max_compaction_score, MetricUnit::NOUNIT);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(push_request_write_bytes_per_second, MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(query_scan_bytes_per_second, MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(max_disk_io_util_percent, MetricUnit::PERCENT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(max_network_send_bytes_rate, MetricUnit::BYTES);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(max_network_receive_bytes_rate, MetricUnit::BYTES);
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(readable_blocks_total, MetricUnit::BLOCKS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(writable_blocks_total, MetricUnit::BLOCKS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(blocks_created_total, MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(blocks_deleted_total, MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(bytes_read_total, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(bytes_written_total, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(disk_sync_total, MetricUnit::OPERATIONS);
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(blocks_open_reading, MetricUnit::BLOCKS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(blocks_open_writing, MetricUnit::BLOCKS);
+
const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
-DorisMetrics::DorisMetrics() : _metrics(_s_registry_name) {
-#define REGISTER_DORIS_METRIC(name) _metrics.register_metric(#name, &name)
- // You can put DorisMetrics's metrics initial code here
- REGISTER_DORIS_METRIC(fragment_requests_total);
- REGISTER_DORIS_METRIC(fragment_request_duration_us);
- REGISTER_DORIS_METRIC(http_requests_total);
- REGISTER_DORIS_METRIC(http_request_send_bytes);
- REGISTER_DORIS_METRIC(query_scan_bytes);
- REGISTER_DORIS_METRIC(query_scan_rows);
-
- REGISTER_DORIS_METRIC(memtable_flush_total);
- REGISTER_DORIS_METRIC(memtable_flush_duration_us);
-
- // push request
- _metrics.register_metric(
- "push_requests_total", MetricLabels().add("status", "SUCCESS"),
- &push_requests_success_total);
- _metrics.register_metric(
- "push_requests_total", MetricLabels().add("status", "FAIL"),
- &push_requests_fail_total);
- REGISTER_DORIS_METRIC(push_request_duration_us);
- REGISTER_DORIS_METRIC(push_request_write_bytes);
- REGISTER_DORIS_METRIC(push_request_write_rows);
-
-#define REGISTER_ENGINE_REQUEST_METRIC(type, status, metric) \
- _metrics.register_metric( \
- "engine_requests_total", MetricLabels().add("type", #type).add("status", #status), &metric)
-
- REGISTER_ENGINE_REQUEST_METRIC(create_tablet, total, create_tablet_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(create_tablet, failed, create_tablet_requests_failed);
- REGISTER_ENGINE_REQUEST_METRIC(drop_tablet, total, drop_tablet_requests_total);
-
- REGISTER_ENGINE_REQUEST_METRIC(report_all_tablets, total, report_all_tablets_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(report_all_tablets, failed, report_all_tablets_requests_failed);
- REGISTER_ENGINE_REQUEST_METRIC(report_tablet, total, report_tablet_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(report_tablet, failed, report_tablet_requests_failed);
- REGISTER_ENGINE_REQUEST_METRIC(report_disk, total, report_disk_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(report_disk, failed, report_disk_requests_failed);
- REGISTER_ENGINE_REQUEST_METRIC(report_task, total, report_task_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(report_task, failed, report_task_requests_failed);
-
- REGISTER_ENGINE_REQUEST_METRIC(schema_change, total, schema_change_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(schema_change, failed, schema_change_requests_failed);
- REGISTER_ENGINE_REQUEST_METRIC(create_rollup, total, create_rollup_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(create_rollup, failed, create_rollup_requests_failed);
- REGISTER_ENGINE_REQUEST_METRIC(storage_migrate, total, storage_migrate_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(delete, total, delete_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(delete, failed, delete_requests_failed);
- REGISTER_ENGINE_REQUEST_METRIC(clone, total, clone_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(clone, failed, clone_requests_failed);
-
- REGISTER_ENGINE_REQUEST_METRIC(finish_task, total, finish_task_requests_total);
- REGISTER_ENGINE_REQUEST_METRIC(finish_task, failed, finish_task_requests_failed);
-
- REGISTER_ENGINE_REQUEST_METRIC(base_compaction, total, base_compaction_request_total);
- REGISTER_ENGINE_REQUEST_METRIC(base_compaction, failed, base_compaction_request_failed);
- REGISTER_ENGINE_REQUEST_METRIC(cumulative_compaction, total, cumulative_compaction_request_total);
- REGISTER_ENGINE_REQUEST_METRIC(cumulative_compaction, failed, cumulative_compaction_request_failed);
-
- REGISTER_ENGINE_REQUEST_METRIC(publish, total, publish_task_request_total);
- REGISTER_ENGINE_REQUEST_METRIC(publish, failed, publish_task_failed_total);
-
- _metrics.register_metric(
- "compaction_deltas_total", MetricLabels().add("type", "base"),
- &base_compaction_deltas_total);
- _metrics.register_metric(
- "compaction_deltas_total", MetricLabels().add("type", "cumulative"),
- &cumulative_compaction_deltas_total);
- _metrics.register_metric(
- "compaction_bytes_total", MetricLabels().add("type", "base"),
- &base_compaction_bytes_total);
- _metrics.register_metric(
- "compaction_bytes_total", MetricLabels().add("type", "cumulative"),
- &cumulative_compaction_bytes_total);
-
- _metrics.register_metric(
- "meta_request_total", MetricLabels().add("type", "write"),
- &meta_write_request_total);
- _metrics.register_metric(
- "meta_request_total", MetricLabels().add("type", "read"),
- &meta_read_request_total);
- _metrics.register_metric(
- "meta_request_duration", MetricLabels().add("type", "write"),
- &meta_write_request_duration_us);
- _metrics.register_metric(
- "meta_request_duration", MetricLabels().add("type", "read"),
- &meta_read_request_duration_us);
-
- _metrics.register_metric(
- "segment_read", MetricLabels().add("type", "segment_total_read_times"),
- &segment_read_total);
- _metrics.register_metric(
- "segment_read", MetricLabels().add("type", "segment_total_row_num"),
- &segment_row_total);
- _metrics.register_metric(
- "segment_read", MetricLabels().add("type", "segment_rows_by_short_key"),
- &segment_rows_by_short_key);
- _metrics.register_metric(
- "segment_read", MetricLabels().add("type", "segment_rows_read_by_zone_map"),
- &segment_rows_read_by_zone_map);
-
- _metrics.register_metric(
- "txn_request", MetricLabels().add("type", "begin"),
- &txn_begin_request_total);
- _metrics.register_metric(
- "txn_request", MetricLabels().add("type", "commit"),
- &txn_commit_request_total);
- _metrics.register_metric(
- "txn_request", MetricLabels().add("type", "rollback"),
- &txn_rollback_request_total);
- _metrics.register_metric(
- "txn_request", MetricLabels().add("type", "exec"),
- &txn_exec_plan_total);
-
- _metrics.register_metric(
- "stream_load", MetricLabels().add("type", "receive_bytes"),
- &stream_receive_bytes_total);
- _metrics.register_metric(
- "stream_load", MetricLabels().add("type", "load_rows"),
- &stream_load_rows_total);
- _metrics.register_metric("load_rows", &load_rows_total);
- _metrics.register_metric("load_bytes", &load_bytes_total);
-
- // Gauge
- REGISTER_DORIS_METRIC(memory_pool_bytes_total);
- REGISTER_DORIS_METRIC(process_thread_num);
- REGISTER_DORIS_METRIC(process_fd_num_used);
- REGISTER_DORIS_METRIC(process_fd_num_limit_soft);
- REGISTER_DORIS_METRIC(process_fd_num_limit_hard);
-
- REGISTER_DORIS_METRIC(tablet_cumulative_max_compaction_score);
- REGISTER_DORIS_METRIC(tablet_base_max_compaction_score);
-
- REGISTER_DORIS_METRIC(push_request_write_bytes_per_second);
- REGISTER_DORIS_METRIC(query_scan_bytes_per_second);
- REGISTER_DORIS_METRIC(max_disk_io_util_percent);
- REGISTER_DORIS_METRIC(max_network_send_bytes_rate);
- REGISTER_DORIS_METRIC(max_network_receive_bytes_rate);
-
- _metrics.register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this));
-
- REGISTER_DORIS_METRIC(readable_blocks_total);
- REGISTER_DORIS_METRIC(writable_blocks_total);
- REGISTER_DORIS_METRIC(blocks_created_total);
- REGISTER_DORIS_METRIC(blocks_deleted_total);
- REGISTER_DORIS_METRIC(bytes_read_total);
- REGISTER_DORIS_METRIC(bytes_written_total);
- REGISTER_DORIS_METRIC(disk_sync_total);
- REGISTER_DORIS_METRIC(blocks_open_reading);
- REGISTER_DORIS_METRIC(blocks_open_writing);
+DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {  // registry is named "doris_be" (_s_registry_name)
+    _server_metric_entity = _metric_registry.register_entity("server", {});  // every metric below is attached to this one "server" entity
+
+    METRIC_REGISTER(_server_metric_entity, fragment_requests_total);
+    METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us);
+    METRIC_REGISTER(_server_metric_entity, http_requests_total);
+    METRIC_REGISTER(_server_metric_entity, http_request_send_bytes);
+    METRIC_REGISTER(_server_metric_entity, query_scan_bytes);
+    METRIC_REGISTER(_server_metric_entity, query_scan_rows);
+
+    METRIC_REGISTER(_server_metric_entity, push_requests_success_total);
+    METRIC_REGISTER(_server_metric_entity, push_requests_fail_total);
+    METRIC_REGISTER(_server_metric_entity, push_request_duration_us);
+    METRIC_REGISTER(_server_metric_entity, push_request_write_bytes);
+    METRIC_REGISTER(_server_metric_entity, push_request_write_rows);
+
+    // engine_requests_total
+    METRIC_REGISTER(_server_metric_entity, create_tablet_requests_total);
+    METRIC_REGISTER(_server_metric_entity, create_tablet_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, drop_tablet_requests_total);
+    METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_total);
+    METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, report_tablet_requests_total);
+    METRIC_REGISTER(_server_metric_entity, report_tablet_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, report_disk_requests_total);
+    METRIC_REGISTER(_server_metric_entity, report_disk_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, report_task_requests_total);
+    METRIC_REGISTER(_server_metric_entity, report_task_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, schema_change_requests_total);
+    METRIC_REGISTER(_server_metric_entity, schema_change_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, create_rollup_requests_total);
+    METRIC_REGISTER(_server_metric_entity, create_rollup_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, storage_migrate_requests_total);
+    METRIC_REGISTER(_server_metric_entity, delete_requests_total);
+    METRIC_REGISTER(_server_metric_entity, delete_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, clone_requests_total);
+    METRIC_REGISTER(_server_metric_entity, clone_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, finish_task_requests_total);
+    METRIC_REGISTER(_server_metric_entity, finish_task_requests_failed);
+    METRIC_REGISTER(_server_metric_entity, base_compaction_request_total);
+    METRIC_REGISTER(_server_metric_entity, base_compaction_request_failed);
+    METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_total);
+    METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_failed);
+    METRIC_REGISTER(_server_metric_entity, publish_task_request_total);
+    METRIC_REGISTER(_server_metric_entity, publish_task_failed_total);
+
+    METRIC_REGISTER(_server_metric_entity, base_compaction_deltas_total);
+    METRIC_REGISTER(_server_metric_entity, base_compaction_bytes_total);
+    METRIC_REGISTER(_server_metric_entity, cumulative_compaction_deltas_total);
+    METRIC_REGISTER(_server_metric_entity, cumulative_compaction_bytes_total);
+
+    METRIC_REGISTER(_server_metric_entity, meta_write_request_total);
+    METRIC_REGISTER(_server_metric_entity, meta_write_request_duration_us);
+    METRIC_REGISTER(_server_metric_entity, meta_read_request_total);
+    METRIC_REGISTER(_server_metric_entity, meta_read_request_duration_us);
+
+    METRIC_REGISTER(_server_metric_entity, segment_read_total);
+    METRIC_REGISTER(_server_metric_entity, segment_row_total);
+    METRIC_REGISTER(_server_metric_entity, segment_rows_by_short_key);
+    METRIC_REGISTER(_server_metric_entity, segment_rows_read_by_zone_map);
+
+    METRIC_REGISTER(_server_metric_entity, txn_begin_request_total);
+    METRIC_REGISTER(_server_metric_entity, txn_commit_request_total);
+    METRIC_REGISTER(_server_metric_entity, txn_rollback_request_total);
+    METRIC_REGISTER(_server_metric_entity, txn_exec_plan_total);
+    METRIC_REGISTER(_server_metric_entity, stream_receive_bytes_total);
+    METRIC_REGISTER(_server_metric_entity, stream_load_rows_total);
+
+    METRIC_REGISTER(_server_metric_entity, memtable_flush_total);
+    METRIC_REGISTER(_server_metric_entity, memtable_flush_duration_us);
+
+    METRIC_REGISTER(_server_metric_entity, memory_pool_bytes_total);
+    METRIC_REGISTER(_server_metric_entity, process_thread_num);
+    METRIC_REGISTER(_server_metric_entity, process_fd_num_used);
+    METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_soft);
+    METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_hard);
+
+    METRIC_REGISTER(_server_metric_entity, tablet_cumulative_max_compaction_score);
+    METRIC_REGISTER(_server_metric_entity, tablet_base_max_compaction_score);
+
+    METRIC_REGISTER(_server_metric_entity, push_request_write_bytes_per_second);
+    METRIC_REGISTER(_server_metric_entity, query_scan_bytes_per_second);
+    METRIC_REGISTER(_server_metric_entity, max_disk_io_util_percent);
+    METRIC_REGISTER(_server_metric_entity, max_network_send_bytes_rate);
+    METRIC_REGISTER(_server_metric_entity, max_network_receive_bytes_rate);
+
+    METRIC_REGISTER(_server_metric_entity, readable_blocks_total);
+    METRIC_REGISTER(_server_metric_entity, writable_blocks_total);
+    METRIC_REGISTER(_server_metric_entity, blocks_created_total);
+    METRIC_REGISTER(_server_metric_entity, blocks_deleted_total);
+    METRIC_REGISTER(_server_metric_entity, bytes_read_total);
+    METRIC_REGISTER(_server_metric_entity, bytes_written_total);
+    METRIC_REGISTER(_server_metric_entity, disk_sync_total);
+    METRIC_REGISTER(_server_metric_entity, blocks_open_reading);
+    METRIC_REGISTER(_server_metric_entity, blocks_open_writing);
+
+    METRIC_REGISTER(_server_metric_entity, load_rows);
+    METRIC_REGISTER(_server_metric_entity, load_bytes);
+
+    _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this));  // hook "doris_metrics" calls DorisMetrics::_update on this instance
}
void DorisMetrics::initialize(
-    const std::vector<std::string>& paths,
    bool init_system_metrics,
    const std::set<std::string>& disk_devices,
    const std::vector<std::string>& network_interfaces) {
-    // disk usage
-    for (auto& path : paths) {
-        IntGauge* gauge = disks_total_capacity.add_metric(path, MetricUnit::BYTES);
-        _metrics.register_metric("disks_total_capacity", MetricLabels().add("path", path), gauge);
-        gauge = disks_avail_capacity.add_metric(path, MetricUnit::BYTES);
-        _metrics.register_metric("disks_avail_capacity", MetricLabels().add("path", path), gauge);
-        gauge = disks_data_used_capacity.add_metric(path, MetricUnit::BYTES);
-        _metrics.register_metric("disks_data_used_capacity", MetricLabels().add("path", path), gauge);
-        gauge = disks_state.add_metric(path, MetricUnit::NOUNIT);
-        _metrics.register_metric("disks_state", MetricLabels().add("path", path), gauge);
-    }
-
    if (init_system_metrics) {
-        _system_metrics.install(&_metrics, disk_devices, network_interfaces);
+        _system_metrics.reset(new SystemMetrics(&_metric_registry, disk_devices, network_interfaces));  // SystemMetrics is only constructed when init_system_metrics is true
    }
}
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 229b764..0b5a8bd 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -28,163 +28,146 @@
namespace doris {
-class IntGaugeMetricsMap {
-public:
- void set_metric(const std::string& key, int64_t val) {
- auto metric = metrics.find(key);
- if (metric != metrics.end()) {
- metric->second->set_value(val);
- }
- }
+#define REGISTER_HOOK_METRIC(name, func) \
+  DorisMetrics::instance()->server_entity()->register_metric(&METRIC_##name, &DorisMetrics::instance()->name); \
+  DorisMetrics::instance()->server_entity()->register_hook(#name, [&]() { \
+    DorisMetrics::instance()->name.set_value(func()); \
+});  // NOTE(review): expands to two statements (no do/while wrapper), and the stored hook captures by reference, so func must not use caller locals
- IntGauge* add_metric(const std::string& key, const MetricUnit unit) {
- metrics.emplace(key, new IntGauge(unit));
- return metrics.find(key)->second.get();
- }
-
-private:
- std::unordered_map<std::string, std::unique_ptr<IntGauge>> metrics;
-};
-
-#define REGISTER_GAUGE_DORIS_METRIC(name, func) \
- DorisMetrics::instance()->metrics()->register_metric(#name, &DorisMetrics::instance()->name); \
- DorisMetrics::instance()->metrics()->register_hook( \
- #name, [&]() { DorisMetrics::instance()->name.set_value(func()); });
+#define DEREGISTER_HOOK_METRIC(name) \
+  DorisMetrics::instance()->server_entity()->deregister_metric(&METRIC_##name); \
+  DorisMetrics::instance()->server_entity()->deregister_hook(#name);  // inverse of REGISTER_HOOK_METRIC: removes the metric and the hook keyed by #name
class DorisMetrics {
public:
- // counters
- METRIC_DEFINE_INT_COUNTER(fragment_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(fragment_request_duration_us, MetricUnit::MICROSECONDS);
- METRIC_DEFINE_INT_COUNTER(http_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(http_request_send_bytes, MetricUnit::BYTES);
- METRIC_DEFINE_INT_COUNTER(query_scan_bytes, MetricUnit::BYTES);
- METRIC_DEFINE_INT_COUNTER(query_scan_rows, MetricUnit::ROWS);
- METRIC_DEFINE_INT_COUNTER(push_requests_success_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(push_requests_fail_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(push_request_duration_us, MetricUnit::MICROSECONDS);
- METRIC_DEFINE_INT_COUNTER(push_request_write_bytes, MetricUnit::BYTES);
- METRIC_DEFINE_INT_COUNTER(push_request_write_rows, MetricUnit::ROWS);
- METRIC_DEFINE_INT_COUNTER(create_tablet_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(create_tablet_requests_failed, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(drop_tablet_requests_total, MetricUnit::REQUESTS);
-
- METRIC_DEFINE_INT_COUNTER(report_all_tablets_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(report_all_tablets_requests_failed, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(report_tablet_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(report_tablet_requests_failed, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(report_disk_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(report_disk_requests_failed, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(report_task_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(report_task_requests_failed, MetricUnit::REQUESTS);
-
- METRIC_DEFINE_INT_COUNTER(schema_change_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(schema_change_requests_failed, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(create_rollup_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(create_rollup_requests_failed, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(storage_migrate_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(delete_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(delete_requests_failed, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(clone_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(clone_requests_failed, MetricUnit::REQUESTS);
-
- METRIC_DEFINE_INT_COUNTER(finish_task_requests_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(finish_task_requests_failed, MetricUnit::REQUESTS);
-
- METRIC_DEFINE_INT_COUNTER(base_compaction_request_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(base_compaction_request_failed, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(cumulative_compaction_request_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(cumulative_compaction_request_failed, MetricUnit::REQUESTS);
-
- METRIC_DEFINE_INT_COUNTER(base_compaction_deltas_total, MetricUnit::ROWSETS);
- METRIC_DEFINE_INT_COUNTER(base_compaction_bytes_total, MetricUnit::BYTES);
- METRIC_DEFINE_INT_COUNTER(cumulative_compaction_deltas_total, MetricUnit::ROWSETS);
- METRIC_DEFINE_INT_COUNTER(cumulative_compaction_bytes_total, MetricUnit::BYTES);
-
- METRIC_DEFINE_INT_COUNTER(publish_task_request_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(publish_task_failed_total, MetricUnit::REQUESTS);
-
- METRIC_DEFINE_INT_COUNTER(meta_write_request_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(meta_write_request_duration_us, MetricUnit::MICROSECONDS);
- METRIC_DEFINE_INT_COUNTER(meta_read_request_total, MetricUnit::REQUESTS);
- METRIC_DEFINE_INT_COUNTER(meta_read_request_duration_us, MetricUnit::MICROSECONDS);
+ IntCounter fragment_requests_total;
+ IntCounter fragment_request_duration_us;
+ IntCounter http_requests_total;
+ IntCounter http_request_send_bytes;
+ IntCounter query_scan_bytes;
+ IntCounter query_scan_rows;
+
+ IntCounter push_requests_success_total;
+ IntCounter push_requests_fail_total;
+ IntCounter push_request_duration_us;
+ IntCounter push_request_write_bytes;
+ IntCounter push_request_write_rows;
+ IntCounter create_tablet_requests_total;
+ IntCounter create_tablet_requests_failed;
+ IntCounter drop_tablet_requests_total;
+
+ IntCounter report_all_tablets_requests_total;
+ IntCounter report_all_tablets_requests_failed;
+ IntCounter report_tablet_requests_total;
+ IntCounter report_tablet_requests_failed;
+ IntCounter report_disk_requests_total;
+ IntCounter report_disk_requests_failed;
+ IntCounter report_task_requests_total;
+ IntCounter report_task_requests_failed;
+
+ IntCounter schema_change_requests_total;
+ IntCounter schema_change_requests_failed;
+ IntCounter create_rollup_requests_total;
+ IntCounter create_rollup_requests_failed;
+ IntCounter storage_migrate_requests_total;
+ IntCounter delete_requests_total;
+ IntCounter delete_requests_failed;
+ IntCounter clone_requests_total;
+ IntCounter clone_requests_failed;
+
+ IntCounter finish_task_requests_total;
+ IntCounter finish_task_requests_failed;
+
+ IntCounter base_compaction_request_total;
+ IntCounter base_compaction_request_failed;
+ IntCounter cumulative_compaction_request_total;
+ IntCounter cumulative_compaction_request_failed;
+
+ IntCounter base_compaction_deltas_total;
+ IntCounter base_compaction_bytes_total;
+ IntCounter cumulative_compaction_deltas_total;
+ IntCounter cumulative_compaction_bytes_total;
+
+ IntCounter publish_task_request_total;
+ IntCounter publish_task_failed_total;
+
+ IntCounter meta_write_request_total;
+ IntCounter meta_write_request_duration_us;
+ IntCounter meta_read_request_total;
+ IntCounter meta_read_request_duration_us;
// Counters for segment_v2
// -----------------------
// total number of segments read
- METRIC_DEFINE_INT_COUNTER(segment_read_total, MetricUnit::OPERATIONS);
+ IntCounter segment_read_total;
// total number of rows in queried segments (before index pruning)
- METRIC_DEFINE_INT_COUNTER(segment_row_total, MetricUnit::ROWS);
+ IntCounter segment_row_total;
// total number of rows selected by short key index
- METRIC_DEFINE_INT_COUNTER(segment_rows_by_short_key, MetricUnit::ROWS);
+ IntCounter segment_rows_by_short_key;
// total number of rows selected by zone map index
- METRIC_DEFINE_INT_COUNTER(segment_rows_read_by_zone_map, MetricUnit::ROWS);
-
- METRIC_DEFINE_INT_COUNTER(txn_begin_request_total, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_COUNTER(txn_commit_request_total, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_COUNTER(txn_rollback_request_total, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_COUNTER(txn_exec_plan_total, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_COUNTER(stream_receive_bytes_total, MetricUnit::BYTES);
- METRIC_DEFINE_INT_COUNTER(stream_load_rows_total, MetricUnit::ROWS);
- METRIC_DEFINE_INT_COUNTER(load_rows_total, MetricUnit::ROWS);
- METRIC_DEFINE_INT_COUNTER(load_bytes_total, MetricUnit::BYTES);
-
- METRIC_DEFINE_INT_COUNTER(memtable_flush_total, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_COUNTER(memtable_flush_duration_us, MetricUnit::MICROSECONDS);
-
- // Gauges
- METRIC_DEFINE_INT_GAUGE(memory_pool_bytes_total, MetricUnit::BYTES);
- METRIC_DEFINE_INT_GAUGE(process_thread_num, MetricUnit::NOUNIT);
- METRIC_DEFINE_INT_GAUGE(process_fd_num_used, MetricUnit::NOUNIT);
- METRIC_DEFINE_INT_GAUGE(process_fd_num_limit_soft, MetricUnit::NOUNIT);
- METRIC_DEFINE_INT_GAUGE(process_fd_num_limit_hard, MetricUnit::NOUNIT);
- IntGaugeMetricsMap disks_total_capacity;
- IntGaugeMetricsMap disks_avail_capacity;
- IntGaugeMetricsMap disks_data_used_capacity;
- IntGaugeMetricsMap disks_state;
+ IntCounter segment_rows_read_by_zone_map;
+
+ // Transaction and load counters.
+ IntCounter txn_begin_request_total;
+ IntCounter txn_commit_request_total;
+ IntCounter txn_rollback_request_total;
+ IntCounter txn_exec_plan_total;
+ IntCounter stream_receive_bytes_total;
+ IntCounter stream_load_rows_total;
+ // NOTE(review): these members were previously named load_rows_total / load_bytes_total;
+ // confirm the renaming does not also change the exported metric names dashboards rely on.
+ IntCounter load_rows;
+ IntCounter load_bytes;
+
+ IntCounter memtable_flush_total;
+ IntCounter memtable_flush_duration_us;
+
+ // Gauges
+ IntGauge memory_pool_bytes_total;
+ IntGauge process_thread_num;
+ IntGauge process_fd_num_used;
+ IntGauge process_fd_num_limit_soft;
+ IntGauge process_fd_num_limit_hard;
// the max compaction score of all tablets.
// Record base and cumulative scores separately, because
// we need to get the larger of the two.
- METRIC_DEFINE_INT_GAUGE(tablet_cumulative_max_compaction_score, MetricUnit::NOUNIT);
- METRIC_DEFINE_INT_GAUGE(tablet_base_max_compaction_score, MetricUnit::NOUNIT);
+ IntGauge tablet_cumulative_max_compaction_score;
+ IntGauge tablet_base_max_compaction_score;
// The following metrics will be calculated
// by metric calculator
- METRIC_DEFINE_INT_GAUGE(push_request_write_bytes_per_second, MetricUnit::BYTES);
- METRIC_DEFINE_INT_GAUGE(query_scan_bytes_per_second, MetricUnit::BYTES);
- METRIC_DEFINE_INT_GAUGE(max_disk_io_util_percent, MetricUnit::PERCENT);
- METRIC_DEFINE_INT_GAUGE(max_network_send_bytes_rate, MetricUnit::BYTES);
- METRIC_DEFINE_INT_GAUGE(max_network_receive_bytes_rate, MetricUnit::BYTES);
+ IntGauge push_request_write_bytes_per_second;
+ IntGauge query_scan_bytes_per_second;
+ IntGauge max_disk_io_util_percent;
+ IntGauge max_network_send_bytes_rate;
+ IntGauge max_network_receive_bytes_rate;
// Metrics related with BlockManager
- METRIC_DEFINE_INT_COUNTER(readable_blocks_total, MetricUnit::BLOCKS);
- METRIC_DEFINE_INT_COUNTER(writable_blocks_total, MetricUnit::BLOCKS);
- METRIC_DEFINE_INT_COUNTER(blocks_created_total, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_COUNTER(blocks_deleted_total, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_COUNTER(bytes_read_total, MetricUnit::BYTES);
- METRIC_DEFINE_INT_COUNTER(bytes_written_total, MetricUnit::BYTES);
- METRIC_DEFINE_INT_COUNTER(disk_sync_total, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_GAUGE(blocks_open_reading, MetricUnit::BLOCKS);
- METRIC_DEFINE_INT_GAUGE(blocks_open_writing, MetricUnit::BLOCKS);
+ IntCounter readable_blocks_total;
+ IntCounter writable_blocks_total;
+ IntCounter blocks_created_total;
+ IntCounter blocks_deleted_total;
+ IntCounter bytes_read_total;
+ IntCounter bytes_written_total;
+ IntCounter disk_sync_total;
+ IntGauge blocks_open_reading;
+ IntGauge blocks_open_writing;
// Size of some global containers
- METRIC_DEFINE_UINT_GAUGE(rowset_count_generated_and_in_use, MetricUnit::ROWSETS);
- METRIC_DEFINE_UINT_GAUGE(unused_rowsets_count, MetricUnit::ROWSETS);
- METRIC_DEFINE_UINT_GAUGE(broker_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(data_stream_receiver_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(fragment_endpoint_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(active_scan_context_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(plan_fragment_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(load_channel_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(result_buffer_block_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(result_block_queue_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(routine_load_task_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(small_file_cache_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(stream_load_pipe_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(brpc_endpoint_stub_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(tablet_writer_count, MetricUnit::NOUNIT);
- METRIC_DEFINE_UINT_GAUGE(compaction_mem_current_consumption, MetricUnit::BYTES);
+ UIntGauge rowset_count_generated_and_in_use;
+ UIntGauge unused_rowsets_count;
+ UIntGauge broker_count;
+ UIntGauge data_stream_receiver_count;
+ UIntGauge fragment_endpoint_count;
+ UIntGauge active_scan_context_count;
+ UIntGauge plan_fragment_count;
+ UIntGauge load_channel_count;
+ UIntGauge result_buffer_block_count;
+ UIntGauge result_block_queue_count;
+ UIntGauge routine_load_task_count;
+ UIntGauge small_file_cache_count;
+ UIntGauge stream_load_pipe_count;
+ UIntGauge brpc_endpoint_stub_count;
+ UIntGauge tablet_writer_count;
+
+ UIntGauge compaction_mem_current_consumption;
static DorisMetrics* instance() {
static DorisMetrics instance;
@@ -193,13 +176,13 @@ public:
// not thread-safe, call before calling metrics
+ // NOTE: the former leading 'paths' parameter was removed; positional callers
+ // that passed it first must be updated.
void initialize(
- const std::vector<std::string>& paths = std::vector<std::string>(),
- bool init_system_metrics = false,
- const std::set<std::string>& disk_devices = std::set<std::string>(),
- const std::vector<std::string>& network_interfaces = std::vector<std::string>());
+ bool init_system_metrics = false,
+ const std::set<std::string>& disk_devices = std::set<std::string>(),
+ const std::vector<std::string>& network_interfaces = std::vector<std::string>());
- MetricRegistry* metrics() { return &_metrics; }
- SystemMetrics* system_metrics() { return &_system_metrics; }
+ // Registry holding every metric entity of this process.
+ MetricRegistry* metric_registry() { return &_metric_registry; }
+ // NOTE(review): returns nullptr if '_system_metrics' was never created
+ // (presumably when initialize() runs with init_system_metrics == false) - confirm.
+ SystemMetrics* system_metrics() { return _system_metrics.get(); }
+ // Presumably the entity the server-level members above are registered on.
+ // NOTE(review): '_server_metric_entity' has no in-class initializer; confirm it is
+ // set before any caller uses this accessor.
+ MetricEntity* server_entity() { return _server_metric_entity; }
private:
// Don't allow constrctor
@@ -213,8 +196,11 @@ private:
static const std::string _s_registry_name;
static const std::string _s_hook_name;
- MetricRegistry _metrics;
- SystemMetrics _system_metrics;
+ MetricRegistry _metric_registry;
+
+ std::unique_ptr<SystemMetrics> _system_metrics;
+
+ MetricEntity* _server_metric_entity;
};
}; // namespace doris
diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp
index 7b3f385..1a1ff93 100644
--- a/be/src/util/metrics.cpp
+++ b/be/src/util/metrics.cpp
@@ -17,9 +17,10 @@
#include "util/metrics.h"
-namespace doris {
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
-MetricLabels MetricLabels::EmptyLabels;
+namespace doris {
std::ostream& operator<<(std::ostream& os, MetricType type) {
switch (type) {
@@ -76,118 +77,209 @@ const char* unit_name(MetricUnit unit) {
}
}
-void Metric::hide() {
- if (_registry == nullptr) {
- return;
+// Escape a label value as required by the Prometheus text exposition format:
+// backslash, double-quote and line feed must be written as \\, \" and \n.
+// Without this, a value containing any of them yields an unparsable sample line.
+static std::string escape_label_value(const std::string& value) {
+    std::string escaped;
+    escaped.reserve(value.size());
+    for (char c : value) {
+        switch (c) {
+        case '\\':
+            escaped += "\\\\";
+            break;
+        case '"':
+            escaped += "\\\"";
+            break;
+        case '\n':
+            escaped += "\\n";
+            break;
+        default:
+            escaped += c;
+            break;
+        }
+    }
+    return escaped;
+}
+
+// Render entity labels followed by metric (prototype) labels as a Prometheus
+// label set such as {k1="v1",k2="v2"}, with every value escaped.
+// Returns an empty string when both maps are empty. The order within each map
+// follows unordered_map iteration and is therefore unspecified.
+std::string labels_to_string(const Labels& entity_labels, const Labels& metric_labels) {
+    if (entity_labels.empty() && metric_labels.empty()) {
+        return std::string();
}
- _registry->deregister_metric(this);
- _registry = nullptr;
-}
-bool MetricCollector::add_metic(const MetricLabels& labels, Metric* metric) {
- if (empty()) {
- _type = metric->type();
- } else if (metric->type() != _type) {
- return false;
+    std::stringstream ss;
+    ss << "{";
+    int i = 0;
+    for (const auto& label : entity_labels) {
+        if (i++ > 0) {
+            ss << ",";
+        }
+        ss << label.first << "=\"" << escape_label_value(label.second) << "\"";
}
- auto it = _metrics.emplace(labels, metric);
- return it.second;
-}
-
-void MetricCollector::remove_metric(Metric* metric) {
- for (auto& it : _metrics) {
- if (it.second == metric) {
- _metrics.erase(it.first);
- break;
+    for (const auto& label : metric_labels) {
+        if (i++ > 0) {
+            ss << ",";
}
+        ss << label.first << "=\"" << escape_label_value(label.second) << "\"";
}
+    ss << "}";
+
+    return ss.str();
+}
+
+// Family name used for export: group_name when set, otherwise the metric's own name.
+std::string MetricPrototype::simple_name() const {
+ return group_name.empty() ? name : group_name;
+}
+
+// simple_name() prefixed with "<registry_name>_"; no prefix when registry_name is empty.
+std::string MetricPrototype::combine_name(const std::string& registry_name) const {
+ return (registry_name.empty() ? std::string() : registry_name + "_") + simple_name();
+}
+
+// Register 'metric' (stored as a raw, non-owning pointer) under 'metric_type'.
+// A duplicate prototype is only caught by DCHECK; where DCHECK is compiled out,
+// emplace() keeps the existing entry and 'metric' is silently not registered.
+void MetricEntity::register_metric(const MetricPrototype* metric_type, Metric* metric) {
+ std::lock_guard<SpinLock> l(_lock);
+ DCHECK(_metrics.find(metric_type) == _metrics.end()) << "metric is already exist! " << _name << ":" << metric_type->name;
+ _metrics.emplace(metric_type, metric);
}
-Metric* MetricCollector::get_metric(const MetricLabels& labels) const {
- auto it = _metrics.find(labels);
- if (it != _metrics.end()) {
- return it->second;
+// Remove the metric registered under a prototype equal to 'metric_type'
+// (MetricPrototypeEqualTo compares group_name and name, not pointers). No-op if absent.
+void MetricEntity::deregister_metric(const MetricPrototype* metric_type) {
+ std::lock_guard<SpinLock> l(_lock);
+ _metrics.erase(metric_type);
+}
+
+// Look up a metric by (name, group_name), using a temporary prototype as the key.
+// Returns nullptr when no such metric is registered. Does not run hooks.
+Metric* MetricEntity::get_metric(const std::string& name, const std::string& group_name) const {
+ MetricPrototype dummy(MetricType::UNTYPED, MetricUnit::NOUNIT, name, "", group_name);
+ std::lock_guard<SpinLock> l(_lock);
+ auto it = _metrics.find(&dummy);
+ if (it == _metrics.end()) {
+ return nullptr;
}
- return nullptr;
+ return it->second;
+}
+
+// Register a named hook that trigger_hook_unlocked() will run. A duplicate name is
+// only caught by DCHECK; otherwise emplace() keeps the first hook.
+void MetricEntity::register_hook(const std::string& name, const std::function<void()>& hook) {
+ std::lock_guard<SpinLock> l(_lock);
+ DCHECK(_hooks.find(name) == _hooks.end()) << "hook is already exist! " << _name << ":" << name;
+ _hooks.emplace(name, hook);
+}
+
+// Remove the hook registered as 'name'; no-op if absent.
+void MetricEntity::deregister_hook(const std::string& name) {
+ std::lock_guard<SpinLock> l(_lock);
+ _hooks.erase(name);
}
-void MetricCollector::get_metrics(std::vector<Metric*>* metrics) {
- DCHECK(metrics != nullptr);
- for (const auto& it : _metrics) {
- metrics->push_back(it.second);
+// Run every hook of this entity. The caller must already hold '_lock', which is
+// what the MetricRegistry callers do. Unless 'force' is set, this is skipped
+// when config::enable_metric_calculator is true.
+void MetricEntity::trigger_hook_unlocked(bool force) const {
+ // When 'enable_metric_calculator' is true, hooks will be triggered by a background thread,
+ // see 'calculate_metrics' in daemon.cpp for more details.
+ if (!force && config::enable_metric_calculator) {
+ return;
+ }
+ for (const auto& hook : _hooks) {
+ hook.second();
}
}
+// Nothing to release explicitly: '_entities' holds shared_ptrs that are dropped with
+// the map, and entities only store raw, non-owning Metric pointers.
MetricRegistry::~MetricRegistry() {
- {
- std::lock_guard<SpinLock> l(_lock);
+}
- std::vector<Metric*> metrics;
- for (const auto& it : _collectors) {
- it.second->get_metrics(&metrics);
- }
- for (auto metric : metrics) {
- _deregister_locked(metric);
- }
- }
- // All register metric will deregister
- DCHECK(_collectors.empty()) << "_collectors not empty, size=" << _collectors.size();
+// Create an entity named 'name' carrying 'labels' and store it in the registry,
+// which keeps ownership through a shared_ptr. Returns a non-owning pointer that is
+// valid until deregister_entity(name), or longer if a shared_ptr obtained from
+// get_entity() is still held. If 'name' is already registered (a programming
+// error, checked only by DCHECK), the existing entity is returned and 'labels'
+// is ignored.
+MetricEntity* MetricRegistry::register_entity(const std::string& name, const Labels& labels) {
+    std::shared_ptr<MetricEntity> entity = std::make_shared<MetricEntity>(name, labels);
+
+    std::lock_guard<SpinLock> l(_lock);
+    DCHECK(_entities.find(name) == _entities.end()) << name;
+    // DCHECK is compiled out in release builds. On a duplicate name, insert() keeps
+    // the existing entry and the freshly built entity dies with 'entity' at scope
+    // exit, so returning entity.get() would dangle. Return the stored entity instead.
+    auto inserted = _entities.insert(std::make_pair(name, entity));
+    return inserted.first->second.get();
+}
+
+// Drop the registry's reference to entity 'name'. Pointers returned by
+// register_entity() dangle afterwards unless a shared_ptr from get_entity() is still held.
+void MetricRegistry::deregister_entity(const std::string& name) {
+ std::lock_guard<SpinLock> l(_lock);
+ _entities.erase(name);
}
-bool MetricRegistry::register_metric(const std::string& name,
- const MetricLabels& labels,
- Metric* metric) {
- DCHECK(metric != nullptr);
- metric->hide();
+// Returns the entity registered as 'name', or an empty shared_ptr if there is none.
+std::shared_ptr<MetricEntity> MetricRegistry::get_entity(const std::string& name) {
std::lock_guard<SpinLock> l(_lock);
- MetricCollector* collector = nullptr;
- auto it = _collectors.find(name);
- if (it == _collectors.end()) {
- collector = new MetricCollector();
- _collectors.emplace(name, collector);
- } else {
- collector = it->second;
+ auto entity = _entities.find(name);
+ if (entity == _entities.end()) {
+ return std::shared_ptr<MetricEntity>();
}
- auto res = collector->add_metic(labels, metric);
- if (res) {
- metric->_registry = this;
+ return entity->second;
+}
+
+// Run the hooks of every entity, holding the registry lock and then each entity's
+// lock. With force == false, each entity skips its hooks when
+// config::enable_metric_calculator is set (see MetricEntity::trigger_hook_unlocked).
+void MetricRegistry::trigger_all_hooks(bool force) const {
+ std::lock_guard<SpinLock> l(_lock);
+ for (const auto& entity : _entities) {
+ std::lock_guard<SpinLock> l(entity.second->_lock);
+ entity.second->trigger_hook_unlocked(force);
}
- return res;
}
-void MetricRegistry::_deregister_locked(Metric* metric) {
- std::vector<std::string> to_erase;
- for (auto& it : _collectors) {
- it.second->remove_metric(metric);
- if (it.second->empty()) {
- to_erase.emplace_back(it.first);
+// Render every metric of every entity in the Prometheus text exposition format.
+// Samples are gathered per MetricPrototype, then grouped into families by
+// MetricPrototype::simple_name(), exported as "<registry name>_<simple name>".
+// Each family gets exactly one "# TYPE" line followed by all of its samples, and
+// families are emitted in lexicographic name order. Hooks are triggered first
+// unless config::enable_metric_calculator is set (see trigger_hook_unlocked).
+std::string MetricRegistry::to_prometheus() const {
+    std::stringstream ss;
+    // Reorder by MetricPrototype
+    EntityMetricsByType entity_metrics_by_types;
+    std::lock_guard<SpinLock> l(_lock);
+    for (const auto& entity : _entities) {
+        std::lock_guard<SpinLock> l(entity.second->_lock);
+        entity.second->trigger_hook_unlocked(false);
+        for (const auto& metric : entity.second->_metrics) {
+            std::pair<MetricEntity*, Metric*> new_elem = std::make_pair(entity.second.get(), metric.second);
+            auto found = entity_metrics_by_types.find(metric.first);
+            if (found == entity_metrics_by_types.end()) {
+                entity_metrics_by_types.emplace(metric.first, std::vector<std::pair<MetricEntity*, Metric*>>({new_elem}));
+            } else {
+                found->second.emplace_back(new_elem);
+            }
}
}
- for (auto& name : to_erase) {
- auto it = _collectors.find(name);
- delete it->second;
- _collectors.erase(it);
+    // Group prototypes into families by their exported name. Iterating the
+    // unordered_map directly does not guarantee that prototypes sharing a
+    // group_name come out consecutively. That could repeat a family's TYPE line
+    // and split its samples, but the Prometheus text format requires one TYPE line
+    // per family with all of the family's samples kept together.
+    std::map<std::string, std::vector<const EntityMetricsByType::value_type*>> families;
+    for (const auto& entity_metrics_by_type : entity_metrics_by_types) {
+        families[entity_metrics_by_type.first->simple_name()].push_back(&entity_metrics_by_type);
+    }
+    // Output
+    for (const auto& family : families) {
+        // All prototypes of a family share its name; the TYPE comes from the first
+        // one (assumes grouped prototypes share one MetricType).
+        const MetricPrototype* family_prototype = family.second.front()->first;
+        const std::string display_name = family_prototype->combine_name(_name);
+        ss << "# TYPE " << display_name << " " << family_prototype->type << "\n"; // metric TYPE line
+        for (const auto* prototype_metrics : family.second) {
+            for (const auto& entity_metric : prototype_metrics->second) {
+                ss << display_name // metric name
+                   << labels_to_string(entity_metric.first->_labels, prototype_metrics->first->labels) // metric labels
+                   << " " << entity_metric.second->to_string() << "\n"; // metric value
+            }
+        }
}
+
+    return ss.str();
}
-Metric* MetricRegistry::get_metric(const std::string& name, const MetricLabels& labels) const {
+// Serialize every metric as a JSON array of objects shaped like
+// {"tags": {"metric": <simple name>, <prototype labels>, <entity labels>}, "unit": ..., "value": ...}.
+// Hooks are triggered first unless config::enable_metric_calculator is set.
+// NOTE(review): unlike to_prometheus/to_core_string, the name is not prefixed with the
+// registry name, and a label key present in both label sets produces a duplicate
+// JSON member (RapidJSON's AddMember does not check for duplicates).
+std::string MetricRegistry::to_json() const {
+ rj::Document doc{rj::kArrayType};
+ rj::Document::AllocatorType& allocator = doc.GetAllocator();
std::lock_guard<SpinLock> l(_lock);
- auto it = _collectors.find(name);
- if (it != _collectors.end()) {
- return it->second->get_metric(labels);
+ for (const auto& entity : _entities) {
+ std::lock_guard<SpinLock> l(entity.second->_lock);
+ entity.second->trigger_hook_unlocked(false);
+ for (const auto& metric : entity.second->_metrics) {
+ rj::Value metric_obj(rj::kObjectType);
+ // tags
+ rj::Value tag_obj(rj::kObjectType);
+ tag_obj.AddMember("metric", rj::Value(metric.first->simple_name().c_str(), allocator), allocator);
+ // MetricPrototype's labels
+ for (auto& label : metric.first->labels) {
+ tag_obj.AddMember(
+ rj::Value(label.first.c_str(), allocator),
+ rj::Value(label.second.c_str(), allocator),
+ allocator);
+ }
+ // MetricEntity's labels
+ for (auto& label : entity.second->_labels) {
+ tag_obj.AddMember(
+ rj::Value(label.first.c_str(), allocator),
+ rj::Value(label.second.c_str(), allocator),
+ allocator);
+ }
+ metric_obj.AddMember("tags", tag_obj, allocator);
+ // unit
+ rj::Value unit_val(unit_name(metric.first->unit), allocator);
+ metric_obj.AddMember("unit", unit_val, allocator);
+ // value
+ metric_obj.AddMember("value", metric.second->to_json_value(), allocator);
+ doc.PushBack(metric_obj, allocator);
+ }
}
- return nullptr;
-}
-bool MetricRegistry::register_hook(const std::string& name, const std::function<void()>& hook) {
- std::lock_guard<SpinLock> l(_lock);
- auto it = _hooks.emplace(name, hook);
- return it.second;
+ rj::StringBuffer strBuf;
+ rj::Writer<rj::StringBuffer> writer(strBuf);
+ doc.Accept(writer);
+ return strBuf.GetString();
}
-void MetricRegistry::deregister_hook(const std::string& name) {
+// One line per core metric (MetricPrototype::is_core_metric), formatted as
+// "<registry>_<simple name> LONG <value>". Hooks are triggered first unless
+// config::enable_metric_calculator is set.
+// NOTE(review): entity labels are not printed, so a core prototype registered on
+// several entities yields several lines with the same name - confirm this is intended.
+std::string MetricRegistry::to_core_string() const {
+ std::stringstream ss;
std::lock_guard<SpinLock> l(_lock);
- _hooks.erase(name);
+ for (const auto& entity : _entities) {
+ std::lock_guard<SpinLock> l(entity.second->_lock);
+ entity.second->trigger_hook_unlocked(false);
+ for (const auto &metric : entity.second->_metrics) {
+ if (metric.first->is_core_metric) {
+ ss << metric.first->combine_name(_name) << " LONG " << metric.second->to_string() << "\n";
+ }
+ }
+ }
+
+ return ss.str();
}
}
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 736c258..50093fe 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -25,6 +25,7 @@
#include <string>
#include <mutex>
#include <iomanip>
+#include <unordered_map>
#include <rapidjson/rapidjson.h>
#include <rapidjson/document.h>
@@ -69,43 +70,26 @@ const char* unit_name(MetricUnit unit);
+// Base class of a single metric value. Type, unit, name and labels now live in the
+// MetricPrototype the metric is registered with; a Metric only renders its value.
class Metric {
public:
- Metric(MetricType type, MetricUnit unit)
- : _type(type),
- _unit(unit),
- _registry(nullptr) {}
- virtual ~Metric() { hide(); }
+ Metric() {}
+ virtual ~Metric() {}
+ // Current value as text (used by the Prometheus and core-string outputs).
virtual std::string to_string() const = 0;
- MetricType type() const { return _type; }
- MetricUnit unit() const { return _unit; }
- void hide();
- virtual void write_value(rj::Value& metric_obj,
- rj::Document::AllocatorType& allocator) = 0;
+ // Current value as a RapidJSON value (used by MetricRegistry::to_json).
+ virtual rj::Value to_json_value() const = 0;
+
private:
+ // NOTE(review): Metric has no private members left, so this friendship appears unused.
friend class MetricRegistry;
-
- MetricType _type = MetricType::UNTYPED;
- MetricUnit _unit = MetricUnit::NOUNIT;
- MetricRegistry* _registry;
};
-// Metric that only can increment
+// Metric backed by a std::atomic<T>: increment() adds with fetch_add and
+// set_value() overwrites the value. Base of AtomicCounter and AtomicGauge.
template<typename T>
class AtomicMetric : public Metric {
public:
- AtomicMetric(MetricType type, MetricUnit unit)
- : Metric(type, unit),
- _value(T()) {}
- virtual ~AtomicMetric() { }
+ AtomicMetric() : _value(T()) {}
+ virtual ~AtomicMetric() {}
std::string to_string() const override {
return std::to_string(value());
}
- void write_value(rj::Value& metric_obj,
- rj::Document::AllocatorType& allocator) override {
- metric_obj.AddMember("value", rj::Value(value()), allocator);
- }
-
T value() const {
return _value.load();
}
@@ -113,9 +97,15 @@ public:
void increment(const T& delta) {
_value.fetch_add(delta);
}
+
void set_value(const T& value) {
_value.store(value);
}
+
+ // Current value as a RapidJSON number (consumed by MetricRegistry::to_json).
+ rj::Value to_json_value() const override {
+ return rj::Value(value());
+ }
+
protected:
std::atomic<T> _value;
};
@@ -123,19 +113,12 @@ protected:
+// Metric whose value is guarded by a SpinLock rather than std::atomic, for types
+// whose atomic lacks fetch_add (see the comment on the protected members below).
template<typename T>
class LockSimpleMetric : public Metric {
public:
- LockSimpleMetric(MetricType type, MetricUnit unit)
- : Metric(type, unit),
- _value(T()) {}
- virtual ~LockSimpleMetric() { }
+ LockSimpleMetric() : _value(T()) {}
+ virtual ~LockSimpleMetric() {}
std::string to_string() const override {
return std::to_string(value());
}
-
- void write_value(rj::Value& metric_obj,
- rj::Document::AllocatorType& allocator) override {
- metric_obj.AddMember("value", rj::Value(value()), allocator);
- }
T value() const {
std::lock_guard<SpinLock> l(_lock);
@@ -144,12 +127,18 @@ public:
void increment(const T& delta) {
std::lock_guard<SpinLock> l(this->_lock);
- this->_value += delta;
+ _value += delta;
}
+
void set_value(const T& value) {
std::lock_guard<SpinLock> l(this->_lock);
- this->_value = value;
+ _value = value;
}
+
+ // Current value (read under the lock) as a RapidJSON number.
+ rj::Value to_json_value() const override {
+ return rj::Value(value());
+ }
+
protected:
// We use spinlock instead of std::atomic is because atomic don't support
// double's fetch_add
@@ -165,22 +154,14 @@ protected:
+// Counter whose storage is a CoreLocalValue<T>: increment() atomically adds to the
+// slot returned by access(); value() accumulates into 'sum' (presumably across all
+// slots - its loop lies outside this hunk).
+// NOTE(review): the constructor no longer writes '_value()' explicitly; confirm that
+// CoreLocalValue's default constructor zero-initializes its slots.
template<typename T>
class CoreLocalCounter : public Metric {
public:
- CoreLocalCounter(MetricUnit unit)
- : Metric(MetricType::COUNTER, unit),
- _value() {}
-
- virtual ~CoreLocalCounter() { }
+ CoreLocalCounter() {}
+ virtual ~CoreLocalCounter() {}
std::string to_string() const override {
std::stringstream ss;
ss << value();
return ss.str();
}
-
- void write_value(rj::Value& metric_obj,
- rj::Document::AllocatorType& allocator) override {
- metric_obj.AddMember("value", rj::Value(value()), allocator);
- }
T value() const {
T sum = 0;
@@ -193,6 +174,11 @@ public:
void increment(const T& delta) {
__sync_fetch_and_add(_value.access(), delta);
}
+
+ // Current total as a RapidJSON number.
+ rj::Value to_json_value() const override {
+ return rj::Value(value());
+ }
+
protected:
CoreLocalValue<T> _value;
};
@@ -200,223 +186,151 @@ protected:
+// Counter/gauge subclasses no longer carry a MetricType or MetricUnit; those now come
+// from the MetricPrototype the metric is registered with, so these classes only name intent.
template<typename T>
class AtomicCounter : public AtomicMetric<T> {
public:
- AtomicCounter(MetricUnit unit)
- : AtomicMetric<T>(MetricType::COUNTER, unit) {}
- virtual ~AtomicCounter() { }
+ AtomicCounter() {}
+ virtual ~AtomicCounter() {}
};
template<typename T>
class AtomicGauge : public AtomicMetric<T> {
public:
- AtomicGauge(MetricUnit unit)
- : AtomicMetric<T>(MetricType::GAUGE, unit) {}
- virtual ~AtomicGauge() { }
+ AtomicGauge() : AtomicMetric<T>() {}
+ virtual ~AtomicGauge() {}
};
template<typename T>
class LockCounter : public LockSimpleMetric<T> {
public:
- LockCounter(MetricUnit unit)
- : LockSimpleMetric<T>(MetricType::COUNTER, unit) {}
- virtual ~LockCounter() { }
+ LockCounter() : LockSimpleMetric<T>() {}
+ virtual ~LockCounter() {}
};
-// This can only used for trival type
+// This can only be used for trivial types.
template<typename T>
class LockGauge : public LockSimpleMetric<T> {
public:
- LockGauge(MetricUnit unit)
- : LockSimpleMetric<T>(MetricType::GAUGE, unit) {}
- virtual ~LockGauge() { }
+ LockGauge() : LockSimpleMetric<T>() {}
+ virtual ~LockGauge() {}
};
-// one key-value pair used to
-struct MetricLabel {
+// Label name -> label value. Unordered, so the order in which labels are rendered is unspecified.
+using Labels = std::unordered_map<std::string, std::string>;
+// Static description of a metric: type, unit, name, description, optional group and
+// fixed labels. Typically defined once with DEFINE_METRIC_PROTOTYPE and registered on
+// any number of MetricEntity objects. Prototypes used as map keys compare by
+// (group_name, name) - see MetricPrototypeEqualTo.
+struct MetricPrototype {
+public:
+ MetricPrototype(MetricType type_,
+ MetricUnit unit_,
+ std::string name_,
+ std::string description_ = "",
+ std::string group_name_ = "",
+ Labels labels_ = Labels(),
+ bool is_core_metric_ = false)
+ : is_core_metric(is_core_metric_),
+ type(type_),
+ unit(unit_),
+ name(std::move(name_)),
+ description(std::move(description_)),
+ group_name(std::move(group_name_)),
+ labels(std::move(labels_)) {}
+
+ // Family name: group_name if set, otherwise name.
+ std::string simple_name() const;
+ // simple_name() prefixed with "<registry_name>_" when registry_name is non-empty.
+ std::string combine_name(const std::string& registry_name) const;
+
+ // Whether MetricRegistry::to_core_string() includes this metric.
+ bool is_core_metric;
+ MetricType type;
+ MetricUnit unit;
std::string name;
- std::string value;
+ std::string description;
+ // When non-empty, prototypes sharing it are exported as one family under this
+ // name and are told apart by 'labels'.
+ std::string group_name;
+ Labels labels;
+};
- MetricLabel() { }
- MetricLabel(const std::string& name_, const std::string& value_) :name(name_), value(value_) {
- }
+// Define a MetricPrototype variable named METRIC_<name> whose metric name is the
+// stringized 'name'. METRIC_REGISTER(entity, name) later pairs it with a Metric
+// object that is itself called 'name'.
+#define DEFINE_METRIC_PROTOTYPE(name, type, unit, desc, group, labels, core) \
+ ::doris::MetricPrototype METRIC_##name(type, unit, #name, desc, group, labels, core)
- bool operator==(const MetricLabel& other) const {
- return name == other.name && value == other.value;
- }
- bool operator!=(const MetricLabel& other) const {
- return !(*this == other);
- }
- bool operator<(const MetricLabel& other) const {
- auto res = name.compare(other.name);
- if (res == 0) {
- return value < other.value;
- }
- return res < 0;
- }
- int compare(const MetricLabel& other) const {
- auto res = name.compare(other.name);
- if (res == 0) {
- return value.compare(other.value);
- }
- return res;
- }
- std::string to_string() const {
- return name + "=" + value;
- }
-};
+#define DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(name, unit) \
+ DEFINE_METRIC_PROTOTYPE(name, MetricType::COUNTER, unit, "", "", Labels(), false)
-struct MetricLabels {
- static MetricLabels EmptyLabels;
- // used std::set to sort MetricLabel so that we can get compare two MetricLabels
- std::set<MetricLabel> labels;
+#define DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(name, unit, desc) \
+ DEFINE_METRIC_PROTOTYPE(name, MetricType::COUNTER, unit, desc, "", Labels(), false)
- MetricLabels& add(const std::string& name, const std::string& value) {
- labels.emplace(name, value);
- return *this;
- }
+// 'group' is stringized (#group): pass a bare identifier such as cpu, not a string literal.
+#define DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(name, unit, desc, group, labels) \
+ DEFINE_METRIC_PROTOTYPE(name, MetricType::COUNTER, unit, desc, #group, labels, false)
- bool operator==(const MetricLabels& other) const {
- if (labels.size() != other.labels.size()) {
- return false;
- }
- auto it = labels.begin();
- auto other_it = other.labels.begin();
- while (it != labels.end()) {
- if (*it != *other_it) {
- return false;
- }
- ++it;
- ++other_it;
- }
- return true;
- }
- bool operator<(const MetricLabels& other) const {
- auto it = labels.begin();
- auto other_it = other.labels.begin();
- while (it != labels.end() && other_it != other.labels.end()) {
- auto res = it->compare(*other_it);
- if (res < 0) {
- return true;
- } else if (res > 0) {
- return false;
- }
- ++it;
- ++other_it;
- }
- if (it == labels.end()) {
- if (other_it == other.labels.end()) {
- return false;
- }
- return true;
- } else {
- return false;
- }
- }
- bool empty() const {
- return labels.empty();
+// NOTE(review): despite its name this macro takes 2 arguments (unlike
+// DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG) and defines a plain, non-core gauge. Its
+// arity is kept because callers (e.g. DEFINE_MEMORY_GAUGE_METRIC) rely on it.
+#define DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(name, unit) \
+ DEFINE_METRIC_PROTOTYPE(name, MetricType::GAUGE, unit, "", "", Labels(), false)
+
+// NOTE(review): unlike DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG, this marks the gauge as a
+// core metric (included in MetricRegistry::to_core_string()); confirm this is intended.
+#define DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(name, unit) \
+ DEFINE_METRIC_PROTOTYPE(name, MetricType::GAUGE, unit, "", "", Labels(), true)
+
+#define DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(name, unit, desc) \
+ DEFINE_METRIC_PROTOTYPE(name, MetricType::GAUGE, unit, desc, "", Labels(), false)
+
+// Register the Metric object 'metric' on 'entity' under prototype METRIC_<metric>.
+#define METRIC_REGISTER(entity, metric) \
+ entity->register_metric(&METRIC_##metric, &metric)
+
+// Remove the metric registered on 'entity' under prototype METRIC_<metric>.
+#define METRIC_DEREGISTER(entity, metric) \
+ entity->deregister_metric(&METRIC_##metric)
+
+// Hash/equality used to key MetricMap and EntityMetricsByType by prototype content
+// rather than pointer identity: two prototypes are equal iff group_name and name match.
+// The hash uses only the family name (group_name, or name when ungrouped), so every
+// member of a group shares a hash value; this stays consistent with the equality.
+struct MetricPrototypeHash {
+ size_t operator()(const MetricPrototype* metric_prototype) const {
+ return std::hash<std::string>()(metric_prototype->group_name.empty() ? metric_prototype->name : metric_prototype->group_name);
}
+};
- std::string to_string() const {
- std::stringstream ss;
- int i = 0;
- for (auto& label : labels) {
- if (i++ > 0) {
- ss << ",";
- }
- ss << label.to_string();
- }
- return ss.str();
+struct MetricPrototypeEqualTo {
+ bool operator()(const MetricPrototype* first, const MetricPrototype* second) const {
+ return first->group_name == second->group_name && first->name == second->name;
}
};
-class MetricCollector;
+// Metrics of one entity keyed by prototype (compared by group_name + name); the
+// Metric pointers are not owned by the map.
+using MetricMap = std::unordered_map<const MetricPrototype*, Metric*, MetricPrototypeHash, MetricPrototypeEqualTo>;
-class MetricsVisitor {
+// A named set of metrics sharing the same entity labels, plus named hooks that refresh
+// them. Created through MetricRegistry::register_entity().
+class MetricEntity {
public:
- virtual ~MetricsVisitor() { }
+ MetricEntity(const std::string& name, const Labels& labels)
+ : _name(name), _labels(labels) {}
- // visit a collector, you can implement collector visitor, or only implement
- // metric visitor
- virtual void visit(const std::string& prefix, const std::string& name,
- MetricCollector* collector) = 0;
-};
+ void register_metric(const MetricPrototype* metric_type, Metric* metric);
+ void deregister_metric(const MetricPrototype* metric_type);
+ // Returns nullptr when no metric with this (name, group_name) is registered.
+ Metric* get_metric(const std::string& name, const std::string& group_name = "") const;
-class MetricCollector {
-public:
- bool add_metic(const MetricLabels& labels, Metric* metric);
- void remove_metric(Metric* metric);
- void collect(const std::string& prefix, const std::string& name, MetricsVisitor* visitor) {
- visitor->visit(prefix, name, this);
- }
- bool empty() const {
- return _metrics.empty();
- }
- Metric* get_metric(const MetricLabels& labels) const;
- // get all metrics belong to this collector
- void get_metrics(std::vector<Metric*>* metrics);
+ // Register a named hook. Hooks are not run by get_metric(); they are run by
+ // trigger_hook_unlocked(), which MetricRegistry calls before exporting
+ // (to_prometheus/to_json/to_core_string) and from trigger_all_hooks().
+ void register_hook(const std::string& name, const std::function<void()>& hook);
+ void deregister_hook(const std::string& name);
+ // Runs all hooks; the caller must hold '_lock'. Unless 'force' is set, this is a
+ // no-op when config::enable_metric_calculator is true.
+ void trigger_hook_unlocked(bool force) const;
- const std::map<MetricLabels, Metric*>& metrics() const {
- return _metrics;
- }
- MetricType type() const { return _type; }
private:
- MetricType _type = MetricType::UNTYPED;
- std::map<MetricLabels, Metric*> _metrics;
+ friend class MetricRegistry;
+
+ std::string _name;
+ Labels _labels;
+
+ mutable SpinLock _lock;
+ MetricMap _metrics;
+ std::map<std::string, std::function<void()>> _hooks;
};
+// (entity, metric) pairs regrouped by prototype across entities; used when rendering
+// Prometheus output.
+using EntityMetricsByType = std::unordered_map<const MetricPrototype*, std::vector<std::pair<MetricEntity*, Metric*>>, MetricPrototypeHash, MetricPrototypeEqualTo>;
+
+// Top level of the MetricRegistry -> MetricEntity -> Metric hierarchy. '_name' prefixes
+// exported metric names (see MetricPrototype::combine_name).
class MetricRegistry {
public:
- MetricRegistry(const std::string& name) : _name(name) { }
+ MetricRegistry(const std::string& name) : _name(name) {}
~MetricRegistry();
- bool register_metric(const std::string& name, Metric* metric) {
- return register_metric(name, MetricLabels::EmptyLabels, metric);
- }
- bool register_metric(const std::string& name, const MetricLabels& labels, Metric* metric);
- // Now this function is not used frequently, so this is a little time consuming
- void deregister_metric(Metric* metric) {
- std::lock_guard<SpinLock> l(_lock);
- _deregister_locked(metric);
- }
- Metric* get_metric(const std::string& name) const {
- return get_metric(name, MetricLabels::EmptyLabels);
- }
- Metric* get_metric(const std::string& name, const MetricLabels& labels) const;
- // Register a hook, this hook will called before collect is called
- bool register_hook(const std::string& name, const std::function<void()>& hook);
- void deregister_hook(const std::string& name);
+ // Create an entity named 'name' (a duplicate name is a programming error checked by
+ // DCHECK). The registry owns the entity through a shared_ptr held in '_entities'.
+ MetricEntity* register_entity(const std::string& name, const Labels& labels);
+ void deregister_entity(const std::string& name);
+ // Returns an empty shared_ptr when no entity has this name.
+ std::shared_ptr<MetricEntity> get_entity(const std::string& name);
- void collect(MetricsVisitor* visitor) {
- std::lock_guard<SpinLock> l(_lock);
- if (!config::enable_metric_calculator) {
- // Before we collect, need to call hooks
- unprotected_trigger_hook();
- }
+ // Run every entity's hooks; with force == false this does nothing while
+ // config::enable_metric_calculator is set.
+ void trigger_all_hooks(bool force) const;
- for (auto& it : _collectors) {
- it.second->collect(_name, it.first, visitor);
- }
- }
-
- void trigger_hook() {
- std::lock_guard<SpinLock> l(_lock);
- unprotected_trigger_hook();
- }
+ // Exporters. Each triggers hooks first (subject to enable_metric_calculator) and
+ // holds the registry lock while rendering.
+ std::string to_prometheus() const;
+ std::string to_json() const;
+ std::string to_core_string() const;
private:
- void unprotected_trigger_hook() {
- for (const auto& it : _hooks) {
- it.second();
- }
- }
-
-private:
- void _deregister_locked(Metric* metric);
-
const std::string _name;
mutable SpinLock _lock;
- std::map<std::string, MetricCollector*> _collectors;
- std::map<std::string, std::function<void()>> _hooks;
+ std::unordered_map<std::string, std::shared_ptr<MetricEntity>> _entities;
};
using IntCounter = CoreLocalCounter<int64_t>;
@@ -428,25 +342,3 @@ using UIntGauge = AtomicGauge<uint64_t>;
using DoubleGauge = LockGauge<double>;
} // namespace doris
-
-// Convenience macros to metric
-#define METRIC_DEFINE_INT_COUNTER(metric_name, unit) \
- doris::IntCounter metric_name{unit}
-
-#define METRIC_DEFINE_INT_ATOMIC_COUNTER(metric_name, unit) \
- doris::IntAtomicCounter metric_name{unit}
-
-#define METRIC_DEFINE_UINT_COUNTER(metric_name, unit) \
- doris::UIntCounter metric_name{unit}
-
-#define METRIC_DEFINE_DOUBLE_COUNTER(metric_name, unit) \
- doris::DoubleCounter metric_name{unit}
-
-#define METRIC_DEFINE_INT_GAUGE(metric_name, unit) \
- doris::IntGauge metric_name{unit}
-
-#define METRIC_DEFINE_UINT_GAUGE(metric_name, unit) \
- doris::UIntGauge metric_name{unit}
-
-#define METRIC_DEFINE_DOUBLE_GAUGE(metric_name, unit) \
- doris::DoubleGauge metric_name{unit}
diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp
index 82ad140..c1aae9e 100644
--- a/be/src/util/system_metrics.cpp
+++ b/be/src/util/system_metrics.cpp
@@ -16,93 +16,211 @@
// under the License.
#include "util/system_metrics.h"
-#include "gutil/strings/split.h" // for string split
-#include "gutil/strtoint.h" // for atoi64
#include <stdio.h>
+#include <functional>
#include <gperftools/malloc_extension.h>
-#include <functional>
+#include "gutil/strings/split.h" // for string split
+#include "gutil/strtoint.h" // for atoi64
+#include "util/doris_metrics.h"
namespace doris {
-const char* SystemMetrics::_s_hook_name = "system_metrics";
+// One counter prototype per CPU time field. All share the group "cpu" and differ only
+// in the label mode=<field>, so they are exported as a single "cpu" family.
+#define DEFINE_CPU_COUNTER_METRIC(metric) \
+ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cpu_##metric, MetricUnit::PERCENT, "", cpu, Labels({{"mode", #metric}}));
+DEFINE_CPU_COUNTER_METRIC(user);
+DEFINE_CPU_COUNTER_METRIC(nice);
+DEFINE_CPU_COUNTER_METRIC(system);
+DEFINE_CPU_COUNTER_METRIC(idle);
+DEFINE_CPU_COUNTER_METRIC(iowait);
+DEFINE_CPU_COUNTER_METRIC(irq);
+DEFINE_CPU_COUNTER_METRIC(soft_irq);
+DEFINE_CPU_COUNTER_METRIC(steal);
+DEFINE_CPU_COUNTER_METRIC(guest);
+DEFINE_CPU_COUNTER_METRIC(guest_nice);
// /proc/stat: http://www.linuxhowtos.org/System/procstat.htm
struct CpuMetrics {
+ // Registers the ten CPU counters on 'ent' and fills 'metrics' in the same order as
+ // the removed cpu_metrics name table (user, nice, system, idle, iowait, irq,
+ // soft_irq, steal, guest, guest_nice).
+ CpuMetrics(MetricEntity* ent) : entity(ent) {
+ METRIC_REGISTER(entity, cpu_user);
+ METRIC_REGISTER(entity, cpu_nice);
+ METRIC_REGISTER(entity, cpu_system);
+ METRIC_REGISTER(entity, cpu_idle);
+ METRIC_REGISTER(entity, cpu_iowait);
+ METRIC_REGISTER(entity, cpu_irq);
+ METRIC_REGISTER(entity, cpu_soft_irq);
+ METRIC_REGISTER(entity, cpu_steal);
+ METRIC_REGISTER(entity, cpu_guest);
+ METRIC_REGISTER(entity, cpu_guest_nice);
+
+ metrics[0] = &cpu_user;
+ metrics[1] = &cpu_nice;
+ metrics[2] = &cpu_system;
+ metrics[3] = &cpu_idle;
+ metrics[4] = &cpu_iowait;
+ metrics[5] = &cpu_irq;
+ metrics[6] = &cpu_soft_irq;
+ metrics[7] = &cpu_steal;
+ metrics[8] = &cpu_guest;
+ metrics[9] = &cpu_guest_nice;
+ }
+
static constexpr int cpu_num_metrics = 10;
- std::unique_ptr<IntAtomicCounter> metrics[cpu_num_metrics] = {
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT)),
- std::unique_ptr<IntAtomicCounter>(new IntAtomicCounter(MetricUnit::PERCENT))
- };
- static const char* cpu_metrics[cpu_num_metrics];
-};
-const char* CpuMetrics::cpu_metrics[] = {
- "user", "nice", "system", "idle", "iowait",
- "irq", "soft_irq", "steal", "guest", "guest_nice"
+ MetricEntity* entity = nullptr;
+ IntAtomicCounter cpu_user;
+ IntAtomicCounter cpu_nice;
+ IntAtomicCounter cpu_system;
+ IntAtomicCounter cpu_idle;
+ IntAtomicCounter cpu_iowait;
+ IntAtomicCounter cpu_irq;
+ IntAtomicCounter cpu_soft_irq;
+ IntAtomicCounter cpu_steal;
+ IntAtomicCounter cpu_guest;
+ IntAtomicCounter cpu_guest_nice;
+
+ // Points into the members above, so a CpuMetrics must never be copied; the
+ // std::atomic inside IntAtomicCounter already makes the struct non-copyable.
+ IntAtomicCounter* metrics[cpu_num_metrics];
};
+#define DEFINE_MEMORY_GAUGE_METRIC(metric, unit) \
+ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memory_##metric, unit);
+DEFINE_MEMORY_GAUGE_METRIC(allocated_bytes, MetricUnit::BYTES);
+
struct MemoryMetrics {
- METRIC_DEFINE_INT_GAUGE(allocated_bytes, MetricUnit::BYTES);
+ MemoryMetrics(MetricEntity* ent) : entity(ent) {
+ METRIC_REGISTER(entity, memory_allocated_bytes);
+ }
+
+ MetricEntity* entity = nullptr;
+ IntGauge memory_allocated_bytes;
};
+#define DEFINE_DISK_COUNTER_METRIC(metric, unit) \
+ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(disk_##metric, unit);
+DEFINE_DISK_COUNTER_METRIC(reads_completed, MetricUnit::OPERATIONS);
+DEFINE_DISK_COUNTER_METRIC(bytes_read, MetricUnit::BYTES);
+DEFINE_DISK_COUNTER_METRIC(read_time_ms, MetricUnit::MILLISECONDS);
+DEFINE_DISK_COUNTER_METRIC(writes_completed, MetricUnit::OPERATIONS);
+DEFINE_DISK_COUNTER_METRIC(bytes_written, MetricUnit::BYTES);
+DEFINE_DISK_COUNTER_METRIC(write_time_ms, MetricUnit::MILLISECONDS);
+DEFINE_DISK_COUNTER_METRIC(io_time_ms, MetricUnit::MILLISECONDS);
+DEFINE_DISK_COUNTER_METRIC(io_time_weigthed, MetricUnit::MILLISECONDS);
+
struct DiskMetrics {
- METRIC_DEFINE_INT_ATOMIC_COUNTER(reads_completed, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(bytes_read, MetricUnit::BYTES);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(read_time_ms, MetricUnit::MILLISECONDS);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(writes_completed, MetricUnit::OPERATIONS);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(bytes_written, MetricUnit::BYTES);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(write_time_ms, MetricUnit::MILLISECONDS);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(io_time_ms, MetricUnit::MILLISECONDS);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(io_time_weigthed, MetricUnit::MILLISECONDS);
+ DiskMetrics(MetricEntity* ent) : entity(ent) {
+ METRIC_REGISTER(entity, disk_reads_completed);
+ METRIC_REGISTER(entity, disk_bytes_read);
+ METRIC_REGISTER(entity, disk_read_time_ms);
+ METRIC_REGISTER(entity, disk_writes_completed);
+ METRIC_REGISTER(entity, disk_bytes_written);
+ METRIC_REGISTER(entity, disk_write_time_ms);
+ METRIC_REGISTER(entity, disk_io_time_ms);
+ METRIC_REGISTER(entity, disk_io_time_weigthed);
+ }
+
+ MetricEntity* entity = nullptr;
+ IntAtomicCounter disk_reads_completed;
+ IntAtomicCounter disk_bytes_read;
+ IntAtomicCounter disk_read_time_ms;
+ IntAtomicCounter disk_writes_completed;
+ IntAtomicCounter disk_bytes_written;
+ IntAtomicCounter disk_write_time_ms;
+ IntAtomicCounter disk_io_time_ms;
+ IntAtomicCounter disk_io_time_weigthed;
};
-struct NetMetrics {
- METRIC_DEFINE_INT_ATOMIC_COUNTER(receive_bytes, MetricUnit::BYTES);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(receive_packets, MetricUnit::PACKETS);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(send_bytes, MetricUnit::BYTES);
- METRIC_DEFINE_INT_ATOMIC_COUNTER(send_packets, MetricUnit::PACKETS);
+#define DEFINE_NETWORK_COUNTER_METRIC(metric, unit) \
+ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(network_##metric, unit);
+DEFINE_NETWORK_COUNTER_METRIC(receive_bytes, MetricUnit::BYTES);
+DEFINE_NETWORK_COUNTER_METRIC(receive_packets, MetricUnit::PACKETS);
+DEFINE_NETWORK_COUNTER_METRIC(send_bytes, MetricUnit::BYTES);
+DEFINE_NETWORK_COUNTER_METRIC(send_packets, MetricUnit::PACKETS);
+
+struct NetworkMetrics {
+ NetworkMetrics(MetricEntity* ent) : entity(ent) {
+ METRIC_REGISTER(entity, network_receive_bytes);
+ METRIC_REGISTER(entity, network_receive_packets);
+ METRIC_REGISTER(entity, network_send_bytes);
+ METRIC_REGISTER(entity, network_send_packets);
+ }
+
+ MetricEntity* entity = nullptr;
+ IntAtomicCounter network_receive_bytes;
+ IntAtomicCounter network_receive_packets;
+ IntAtomicCounter network_send_bytes;
+ IntAtomicCounter network_send_packets;
};
+#define DEFINE_SNMP_COUNTER_METRIC(metric, unit, desc) \
+ DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(snmp_##metric, unit, desc);
+DEFINE_SNMP_COUNTER_METRIC(tcp_in_errs, MetricUnit::NOUNIT, "The number of all problematic TCP packets received");
+DEFINE_SNMP_COUNTER_METRIC(tcp_retrans_segs, MetricUnit::NOUNIT, "All TCP packets retransmitted");
+DEFINE_SNMP_COUNTER_METRIC(tcp_in_segs, MetricUnit::NOUNIT, "All received TCP packets");
+DEFINE_SNMP_COUNTER_METRIC(tcp_out_segs, MetricUnit::NOUNIT, "All send TCP packets with RST mark");
+
// metrics read from /proc/net/snmp
struct SnmpMetrics {
- // The number of all problematic TCP packets received
- METRIC_DEFINE_INT_ATOMIC_COUNTER(tcp_in_errs, MetricUnit::NOUNIT);
- // All TCP packets retransmitted
- METRIC_DEFINE_INT_ATOMIC_COUNTER(tcp_retrans_segs, MetricUnit::NOUNIT);
- // All received TCP packets
- METRIC_DEFINE_INT_ATOMIC_COUNTER(tcp_in_segs, MetricUnit::NOUNIT);
- // All send TCP packets with RST mark
- METRIC_DEFINE_INT_ATOMIC_COUNTER(tcp_out_segs, MetricUnit::NOUNIT);
+ SnmpMetrics(MetricEntity* ent) : entity(ent) {
+ METRIC_REGISTER(entity, snmp_tcp_in_errs);
+ METRIC_REGISTER(entity, snmp_tcp_retrans_segs);
+ METRIC_REGISTER(entity, snmp_tcp_in_segs);
+ METRIC_REGISTER(entity, snmp_tcp_out_segs);
+ }
+
+ MetricEntity* entity = nullptr;
+ IntAtomicCounter snmp_tcp_in_errs;
+ IntAtomicCounter snmp_tcp_retrans_segs;
+ IntAtomicCounter snmp_tcp_in_segs;
+ IntAtomicCounter snmp_tcp_out_segs;
};
+#define DEFINE_FD_COUNTER_METRIC(metric, unit) \
+ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(fd_##metric, unit);
+DEFINE_FD_COUNTER_METRIC(num_limit, MetricUnit::NOUNIT);
+DEFINE_FD_COUNTER_METRIC(num_used, MetricUnit::NOUNIT);
+
struct FileDescriptorMetrics {
- METRIC_DEFINE_INT_GAUGE(fd_num_limit, MetricUnit::NOUNIT);
- METRIC_DEFINE_INT_GAUGE(fd_num_used, MetricUnit::NOUNIT);
+ FileDescriptorMetrics(MetricEntity* ent) : entity(ent) {
+ METRIC_REGISTER(entity, fd_num_limit);
+ METRIC_REGISTER(entity, fd_num_used);
+ }
+
+ MetricEntity* entity = nullptr;
+ IntGauge fd_num_limit;
+ IntGauge fd_num_used;
};
-SystemMetrics::SystemMetrics() {
+const char* SystemMetrics::_s_hook_name = "system_metrics";
+
+SystemMetrics::SystemMetrics(MetricRegistry* registry,
+ const std::set<std::string>& disk_devices,
+ const std::vector<std::string>& network_interfaces) {
+ DCHECK(registry != nullptr);
+ _registry = registry;
+#ifndef BE_TEST
+ auto entity = DorisMetrics::instance()->server_entity();
+#else
+ auto entity = _registry->register_entity("server", {});
+#endif
+ DCHECK(entity != nullptr);
+ entity->register_hook(_s_hook_name, std::bind(&SystemMetrics::update, this));
+ _install_cpu_metrics(entity);
+ _install_memory_metrics(entity);
+ _install_disk_metrics(disk_devices);
+ _install_net_metrics(network_interfaces);
+ _install_fd_metrics(entity);
+ _install_snmp_metrics(entity);
}
SystemMetrics::~SystemMetrics() {
- // we must deregister us from registry
- if (_registry != nullptr) {
- _registry->deregister_hook(_s_hook_name);
- _registry = nullptr;
- }
+ DCHECK(_registry != nullptr);
+ _registry->get_entity("server")->deregister_hook(_s_hook_name);
+
for (auto& it : _disk_metrics) {
delete it.second;
}
- for (auto& it : _net_metrics) {
+ for (auto& it : _network_metrics) {
delete it.second;
}
if (_line_ptr != nullptr) {
@@ -110,22 +228,6 @@ SystemMetrics::~SystemMetrics() {
}
}
-void SystemMetrics::install(MetricRegistry* registry,
- const std::set<std::string>& disk_devices,
- const std::vector<std::string>& network_interfaces) {
- DCHECK(_registry == nullptr);
- if (!registry->register_hook(_s_hook_name, std::bind(&SystemMetrics::update, this))) {
- return;
- }
- _install_cpu_metrics(registry);
- _install_memory_metrics(registry);
- _install_disk_metrics(registry, disk_devices);
- _install_net_metrics(registry, network_interfaces);
- _install_fd_metrics(registry);
- _install_snmp_metrics(registry);
- _registry = registry;
-}
-
void SystemMetrics::update() {
_update_cpu_metrics();
_update_memory_metrics();
@@ -135,14 +237,8 @@ void SystemMetrics::update() {
_update_snmp_metrics();
}
-void SystemMetrics::_install_cpu_metrics(MetricRegistry* registry) {
- _cpu_metrics.reset(new CpuMetrics());
-
- for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) {
- registry->register_metric("cpu",
- MetricLabels().add("mode", CpuMetrics::cpu_metrics[i]),
- _cpu_metrics->metrics[i].get());
- }
+void SystemMetrics::_install_cpu_metrics(MetricEntity* entity) {
+ _cpu_metrics.reset(new CpuMetrics(entity));
}
#ifdef BE_TEST
@@ -195,10 +291,8 @@ void SystemMetrics::_update_cpu_metrics() {
fclose(fp);
}
-void SystemMetrics::_install_memory_metrics(MetricRegistry* registry) {
- _memory_metrics.reset(new MemoryMetrics());
-
- registry->register_metric("memory_allocated_bytes", &_memory_metrics->allocated_bytes);
+void SystemMetrics::_install_memory_metrics(MetricEntity* entity) {
+ _memory_metrics.reset(new MemoryMetrics(entity));
}
void SystemMetrics::_update_memory_metrics() {
@@ -208,27 +302,15 @@ void SystemMetrics::_update_memory_metrics() {
size_t allocated_bytes = 0;
MallocExtension::instance()->GetNumericProperty(
"generic.current_allocated_bytes", &allocated_bytes);
- _memory_metrics->allocated_bytes.set_value(allocated_bytes);
+ _memory_metrics->memory_allocated_bytes.set_value(allocated_bytes);
#endif
}
-void SystemMetrics::_install_disk_metrics(MetricRegistry* registry,
- const std::set<std::string>& devices) {
- for (auto& disk : devices) {
- DiskMetrics* metrics = new DiskMetrics();
-#define REGISTER_DISK_METRIC(name) \
- registry->register_metric("disk_"#name, \
- MetricLabels().add("device", disk), \
- &metrics->name)
- REGISTER_DISK_METRIC(reads_completed);
- REGISTER_DISK_METRIC(bytes_read);
- REGISTER_DISK_METRIC(read_time_ms);
- REGISTER_DISK_METRIC(writes_completed);
- REGISTER_DISK_METRIC(bytes_written);
- REGISTER_DISK_METRIC(write_time_ms);
- REGISTER_DISK_METRIC(io_time_ms);
- REGISTER_DISK_METRIC(io_time_weigthed);
- _disk_metrics.emplace(disk, metrics);
+void SystemMetrics::_install_disk_metrics(const std::set<std::string>& disk_devices) {
+ for (auto& disk_device : disk_devices) {
+ auto disk_entity = _registry->register_entity(std::string("disk_metrics.") + disk_device, {{"device", disk_device}});
+ DiskMetrics* metrics = new DiskMetrics(disk_entity);
+ _disk_metrics.emplace(disk_device, metrics);
}
}
@@ -281,26 +363,26 @@ void SystemMetrics::_update_disk_metrics() {
continue;
}
auto it = _disk_metrics.find(device);
- if (it == std::end(_disk_metrics)) {
+ if (it == _disk_metrics.end()) {
continue;
}
// update disk metrics
// reads_completed: 4 reads completed successfully
- it->second->reads_completed.set_value(values[0]);
+ it->second->disk_reads_completed.set_value(values[0]);
// bytes_read: 6 sectors read * 512; 5 reads merged is ignored
- it->second->bytes_read.set_value(values[2] * 512);
+ it->second->disk_bytes_read.set_value(values[2] * 512);
// read_time_ms: 7 time spent reading (ms)
- it->second->read_time_ms.set_value(values[3]);
+ it->second->disk_read_time_ms.set_value(values[3]);
// writes_completed: 8 writes completed
- it->second->writes_completed.set_value(values[4]);
+ it->second->disk_writes_completed.set_value(values[4]);
// bytes_written: 10 sectors write * 512; 9 writes merged is ignored
- it->second->bytes_written.set_value(values[6] * 512);
+ it->second->disk_bytes_written.set_value(values[6] * 512);
// write_time_ms: 11 time spent writing (ms)
- it->second->write_time_ms.set_value(values[7]);
+ it->second->disk_write_time_ms.set_value(values[7]);
// io_time_ms: 13 time spent doing I/Os (ms)
- it->second->io_time_ms.set_value(values[9]);
+ it->second->disk_io_time_ms.set_value(values[9]);
// io_time_weigthed: 14 - weighted time spent doing I/Os (ms)
- it->second->io_time_weigthed.set_value(values[10]);
+ it->second->disk_io_time_weigthed.set_value(values[10]);
}
if (ferror(fp) != 0) {
char buf[64];
@@ -310,32 +392,16 @@ void SystemMetrics::_update_disk_metrics() {
fclose(fp);
}
-void SystemMetrics::_install_net_metrics(MetricRegistry* registry,
- const std::vector<std::string>& interfaces) {
- for (auto& net : interfaces) {
- NetMetrics* metrics = new NetMetrics();
-#define REGISTER_NETWORK_METRIC(name) \
- registry->register_metric("network_"#name, \
- MetricLabels().add("device", net), \
- &metrics->name)
- REGISTER_NETWORK_METRIC(receive_bytes);
- REGISTER_NETWORK_METRIC(receive_packets);
- REGISTER_NETWORK_METRIC(send_bytes);
- REGISTER_NETWORK_METRIC(send_packets);
- _net_metrics.emplace(net, metrics);
+void SystemMetrics::_install_net_metrics(const std::vector<std::string>& interfaces) {
+ for (auto& interface : interfaces) {
+ auto interface_entity = _registry->register_entity(std::string("network_metrics.") + interface, {{"device", interface}});
+ NetworkMetrics* metrics = new NetworkMetrics(interface_entity);
+ _network_metrics.emplace(interface, metrics);
}
}
-void SystemMetrics::_install_snmp_metrics(MetricRegistry* registry) {
- _snmp_metrics.reset(new SnmpMetrics());
-#define REGISTER_SNMP_METRIC(name) \
- registry->register_metric("snmp", \
- MetricLabels().add("name", #name), \
- &_snmp_metrics->name)
- REGISTER_SNMP_METRIC(tcp_in_errs);
- REGISTER_SNMP_METRIC(tcp_retrans_segs);
- REGISTER_SNMP_METRIC(tcp_in_segs);
- REGISTER_SNMP_METRIC(tcp_out_segs);
+void SystemMetrics::_install_snmp_metrics(MetricEntity* entity) {
+ _snmp_metrics.reset(new SnmpMetrics(entity));
}
void SystemMetrics::_update_net_metrics() {
@@ -381,8 +447,8 @@ void SystemMetrics::_update_net_metrics() {
start++;
}
std::string interface(start, ptr - start);
- auto it = _net_metrics.find(interface);
- if (it == std::end(_net_metrics)) {
+ auto it = _network_metrics.find(interface);
+ if (it == _network_metrics.end()) {
continue;
}
ptr++;
@@ -420,10 +486,10 @@ void SystemMetrics::_update_net_metrics() {
default:
break;
}
- it->second->receive_bytes.set_value(receive_bytes);
- it->second->receive_packets.set_value(receive_packets);
- it->second->send_bytes.set_value(send_bytes);
- it->second->send_packets.set_value(send_packets);
+ it->second->network_receive_bytes.set_value(receive_bytes);
+ it->second->network_receive_packets.set_value(receive_packets);
+ it->second->network_send_bytes.set_value(send_bytes);
+ it->second->network_send_packets.set_value(send_packets);
}
if (ferror(fp) != 0) {
char buf[64];
@@ -492,10 +558,10 @@ void SystemMetrics::_update_snmp_metrics() {
int64_t in_errs = atoi64(metrics[header_map["InErrs"]]);
int64_t in_segs = atoi64(metrics[header_map["InSegs"]]);
int64_t out_segs = atoi64(metrics[header_map["OutSegs"]]);
- _snmp_metrics->tcp_retrans_segs.set_value(retrans_segs);
- _snmp_metrics->tcp_in_errs.set_value(in_errs);
- _snmp_metrics->tcp_in_segs.set_value(in_segs);
- _snmp_metrics->tcp_out_segs.set_value(out_segs);
+ _snmp_metrics->snmp_tcp_retrans_segs.set_value(retrans_segs);
+ _snmp_metrics->snmp_tcp_in_errs.set_value(in_errs);
+ _snmp_metrics->snmp_tcp_in_segs.set_value(in_segs);
+ _snmp_metrics->snmp_tcp_out_segs.set_value(out_segs);
if (ferror(fp) != 0) {
char buf[64];
@@ -505,10 +571,8 @@ void SystemMetrics::_update_snmp_metrics() {
fclose(fp);
}
-void SystemMetrics::_install_fd_metrics(MetricRegistry* registry) {
- _fd_metrics.reset(new FileDescriptorMetrics());
- registry->register_metric("fd_num_limit", &_fd_metrics->fd_num_limit);
- registry->register_metric("fd_num_used", &_fd_metrics->fd_num_used);
+void SystemMetrics::_install_fd_metrics(MetricEntity* entity) {
+ _fd_metrics.reset(new FileDescriptorMetrics(entity));
}
void SystemMetrics::_update_fd_metrics() {
@@ -552,7 +616,7 @@ int64_t SystemMetrics::get_max_io_util(
const std::map<std::string, int64_t>& lst_value, int64_t interval_sec) {
int64_t max = 0;
for (auto& it : _disk_metrics) {
- int64_t cur = it.second->io_time_ms.value();
+ int64_t cur = it.second->disk_io_time_ms.value();
const auto find = lst_value.find(it.first);
if (find == lst_value.end()) {
continue;
@@ -566,7 +630,7 @@ int64_t SystemMetrics::get_max_io_util(
void SystemMetrics::get_disks_io_time(std::map<std::string, int64_t>* map) {
map->clear();
for (auto& it : _disk_metrics) {
- map->emplace(it.first, it.second->io_time_ms.value());
+ map->emplace(it.first, it.second->disk_io_time_ms.value());
}
}
@@ -575,10 +639,10 @@ void SystemMetrics::get_network_traffic(
std::map<std::string, int64_t>* rcv_map) {
send_map->clear();
rcv_map->clear();
- for (auto& it : _net_metrics) {
+ for (auto& it : _network_metrics) {
if (it.first == "lo") { continue; }
- send_map->emplace(it.first, it.second->send_bytes.value());
- rcv_map->emplace(it.first, it.second->receive_bytes.value());
+ send_map->emplace(it.first, it.second->network_send_bytes.value());
+ rcv_map->emplace(it.first, it.second->network_receive_bytes.value());
}
}
@@ -589,9 +653,9 @@ void SystemMetrics::get_max_net_traffic(
int64_t* send_rate, int64_t* rcv_rate) {
int64_t max_send = 0;
int64_t max_rcv = 0;
- for (auto& it : _net_metrics) {
- int64_t cur_send = it.second->send_bytes.value();
- int64_t cur_rcv = it.second->receive_bytes.value();
+ for (auto& it : _network_metrics) {
+ int64_t cur_send = it.second->network_send_bytes.value();
+ int64_t cur_rcv = it.second->network_receive_bytes.value();
const auto find_send = lst_send_map.find(it.first);
if (find_send != lst_send_map.end()) {
diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h
index f022b35..80ed604 100644
--- a/be/src/util/system_metrics.h
+++ b/be/src/util/system_metrics.h
@@ -26,20 +26,17 @@ namespace doris {
class CpuMetrics;
class MemoryMetrics;
class DiskMetrics;
-class NetMetrics;
+class NetworkMetrics;
class FileDescriptorMetrics;
class SnmpMetrics;
class SystemMetrics {
public:
- SystemMetrics();
+ SystemMetrics(MetricRegistry* registry,
+ const std::set<std::string>& disk_devices,
+ const std::vector<std::string>& network_interfaces);
~SystemMetrics();
- // install system metrics to registry
- void install(MetricRegistry* registry,
- const std::set<std::string>& disk_devices,
- const std::vector<std::string>& network_interfaces);
-
// update metrics
void update();
@@ -57,27 +54,25 @@ public:
int64_t* send_rate, int64_t* rcv_rate);
private:
- void _install_cpu_metrics(MetricRegistry*);
+ void _install_cpu_metrics(MetricEntity* entity);
// On Intel(R) Xeon(R) CPU E5-2450 0 @ 2.10GHz;
// read /proc/stat would cost about 170us
void _update_cpu_metrics();
- void _install_memory_metrics(MetricRegistry* registry);
+ void _install_memory_metrics(MetricEntity* entity);
void _update_memory_metrics();
- void _install_disk_metrics(MetricRegistry* registry,
- const std::set<std::string>& devices);
+ void _install_disk_metrics(const std::set<std::string>& disk_devices);
void _update_disk_metrics();
- void _install_net_metrics(MetricRegistry* registry,
- const std::vector<std::string>& interfaces);
+ void _install_net_metrics(const std::vector<std::string>& interfaces);
void _update_net_metrics();
- void _install_fd_metrics(MetricRegistry* registry);
+ void _install_fd_metrics(MetricEntity* entity);
void _update_fd_metrics();
- void _install_snmp_metrics(MetricRegistry* registry);
+ void _install_snmp_metrics(MetricEntity* entity);
void _update_snmp_metrics();
private:
@@ -86,7 +81,7 @@ private:
std::unique_ptr<CpuMetrics> _cpu_metrics;
std::unique_ptr<MemoryMetrics> _memory_metrics;
std::map<std::string, DiskMetrics*> _disk_metrics;
- std::map<std::string, NetMetrics*> _net_metrics;
+ std::map<std::string, NetworkMetrics*> _network_metrics;
std::unique_ptr<FileDescriptorMetrics> _fd_metrics;
int _proc_net_dev_version = 0;
std::unique_ptr<SnmpMetrics> _snmp_metrics;
diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp
index 2931c76..442c54d 100644
--- a/be/src/util/thrift_server.cpp
+++ b/be/src/util/thrift_server.cpp
@@ -32,8 +32,13 @@
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TServerSocket.h>
+#include "util/doris_metrics.h"
+
namespace doris {
+DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_current_connections, MetricUnit::CONNECTIONS, "Number of currently active connections");
+DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(thrift_connections_total, MetricUnit::CONNECTIONS, "Total connections made over the lifetime of this server");
+
// Helper class that starts a server in a separate thread, and handles
// the inter-thread communication to monitor whether it started
// correctly.
@@ -221,10 +226,8 @@ void* ThriftServer::ThriftServerEventProcessor::createContext(
_thrift_server->_session_handler->session_start(*_session_key);
}
- if (_thrift_server->_metrics_enabled) {
- _thrift_server->_connections_total->increment(1L);
- _thrift_server->_current_connections->increment(1L);
- }
+ _thrift_server->thrift_connections_total.increment(1L);
+ _thrift_server->thrift_current_connections.increment(1L);
// Store the _session_key in the per-client context to avoid recomputing
// it. If only this were accessible from RPC method calls, we wouldn't have to
@@ -253,16 +256,13 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(
_thrift_server->_session_keys.erase(_session_key);
}
- if (_thrift_server->_metrics_enabled) {
- _thrift_server->_current_connections->increment(-1L);
- }
+ _thrift_server->thrift_current_connections.increment(-1L);
}
ThriftServer::ThriftServer(
const std::string& name,
const boost::shared_ptr<apache::thrift::TProcessor>& processor,
int port,
- MetricRegistry* metrics,
int num_worker_threads,
ServerType server_type) :
_started(false),
@@ -274,20 +274,9 @@ ThriftServer::ThriftServer(
_server(NULL),
_processor(processor),
_session_handler(NULL) {
- if (metrics != NULL) {
- _metrics_enabled = true;
- _current_connections.reset(new IntGauge(MetricUnit::CONNECTIONS));
- metrics->register_metric("thrift_current_connections",
- MetricLabels().add("name", name),
- _current_connections.get());
-
- _connections_total.reset(new IntCounter(MetricUnit::CONNECTIONS));
- metrics->register_metric("thrift_connections_total",
- MetricLabels().add("name", name),
- _connections_total.get());
- } else {
- _metrics_enabled = false;
- }
+ _thrift_server_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(std::string("thrift_server.") + name, {{"name", name}});
+ METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections);
+ METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total);
}
Status ThriftServer::start() {
diff --git a/be/src/util/thrift_server.h b/be/src/util/thrift_server.h
index 3a02352..6b59eac 100644
--- a/be/src/util/thrift_server.h
+++ b/be/src/util/thrift_server.h
@@ -67,12 +67,11 @@ public:
// - name: human-readable name of this server. Should not contain spaces
// - processor: Thrift processor to handle RPCs
// - port: The port the server will listen for connections on
- // - metrics: if not NULL, the server will register metrics on this object
// - num_worker_threads: the number of worker threads to use in any thread pool
// - server_type: the type of IO strategy this server should employ
ThriftServer(const std::string& name,
const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
- MetricRegistry* metrics = NULL, int num_worker_threads = DEFAULT_WORKER_THREADS,
+ int num_worker_threads = DEFAULT_WORKER_THREADS,
ServerType server_type = THREADED);
~ThriftServer() { }
@@ -141,19 +140,16 @@ private:
typedef boost::unordered_map<SessionKey*, boost::shared_ptr<SessionKey> > SessionKeySet;
SessionKeySet _session_keys;
- // True if metrics are enabled
- bool _metrics_enabled;
-
- // Number of currently active connections
- std::unique_ptr<IntGauge> _current_connections;
-
- // Total connections made over the lifetime of this server
- std::unique_ptr<IntCounter> _connections_total;
-
// Helper class which monitors starting servers. Needs access to internal members, and
// is not used outside of this class.
class ThriftServerEventProcessor;
friend class ThriftServerEventProcessor;
+
+ MetricEntity* _thrift_server_metric_entity;
+ // Number of currently active connections
+ IntGauge thrift_current_connections;
+ // Total connections made over the lifetime of this server
+ IntCounter thrift_connections_total;
};
}
diff --git a/be/test/http/metrics_action_test.cpp b/be/test/http/metrics_action_test.cpp
index 417696e..db98454 100644
--- a/be/test/http/metrics_action_test.cpp
+++ b/be/test/http/metrics_action_test.cpp
@@ -52,46 +52,45 @@ private:
};
TEST_F(MetricsActionTest, prometheus_output) {
- MetricRegistry registry("test");
- IntGauge cpu_idle(MetricUnit::PERCENT);
+ MetricRegistry metric_registry("test");
+ MetricEntity* entity = metric_registry.register_entity("metrics_action_test.prometheus_output", {});
+
+ IntGauge cpu_idle;
+ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(cpu_idle, MetricUnit::PERCENT);
+ METRIC_REGISTER(entity, cpu_idle);
+
+ IntCounter put_requests_total;
+ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(put_requests_total, MetricUnit::NOUNIT, "", requests_total, Labels({{"type", "put"}, {"path", "/sports"}}));
+ METRIC_REGISTER(entity, put_requests_total);
+
cpu_idle.set_value(50);
- registry.register_metric("cpu_idle", &cpu_idle);
- IntCounter put_requests_total(MetricUnit::NOUNIT);
put_requests_total.increment(2345);
- registry.register_metric("requests_total",
- MetricLabels().add("type", "put").add("path", "/sports"),
- &put_requests_total);
+
s_expect_response =
"# TYPE test_cpu_idle gauge\n"
"test_cpu_idle 50\n"
"# TYPE test_requests_total counter\n"
"test_requests_total{path=\"/sports\",type=\"put\"} 2345\n";
HttpRequest request(_evhttp_req);
- MetricsAction action(®istry);
+ MetricsAction action(&metric_registry);
action.handle(&request);
}
TEST_F(MetricsActionTest, prometheus_no_prefix) {
- MetricRegistry registry("");
- IntGauge cpu_idle(MetricUnit::PERCENT);
+ MetricRegistry metric_registry("");
+ MetricEntity* entity = metric_registry.register_entity("metrics_action_test.prometheus_no_prefix", {});
+
+ IntGauge cpu_idle;
+ DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(cpu_idle, MetricUnit::PERCENT);
+ METRIC_REGISTER(entity, cpu_idle);
+
cpu_idle.set_value(50);
- registry.register_metric("cpu_idle", &cpu_idle);
+
s_expect_response =
"# TYPE cpu_idle gauge\n"
"cpu_idle 50\n";
HttpRequest request(_evhttp_req);
- MetricsAction action(®istry);
- action.handle(&request);
-}
-
-TEST_F(MetricsActionTest, prometheus_no_name) {
- MetricRegistry registry("test");
- IntGauge cpu_idle(MetricUnit::PERCENT);
- cpu_idle.set_value(50);
- registry.register_metric("", &cpu_idle);
- s_expect_response = "";
- HttpRequest request(_evhttp_req);
- MetricsAction action(®istry);
+ MetricsAction action(&metric_registry);
action.handle(&request);
}
diff --git a/be/test/olap/rowset/unique_rowset_id_generator_test.cpp b/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
index dc9388d..4016917 100644
--- a/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
+++ b/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
@@ -64,43 +64,36 @@ TEST_F(UniqueRowsetIdGeneratorTest, RowsetIdFormatTest) {
}
}
-
TEST_F(UniqueRowsetIdGeneratorTest, GenerateIdTest) {
UniqueId backend_uid = UniqueId::gen_uid();
- UniqueId backend_uid2 = UniqueId::gen_uid();
- ASSERT_NE(backend_uid, backend_uid2);
UniqueRowsetIdGenerator id_generator(backend_uid);
- UniqueRowsetIdGenerator id_generator2(backend_uid2);
- {
- RowsetId rowset_id1 = id_generator.next_id(); // hi == 1
- RowsetId rowset_id2 = id_generator2.next_id();
- ASSERT_EQ(rowset_id1.hi, rowset_id2.hi);
- }
- {
- int64_t hi = 2; // version
- hi <<= 56;
- RowsetId rowset_id = id_generator.next_id(); // hi == 2
- ASSERT_EQ(rowset_id.hi, hi + 2);
- ASSERT_EQ(rowset_id.version, 2);
- ASSERT_EQ(backend_uid.lo, rowset_id.lo);
- ASSERT_EQ(backend_uid.hi, rowset_id.mi);
- ASSERT_NE(rowset_id.hi, 0);
- bool in_use = id_generator.id_in_use(rowset_id);
- ASSERT_TRUE(in_use);
- id_generator.release_id(rowset_id);
- in_use = id_generator.id_in_use(rowset_id);
- ASSERT_FALSE(in_use);
-
- int64_t high = rowset_id.hi + 1;
- rowset_id = id_generator.next_id(); // hi == 3
- ASSERT_EQ(rowset_id.hi, high);
- in_use = id_generator.id_in_use(rowset_id);
- ASSERT_TRUE(in_use);
-
- std::string rowset_mid_str = rowset_id.to_string().substr(16,16);
- std::string backend_mid_str = backend_uid.to_string().substr(0, 16);
- ASSERT_EQ(rowset_mid_str, backend_mid_str);
- }
+ int64_t hi = 2; // version
+ hi <<= 56;
+
+ RowsetId rowset_id = id_generator.next_id(); // hi == 1
+ ASSERT_EQ(rowset_id.hi, hi + 1);
+ ASSERT_EQ(rowset_id.version, 2);
+ rowset_id = id_generator.next_id(); // hi == 2
+ ASSERT_EQ(rowset_id.hi, hi + 2);
+ ASSERT_EQ(rowset_id.version, 2);
+ ASSERT_EQ(backend_uid.lo, rowset_id.lo);
+ ASSERT_EQ(backend_uid.hi, rowset_id.mi);
+ ASSERT_NE(rowset_id.hi, 0);
+ bool in_use = id_generator.id_in_use(rowset_id);
+ ASSERT_TRUE(in_use);
+ id_generator.release_id(rowset_id);
+ in_use = id_generator.id_in_use(rowset_id);
+ ASSERT_FALSE(in_use);
+
+ int64_t high = rowset_id.hi + 1;
+ rowset_id = id_generator.next_id(); // hi == 3
+ ASSERT_EQ(rowset_id.hi, high);
+ in_use = id_generator.id_in_use(rowset_id);
+ ASSERT_TRUE(in_use);
+
+ std::string rowset_mid_str = rowset_id.to_string().substr(16,16);
+ std::string backend_mid_str = backend_uid.to_string().substr(0, 16);
+ ASSERT_EQ(rowset_mid_str, backend_mid_str);
}
TEST_F(UniqueRowsetIdGeneratorTest, GenerateIdBenchmark) {
diff --git a/be/test/runtime/tmp_file_mgr_test.cpp b/be/test/runtime/tmp_file_mgr_test.cpp
index 7e54788..62fec4d 100644
--- a/be/test/runtime/tmp_file_mgr_test.cpp
+++ b/be/test/runtime/tmp_file_mgr_test.cpp
@@ -39,38 +39,21 @@ namespace doris {
class TmpFileMgrTest : public ::testing::Test {
protected:
- virtual void SetUp() {
- _metrics.reset(new MetricRegistry(""));
- }
- virtual void TearDown() {
- _metrics.reset();
- }
-#if 0
// Check that metric values are consistent with TmpFileMgr state.
void check_metrics(TmpFileMgr* tmp_file_mgr) {
vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->active_tmp_devices();
- IntCounter* active_metric = _metrics->get_metric<IntCounter>(
- "tmp-file-mgr.active-scratch-dirs");
- EXPECT_EQ(active.size(), active_metric->value());
- SetMetric<string>* active_set_metric = _metrics->get_metric<SetMetric<string> >(
- "tmp-file-mgr.active-scratch-dirs.list");
- set<string> active_set = active_set_metric->value();
- EXPECT_EQ(active.size(), active_set.size());
- for (int i = 0; i < active.size(); ++i) {
- string tmp_dir_path = tmp_file_mgr->get_tmp_dir_path(active[i]);
- EXPECT_TRUE(active_set.find(tmp_dir_path) != active_set.end());
- }
+ int64_t active_metric =
+ DorisMetrics::instance()->metric_registry()->get_entity("server")->get_metric("active_scratch_dirs").value();
+ EXPECT_EQ(active.size(), active_metric);
}
-#endif
- boost::scoped_ptr<MetricRegistry> _metrics;
};
// Regression test for IMPALA-2160. Verify that temporary file manager allocates blocks
// at the expected file offsets and expands the temporary file to the correct size.
TEST_F(TmpFileMgrTest, TestFileAllocation) {
TmpFileMgr tmp_file_mgr;
- EXPECT_TRUE(tmp_file_mgr.init(_metrics.get()).ok());
+ EXPECT_TRUE(tmp_file_mgr.init().ok());
// Default configuration should give us one temporary device.
EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.active_tmp_devices();
@@ -110,7 +93,7 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
EXPECT_TRUE(FileSystemUtil::create_directory(tmp_dirs[i]).ok());
}
TmpFileMgr tmp_file_mgr;
- tmp_file_mgr.init_custom(tmp_dirs, true, _metrics.get());
+ tmp_file_mgr.init_custom(tmp_dirs, true);
// Only the first directory should be used.
EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
@@ -134,7 +117,7 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
EXPECT_TRUE(FileSystemUtil::create_directory(tmp_dirs[i]).ok());
}
TmpFileMgr tmp_file_mgr;
- tmp_file_mgr.init_custom(tmp_dirs, false, _metrics.get());
+ tmp_file_mgr.init_custom(tmp_dirs, false);
// Both directories should be used.
EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices());
@@ -162,7 +145,7 @@ TEST_F(TmpFileMgrTest, TestReportError) {
EXPECT_TRUE(FileSystemUtil::create_directory(tmp_dirs[i]).ok());
}
TmpFileMgr tmp_file_mgr;
- tmp_file_mgr.init_custom(tmp_dirs, false, _metrics.get());
+ tmp_file_mgr.init_custom(tmp_dirs, false);
// Both directories should be used.
vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
@@ -208,7 +191,7 @@ TEST_F(TmpFileMgrTest, TestAllocateFails) {
vector<string> tmp_dirs(1, tmp_dir);
EXPECT_TRUE(FileSystemUtil::create_directory(tmp_dir).ok());
TmpFileMgr tmp_file_mgr;
- tmp_file_mgr.init_custom(tmp_dirs, false, _metrics.get());
+ tmp_file_mgr.init_custom(tmp_dirs, false);
TUniqueId id;
TmpFileMgr::File* allocated_file1;
diff --git a/be/test/util/doris_metrics_test.cpp b/be/test/util/doris_metrics_test.cpp
index cf728b4..711eceb 100644
--- a/be/test/util/doris_metrics_test.cpp
+++ b/be/test/util/doris_metrics_test.cpp
@@ -30,223 +30,154 @@ public:
}
};
-class TestMetricsVisitor : public MetricsVisitor {
-public:
- virtual ~TestMetricsVisitor() { }
- void visit(const std::string& prefix, const std::string& name,
- MetricCollector* collector) {
- for (auto& it : collector->metrics()) {
- Metric* metric = it.second;
- auto& labels = it.first;
- switch (metric->type()) {
- case MetricType::COUNTER: {
- bool has_prev = false;
- if (!prefix.empty()) {
- _ss << prefix;
- has_prev = true;
- }
- if (!name.empty()) {
- if (has_prev) {
- _ss << "_";
- }
- _ss << name;
- }
- if (!labels.empty()) {
- if (has_prev) {
- _ss << "{";
- }
- _ss << labels.to_string();
- if (has_prev) {
- _ss << "}";
- }
- }
- _ss << " " << metric->to_string() << std::endl;
- break;
- }
- default:
- break;
- }
- }
- }
- std::string to_string() {
- return _ss.str();
- }
-private:
- std::stringstream _ss;
-};
-
TEST_F(DorisMetricsTest, Normal) {
- TestMetricsVisitor visitor;
- auto metrics = DorisMetrics::instance()->metrics();
- metrics->collect(&visitor);
- LOG(INFO) << "\n" << visitor.to_string();
+ auto metric_registry = DorisMetrics::instance()->metric_registry();
+ auto server_entity = metric_registry->get_entity("server");
// check metric
{
DorisMetrics::instance()->fragment_requests_total.increment(12);
- auto metric = metrics->get_metric("fragment_requests_total");
+ auto metric = server_entity->get_metric("fragment_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("12", metric->to_string().c_str());
}
{
DorisMetrics::instance()->fragment_request_duration_us.increment(101);
- auto metric = metrics->get_metric("fragment_request_duration_us");
+ auto metric = server_entity->get_metric("fragment_request_duration_us");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("101", metric->to_string().c_str());
}
{
DorisMetrics::instance()->http_requests_total.increment(102);
- auto metric = metrics->get_metric("http_requests_total");
+ auto metric = server_entity->get_metric("http_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("102", metric->to_string().c_str());
}
{
DorisMetrics::instance()->http_request_send_bytes.increment(104);
- auto metric = metrics->get_metric("http_request_send_bytes");
+ auto metric = server_entity->get_metric("http_request_send_bytes");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("104", metric->to_string().c_str());
}
{
DorisMetrics::instance()->query_scan_bytes.increment(104);
- auto metric = metrics->get_metric("query_scan_bytes");
+ auto metric = server_entity->get_metric("query_scan_bytes");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("104", metric->to_string().c_str());
}
{
DorisMetrics::instance()->query_scan_rows.increment(105);
- auto metric = metrics->get_metric("query_scan_rows");
+ auto metric = server_entity->get_metric("query_scan_rows");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("105", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_requests_success_total.increment(106);
- auto metric = metrics->get_metric("push_requests_total",
- MetricLabels().add("status", "SUCCESS"));
+ auto metric = server_entity->get_metric("push_requests_success_total", "push_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("106", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_requests_fail_total.increment(107);
- auto metric = metrics->get_metric("push_requests_total",
- MetricLabels().add("status", "FAIL"));
+ auto metric = server_entity->get_metric("push_requests_fail_total", "push_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("107", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_request_duration_us.increment(108);
- auto metric = metrics->get_metric("push_request_duration_us");
+ auto metric = server_entity->get_metric("push_request_duration_us");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("108", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_request_write_bytes.increment(109);
- auto metric = metrics->get_metric("push_request_write_bytes");
+ auto metric = server_entity->get_metric("push_request_write_bytes");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("109", metric->to_string().c_str());
}
{
DorisMetrics::instance()->push_request_write_rows.increment(110);
- auto metric = metrics->get_metric("push_request_write_rows");
+ auto metric = server_entity->get_metric("push_request_write_rows");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("110", metric->to_string().c_str());
}
// engine request
{
DorisMetrics::instance()->create_tablet_requests_total.increment(15);
- auto metric = metrics->get_metric("engine_requests_total",
- MetricLabels().add("type", "create_tablet")
- .add("status", "total"));
+ auto metric = server_entity->get_metric("create_tablet_requests_total", "engine_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("15", metric->to_string().c_str());
}
{
DorisMetrics::instance()->drop_tablet_requests_total.increment(16);
- auto metric = metrics->get_metric("engine_requests_total",
- MetricLabels().add("type", "drop_tablet")
- .add("status", "total"));
+ auto metric = server_entity->get_metric("drop_tablet_requests_total", "engine_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("16", metric->to_string().c_str());
}
{
DorisMetrics::instance()->report_all_tablets_requests_total.increment(17);
- auto metric = metrics->get_metric("engine_requests_total",
- MetricLabels().add("type", "report_all_tablets")
- .add("status", "total"));
+ auto metric = server_entity->get_metric("report_all_tablets_requests_total", "engine_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("17", metric->to_string().c_str());
}
{
DorisMetrics::instance()->report_tablet_requests_total.increment(18);
- auto metric = metrics->get_metric("engine_requests_total",
- MetricLabels().add("type", "report_tablet")
- .add("status", "total"));
+ auto metric = server_entity->get_metric("report_tablet_requests_total", "engine_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("18", metric->to_string().c_str());
}
{
DorisMetrics::instance()->schema_change_requests_total.increment(19);
- auto metric = metrics->get_metric("engine_requests_total",
- MetricLabels().add("type", "schema_change")
- .add("status", "total"));
+ auto metric = server_entity->get_metric("schema_change_requests_total", "engine_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("19", metric->to_string().c_str());
}
{
DorisMetrics::instance()->create_rollup_requests_total.increment(20);
- auto metric = metrics->get_metric("engine_requests_total",
- MetricLabels().add("type", "create_rollup")
- .add("status", "total"));
+ auto metric = server_entity->get_metric("create_rollup_requests_total", "engine_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("20", metric->to_string().c_str());
}
{
DorisMetrics::instance()->storage_migrate_requests_total.increment(21);
- auto metric = metrics->get_metric("engine_requests_total",
- MetricLabels().add("type", "storage_migrate")
- .add("status", "total"));
+ auto metric = server_entity->get_metric("storage_migrate_requests_total", "engine_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("21", metric->to_string().c_str());
}
{
DorisMetrics::instance()->delete_requests_total.increment(22);
- auto metric = metrics->get_metric("engine_requests_total",
- MetricLabels().add("type", "delete")
- .add("status", "total"));
+ auto metric = server_entity->get_metric("delete_requests_total", "engine_requests_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("22", metric->to_string().c_str());
}
// comapction
{
DorisMetrics::instance()->base_compaction_deltas_total.increment(30);
- auto metric = metrics->get_metric("compaction_deltas_total",
- MetricLabels().add("type", "base"));
+ auto metric = server_entity->get_metric("base_compaction_deltas_total", "compaction_deltas_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("30", metric->to_string().c_str());
}
{
DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(31);
- auto metric = metrics->get_metric("compaction_deltas_total",
- MetricLabels().add("type", "cumulative"));
+ auto metric = server_entity->get_metric("cumulative_compaction_deltas_total", "compaction_deltas_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("31", metric->to_string().c_str());
}
{
DorisMetrics::instance()->base_compaction_bytes_total.increment(32);
- auto metric = metrics->get_metric("compaction_bytes_total",
- MetricLabels().add("type", "base"));
+ auto metric = server_entity->get_metric("base_compaction_bytes_total", "compaction_bytes_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("32", metric->to_string().c_str());
}
{
DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(33);
- auto metric = metrics->get_metric("compaction_bytes_total",
- MetricLabels().add("type", "cumulative"));
+ auto metric = server_entity->get_metric("cumulative_compaction_bytes_total", "compaction_bytes_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("33", metric->to_string().c_str());
}
// Gauge
{
DorisMetrics::instance()->memory_pool_bytes_total.increment(40);
- auto metric = metrics->get_metric("memory_pool_bytes_total");
+ auto metric = server_entity->get_metric("memory_pool_bytes_total");
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("40", metric->to_string().c_str());
}
diff --git a/be/test/util/new_metrics_test.cpp b/be/test/util/new_metrics_test.cpp
index 0dd94e9..67b7aea 100644
--- a/be/test/util/new_metrics_test.cpp
+++ b/be/test/util/new_metrics_test.cpp
@@ -36,7 +36,7 @@ public:
TEST_F(MetricsTest, Counter) {
{
- IntCounter counter(MetricUnit::NOUNIT);
+ IntCounter counter;
ASSERT_EQ(0, counter.value());
counter.increment(100);
ASSERT_EQ(100, counter.value());
@@ -44,7 +44,7 @@ TEST_F(MetricsTest, Counter) {
ASSERT_STREQ("100", counter.to_string().c_str());
}
{
- IntAtomicCounter counter(MetricUnit::NOUNIT);
+ IntAtomicCounter counter;
ASSERT_EQ(0, counter.value());
counter.increment(100);
ASSERT_EQ(100, counter.value());
@@ -52,7 +52,7 @@ TEST_F(MetricsTest, Counter) {
ASSERT_STREQ("100", counter.to_string().c_str());
}
{
- UIntCounter counter(MetricUnit::NOUNIT);
+ UIntCounter counter;
ASSERT_EQ(0, counter.value());
counter.increment(100);
ASSERT_EQ(100, counter.value());
@@ -60,8 +60,8 @@ TEST_F(MetricsTest, Counter) {
ASSERT_STREQ("100", counter.to_string().c_str());
}
{
- DoubleCounter counter(MetricUnit::NOUNIT);
- ASSERT_EQ(0.0, counter.value());
+ DoubleCounter counter;
+ ASSERT_EQ(0.0, counter.value()); // double expectation for a double counter, matching the DoubleGauge case
counter.increment(1.23);
ASSERT_EQ(1.23, counter.value());
@@ -98,7 +98,7 @@ TEST_F(MetricsTest, CounterPerf) {
}
// IntAtomicCounter
{
- IntAtomicCounter counter(MetricUnit::NOUNIT);
+ IntAtomicCounter counter;
MonotonicStopWatch watch;
watch.start();
for (int i = 0; i < kLoopCount; ++i) {
@@ -111,7 +111,7 @@ TEST_F(MetricsTest, CounterPerf) {
}
// IntCounter
{
- IntCounter counter(MetricUnit::NOUNIT);
+ IntCounter counter;
MonotonicStopWatch watch;
watch.start();
for (int i = 0; i < kLoopCount; ++i) {
@@ -125,7 +125,7 @@ TEST_F(MetricsTest, CounterPerf) {
// multi-thread for IntCounter
{
- IntCounter mt_counter(MetricUnit::NOUNIT);
+ IntCounter mt_counter;
std::vector<std::thread> updaters;
std::atomic<uint64_t> used_time(0);
for (int i = 0; i < 8; ++i) {
@@ -140,7 +140,7 @@ TEST_F(MetricsTest, CounterPerf) {
}
// multi-thread for IntAtomicCounter
{
- IntAtomicCounter mt_counter(MetricUnit::NOUNIT);
+ IntAtomicCounter mt_counter;
std::vector<std::thread> updaters;
std::atomic<uint64_t> used_time(0);
for (int i = 0; i < 8; ++i) {
@@ -158,7 +158,7 @@ TEST_F(MetricsTest, CounterPerf) {
TEST_F(MetricsTest, Gauge) {
// IntGauge
{
- IntGauge gauge(MetricUnit::NOUNIT);
+ IntGauge gauge;
ASSERT_EQ(0, gauge.value());
gauge.set_value(100);
ASSERT_EQ(100, gauge.value());
@@ -167,7 +167,7 @@ TEST_F(MetricsTest, Gauge) {
}
// UIntGauge
{
- UIntGauge gauge(MetricUnit::NOUNIT);
+ UIntGauge gauge;
ASSERT_EQ(0, gauge.value());
gauge.set_value(100);
ASSERT_EQ(100, gauge.value());
@@ -176,7 +176,7 @@ TEST_F(MetricsTest, Gauge) {
}
// DoubleGauge
{
- DoubleGauge gauge(MetricUnit::NOUNIT);
+ DoubleGauge gauge;
ASSERT_EQ(0.0, gauge.value());
gauge.set_value(1.23);
ASSERT_EQ(1.23, gauge.value());
@@ -185,170 +185,212 @@ TEST_F(MetricsTest, Gauge) {
}
}
-TEST_F(MetricsTest, MetricLabel) {
- std::string put("put");
- MetricLabel label("type", put);
+TEST_F(MetricsTest, MetricPrototype) {
+ { // prototype without a group name: simple_name() is the metric name itself
+ MetricPrototype fragment_requests_type(MetricType::COUNTER, MetricUnit::REQUESTS, "fragment_requests_total",
+ "Total fragment requests received.");
- ASSERT_TRUE(label == MetricLabel("type", "put"));
- ASSERT_TRUE(label != MetricLabel("type", "get"));
- ASSERT_TRUE(label < MetricLabel("type", "quit"));
- ASSERT_TRUE(label < MetricLabel("typee", "put"));
- ASSERT_TRUE(label.compare(MetricLabel("type", "put")) == 0);
- ASSERT_TRUE(label.compare(MetricLabel("typee", "put")) < 0);
+ ASSERT_EQ("fragment_requests_total", fragment_requests_type.simple_name());
+ ASSERT_EQ("fragment_requests_total", fragment_requests_type.combine_name(""));
+ ASSERT_EQ("doris_be_fragment_requests_total", fragment_requests_type.combine_name("doris_be"));
+ }
+ { // prototype with group name "cpu": simple_name() is the group name
+ MetricPrototype cpu_idle_type(MetricType::COUNTER, MetricUnit::PERCENT, "cpu_idle",
+ "CPU's idle time percent", "cpu");
- ASSERT_STREQ("type=put", label.to_string().c_str());
+ ASSERT_EQ("cpu", cpu_idle_type.simple_name());
+ ASSERT_EQ("cpu", cpu_idle_type.combine_name(""));
+ ASSERT_EQ("doris_be_cpu", cpu_idle_type.combine_name("doris_be"));
+ }
}
-TEST_F(MetricsTest, MetricLabels) {
- MetricLabels empty_labels;
+TEST_F(MetricsTest, MetricEntityWithMetric) {
+ MetricEntity entity("test_entity", {});
- ASSERT_TRUE(empty_labels == MetricLabels());
- ASSERT_TRUE(empty_labels < MetricLabels().add("type", "put"));
- ASSERT_TRUE(empty_labels.empty());
+ IntCounter cpu_idle;
+ MetricPrototype cpu_idle_type(MetricType::COUNTER, MetricUnit::PERCENT, "cpu_idle");
- ASSERT_STREQ("", empty_labels.to_string().c_str());
+ // Before register
+ Metric* metric = entity.get_metric("cpu_idle");
+ ASSERT_EQ(nullptr, metric);
- MetricLabels labels;
- labels.add("path", "/home").add("type", "put");
+ // Register
+ entity.register_metric(&cpu_idle_type, &cpu_idle);
+ cpu_idle.increment(12);
+
+ metric = entity.get_metric("cpu_idle");
+ ASSERT_NE(nullptr, metric);
+ ASSERT_EQ("12", metric->to_string());
- ASSERT_TRUE(labels == MetricLabels().add("path", "/home").add("type", "put"));
- ASSERT_FALSE(labels == MetricLabels().add("path", "/home").add("type", "get"));
- ASSERT_FALSE(labels == MetricLabels().add("path", "/home"));
- ASSERT_TRUE(labels < MetricLabels().add("path", "/sports"));
- ASSERT_TRUE(labels < MetricLabels().add("path", "/home").add("type", "put").add("xstatus", "404"));
- ASSERT_FALSE(labels < MetricLabels().add("path", "/abc"));
- ASSERT_FALSE(labels < MetricLabels().add("path", "/home").add("type", "put"));
+ cpu_idle.increment(8);
+ ASSERT_EQ("20", metric->to_string());
- ASSERT_STREQ("path=/home,type=put", labels.to_string().c_str());
+ // Deregister
+ entity.deregister_metric(&cpu_idle_type);
+
+ // After deregister
+ metric = entity.get_metric("cpu_idle");
+ ASSERT_EQ(nullptr, metric);
}
-class TestMetricsVisitor : public MetricsVisitor {
-public:
- virtual ~TestMetricsVisitor() { }
- void visit(const std::string& prefix, const std::string& name,
- MetricCollector* collector) {
- for (auto& it : collector->metrics()) {
- Metric* metric = it.second;
- auto& labels = it.first;
- switch (metric->type()) {
- case MetricType::COUNTER: {
- bool has_prev = false;
- if (!prefix.empty()) {
- _ss << prefix;
- has_prev = true;
- }
- if (!name.empty()) {
- if (has_prev) {
- _ss << "_";
- }
- _ss << name;
- }
- if (!labels.empty()) {
- if (has_prev) {
- _ss << "_";
- }
- _ss << labels.to_string();
- }
- _ss << " " << metric->to_string() << std::endl;
- break;
- }
- default:
- break;
- }
- }
- }
- std::string to_string() {
- return _ss.str();
- }
-private:
- std::stringstream _ss;
-};
+TEST_F(MetricsTest, MetricEntityWithHook) {
+ MetricEntity entity("test_entity", {});
-TEST_F(MetricsTest, MetricCollector) {
- IntCounter puts(MetricUnit::NOUNIT);
- puts.increment(101);
- IntCounter gets(MetricUnit::NOUNIT);
- gets.increment(201);
- MetricCollector collector;
- ASSERT_TRUE(collector.add_metic(MetricLabels().add("type", "put"), &puts));
- ASSERT_TRUE(collector.add_metic(MetricLabels().add("type", "get"), &gets));
- ASSERT_FALSE(collector.add_metic(MetricLabels().add("type", "get"), &gets));
+ IntCounter cpu_idle;
+ MetricPrototype cpu_idle_type(MetricType::COUNTER, MetricUnit::PERCENT, "cpu_idle");
- {
- // Can't add different type to one collector
- IntGauge post(MetricUnit::NOUNIT);
- ASSERT_FALSE(collector.add_metic(MetricLabels().add("type", "post"), &post));
- }
+ // Register
+ entity.register_metric(&cpu_idle_type, &cpu_idle);
+ entity.register_hook("test_hook", [&cpu_idle]() {
+ cpu_idle.increment(6);
+ });
+
+ // Before hook
+ Metric* metric = entity.get_metric("cpu_idle");
+ ASSERT_NE(nullptr, metric);
+ ASSERT_EQ("0", metric->to_string());
+
+ // Hook
+ entity.trigger_hook_unlocked(true);
+ ASSERT_EQ("6", metric->to_string());
+
+ entity.trigger_hook_unlocked(true);
+ ASSERT_EQ("12", metric->to_string());
+
+ // Deregister hook
+ entity.deregister_hook("test_hook");
+ // Hook but no effect
+ entity.trigger_hook_unlocked(true);
+ ASSERT_EQ("12", metric->to_string());
+}
+
+TEST_F(MetricsTest, MetricRegistryRegister) {
+ MetricRegistry registry("test_registry");
+
+ // No entity
+ ASSERT_EQ("", registry.to_prometheus());
+ ASSERT_EQ("[]", registry.to_json());
+ ASSERT_EQ("", registry.to_core_string());
+
+ // Before register
+ auto entity = registry.get_entity("test_entity").get();
+ ASSERT_EQ(nullptr, entity);
+
+ // Register
+ entity = registry.register_entity("test_entity", {});
+ ASSERT_NE(nullptr, entity);
+
+ // After register
+ auto entity1 = registry.get_entity("test_entity").get();
+ ASSERT_NE(nullptr, entity1);
+ ASSERT_EQ(entity, entity1);
+
+ registry.deregister_entity("test_entity");
+ entity = registry.get_entity("test_entity").get();
+ ASSERT_EQ(nullptr, entity);
+}
+
+TEST_F(MetricsTest, MetricRegistryOutput) {
+ MetricRegistry registry("test_registry");
{
- TestMetricsVisitor visitor;
- collector.collect("", "", &visitor);
- ASSERT_STREQ("type=get 201\ntype=put 101\n", visitor.to_string().c_str());
+ // No entity
+ ASSERT_EQ("", registry.to_prometheus());
+ ASSERT_EQ("[]", registry.to_json());
+ ASSERT_EQ("", registry.to_core_string());
}
- collector.remove_metric(&puts);
+
{
- TestMetricsVisitor visitor;
- collector.collect("", "", &visitor);
- ASSERT_STREQ("type=get 201\n", visitor.to_string().c_str());
+ // Register one common metric to the entity
+ auto entity = registry.register_entity("test_entity", {});
+
+ IntGauge cpu_idle;
+ MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "", {}, true);
+ entity->register_metric(&cpu_idle_type, &cpu_idle);
+ cpu_idle.increment(8);
+
+ ASSERT_EQ(R"(# TYPE test_registry_cpu_idle gauge
+test_registry_cpu_idle 8
+)", registry.to_prometheus());
+ ASSERT_EQ(R"([{"tags":{"metric":"cpu_idle"},"unit":"percent","value":8}])", registry.to_json());
+ ASSERT_EQ("test_registry_cpu_idle LONG 8\n", registry.to_core_string());
+ registry.deregister_entity("test_entity");
}
- // test get_metric
- ASSERT_TRUE(collector.get_metric(MetricLabels()) == nullptr);
- ASSERT_TRUE(collector.get_metric(MetricLabels().add("type" ,"get")) != nullptr);
- std::vector<Metric*> metrics;
- collector.get_metrics(&metrics);
- ASSERT_EQ(1, metrics.size());
-}
-TEST_F(MetricsTest, MetricRegistry) {
- MetricRegistry registry("test");
- IntCounter cpu_idle(MetricUnit::PERCENT);
- cpu_idle.increment(12);
- ASSERT_TRUE(registry.register_metric("cpu_idle", &cpu_idle));
- // registry failed
- IntCounter dummy(MetricUnit::PERCENT);
- ASSERT_FALSE(registry.register_metric("cpu_idle", &dummy));
- IntCounter memory_usage(MetricUnit::BYTES);
- memory_usage.increment(24);
- ASSERT_TRUE(registry.register_metric("memory_usage", &memory_usage));
{
- TestMetricsVisitor visitor;
- registry.collect(&visitor);
- ASSERT_STREQ("test_cpu_idle 12\ntest_memory_usage 24\n", visitor.to_string().c_str());
+ // Register one metric with group name to the entity
+ auto entity = registry.register_entity("test_entity", {});
+
+ IntGauge cpu_idle;
+ MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}}, false);
+ entity->register_metric(&cpu_idle_type, &cpu_idle);
+ cpu_idle.increment(18);
+
+ ASSERT_EQ(R"(# TYPE test_registry_cpu gauge
+test_registry_cpu{mode="idle"} 18
+)", registry.to_prometheus());
+ ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"idle"},"unit":"percent","value":18}])", registry.to_json());
+ ASSERT_EQ("", registry.to_core_string());
+ registry.deregister_entity("test_entity");
}
- registry.deregister_metric(&memory_usage);
+
{
- TestMetricsVisitor visitor;
- registry.collect(&visitor);
- ASSERT_STREQ("test_cpu_idle 12\n", visitor.to_string().c_str());
+ // Register one common metric to an entity with label
+ auto entity = registry.register_entity("test_entity", {{"name", "lable_test"}});
+
+ IntGauge cpu_idle;
+ MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle");
+ entity->register_metric(&cpu_idle_type, &cpu_idle);
+ cpu_idle.increment(28);
+
+ ASSERT_EQ(R"(# TYPE test_registry_cpu_idle gauge
+test_registry_cpu_idle{name="lable_test"} 28
+)", registry.to_prometheus());
+ ASSERT_EQ(R"([{"tags":{"metric":"cpu_idle","name":"lable_test"},"unit":"percent","value":28}])", registry.to_json());
+ ASSERT_EQ("", registry.to_core_string());
+ registry.deregister_entity("test_entity");
}
- // test get_metric
- ASSERT_TRUE(registry.get_metric("cpu_idle") != nullptr);
- ASSERT_TRUE(registry.get_metric("memory_usage") == nullptr);
-}
-
-TEST_F(MetricsTest, MetricRegistry2) {
- MetricRegistry registry("test");
- IntCounter cpu_idle(MetricUnit::PERCENT);
- cpu_idle.increment(12);
- ASSERT_TRUE(registry.register_metric("cpu_idle", &cpu_idle));
{
- // memory_usage will deregister after this block
- IntCounter memory_usage(MetricUnit::BYTES);
- memory_usage.increment(24);
- ASSERT_TRUE(registry.register_metric("memory_usage", &memory_usage));
- TestMetricsVisitor visitor;
- registry.collect(&visitor);
- ASSERT_STREQ("test_cpu_idle 12\ntest_memory_usage 24\n", visitor.to_string().c_str());
+ // Register one common metric with group name to an entity with label
+ auto entity = registry.register_entity("test_entity", {{"name", "lable_test"}});
+
+ IntGauge cpu_idle;
+ MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}});
+ entity->register_metric(&cpu_idle_type, &cpu_idle);
+ cpu_idle.increment(38);
+
+ ASSERT_EQ(R"(# TYPE test_registry_cpu gauge
+test_registry_cpu{name="lable_test",mode="idle"} 38
+)", registry.to_prometheus());
+ ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"idle","name":"lable_test"},"unit":"percent","value":38}])", registry.to_json());
+ ASSERT_EQ("", registry.to_core_string());
+ registry.deregister_entity("test_entity");
}
{
- TestMetricsVisitor visitor;
- registry.collect(&visitor);
- ASSERT_STREQ("test_cpu_idle 12\n", visitor.to_string().c_str());
+ // Register two common metrics to one entity
+ auto entity = registry.register_entity("test_entity", {});
+
+ IntGauge cpu_idle;
+ MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}});
+ entity->register_metric(&cpu_idle_type, &cpu_idle);
+ cpu_idle.increment(48);
+
+ IntGauge cpu_guest;
+ MetricPrototype cpu_guest_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_guest", "", "cpu", {{"mode", "guest"}});
+ entity->register_metric(&cpu_guest_type, &cpu_guest);
+ cpu_guest.increment(58);
+
+ ASSERT_EQ(R"(# TYPE test_registry_cpu gauge
+test_registry_cpu{mode="idle"} 48
+test_registry_cpu{mode="guest"} 58
+)", registry.to_prometheus());
+ ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"guest"},"unit":"percent","value":58},{"tags":{"metric":"cpu","mode":"idle"},"unit":"percent","value":48}])", registry.to_json());
+ ASSERT_EQ("", registry.to_core_string());
+ registry.deregister_entity("test_entity");
}
}
-
}
int main(int argc, char** argv) {
diff --git a/be/test/util/system_metrics_test.cpp b/be/test/util/system_metrics_test.cpp
index 7f36228..a3e8cbb 100644
--- a/be/test/util/system_metrics_test.cpp
+++ b/be/test/util/system_metrics_test.cpp
@@ -33,50 +33,6 @@ public:
virtual ~SystemMetricsTest() {}
};
-class TestMetricsVisitor : public MetricsVisitor {
-public:
- virtual ~TestMetricsVisitor() {}
- void visit(const std::string& prefix, const std::string& name, MetricCollector* collector) {
- for (auto& it : collector->metrics()) {
- Metric* metric = it.second;
- auto& labels = it.first;
- switch (metric->type()) {
- case MetricType::GAUGE:
- case MetricType::COUNTER: {
- bool has_prev = false;
- if (!prefix.empty()) {
- _ss << prefix;
- has_prev = true;
- }
- if (!name.empty()) {
- if (has_prev) {
- _ss << "_";
- }
- _ss << name;
- }
- if (!labels.empty()) {
- if (has_prev) {
- _ss << "{";
- }
- _ss << labels.to_string();
- if (has_prev) {
- _ss << "}";
- }
- }
- _ss << " " << metric->to_string() << std::endl;
- break;
- }
- default:
- break;
- }
- }
- }
- std::string to_string() { return _ss.str(); }
-
-private:
- std::stringstream _ss;
-};
-
extern const char* k_ut_stat_path;
extern const char* k_ut_diskstats_path;
extern const char* k_ut_net_dev_path;
@@ -110,127 +66,109 @@ TEST_F(SystemMetricsTest, normal) {
disk_devices.emplace("sda");
std::vector<std::string> network_interfaces;
network_interfaces.emplace_back("xgbe0");
- SystemMetrics metrics;
- metrics.install(®istry, disk_devices, network_interfaces);
- metrics.update();
+ SystemMetrics metrics(®istry, disk_devices, network_interfaces);
+ auto entity = registry.get_entity("server");
+ ASSERT_TRUE(entity != nullptr);
- TestMetricsVisitor visitor;
- registry.collect(&visitor);
- LOG(INFO) << "\n" << visitor.to_string();
+ metrics.update();
// cpu
- Metric* cpu_user = registry.get_metric("cpu", MetricLabels().add("mode", "user"));
+ Metric* cpu_user = entity->get_metric("cpu_user", "cpu");
ASSERT_TRUE(cpu_user != nullptr);
// ASSERT_STREQ("57199151", cpu_user->to_string().c_str());
- Metric* cpu_nice = registry.get_metric("cpu", MetricLabels().add("mode", "nice"));
+ Metric* cpu_nice = entity->get_metric("cpu_nice", "cpu");
ASSERT_TRUE(cpu_nice != nullptr);
ASSERT_STREQ("2616310", cpu_nice->to_string().c_str());
- Metric* cpu_system = registry.get_metric("cpu", MetricLabels().add("mode", "system"));
+ Metric* cpu_system = entity->get_metric("cpu_system", "cpu");
ASSERT_TRUE(cpu_system != nullptr);
ASSERT_STREQ("10600935", cpu_system->to_string().c_str());
- Metric* cpu_idle = registry.get_metric("cpu", MetricLabels().add("mode", "idle"));
+ Metric* cpu_idle = entity->get_metric("cpu_idle", "cpu");
ASSERT_TRUE(cpu_idle != nullptr);
ASSERT_STREQ("1517505423", cpu_idle->to_string().c_str());
- Metric* cpu_iowait = registry.get_metric("cpu", MetricLabels().add("mode", "iowait"));
+ Metric* cpu_iowait = entity->get_metric("cpu_iowait", "cpu");
ASSERT_TRUE(cpu_iowait != nullptr);
ASSERT_STREQ("2137148", cpu_iowait->to_string().c_str());
- Metric* cpu_irq = registry.get_metric("cpu", MetricLabels().add("mode", "irq"));
+ Metric* cpu_irq = entity->get_metric("cpu_irq", "cpu");
ASSERT_TRUE(cpu_irq != nullptr);
ASSERT_STREQ("0", cpu_irq->to_string().c_str());
- Metric* cpu_softirq = registry.get_metric("cpu", MetricLabels().add("mode", "soft_irq"));
+ Metric* cpu_softirq = entity->get_metric("cpu_soft_irq", "cpu");
ASSERT_TRUE(cpu_softirq != nullptr);
ASSERT_STREQ("108277", cpu_softirq->to_string().c_str());
- Metric* cpu_steal = registry.get_metric("cpu", MetricLabels().add("mode", "steal"));
+ Metric* cpu_steal = entity->get_metric("cpu_steal", "cpu");
ASSERT_TRUE(cpu_steal != nullptr);
ASSERT_STREQ("0", cpu_steal->to_string().c_str());
- Metric* cpu_guest = registry.get_metric("cpu", MetricLabels().add("mode", "guest"));
+ Metric* cpu_guest = entity->get_metric("cpu_guest", "cpu");
ASSERT_TRUE(cpu_guest != nullptr);
ASSERT_STREQ("0", cpu_guest->to_string().c_str());
+ Metric* cpu_guest_nice = entity->get_metric("cpu_guest_nice", "cpu");
+ ASSERT_TRUE(cpu_guest_nice != nullptr);
+ ASSERT_STREQ("0", cpu_guest_nice->to_string().c_str());
// memroy
- Metric* memory_allocated_bytes = registry.get_metric("memory_allocated_bytes");
+ Metric* memory_allocated_bytes = entity->get_metric("memory_allocated_bytes");
ASSERT_TRUE(memory_allocated_bytes != nullptr);
+
// network
- Metric* receive_bytes =
- registry.get_metric("network_receive_bytes", MetricLabels().add("device", "xgbe0"));
+ auto net_entity = registry.get_entity("network_metrics.xgbe0");
+ ASSERT_TRUE(net_entity != nullptr);
+
+ Metric* receive_bytes = net_entity->get_metric("network_receive_bytes");
ASSERT_TRUE(receive_bytes != nullptr);
ASSERT_STREQ("52567436039", receive_bytes->to_string().c_str());
- Metric* receive_packets = registry.get_metric("network_receive_packets",
- MetricLabels().add("device", "xgbe0"));
+ Metric* receive_packets = net_entity->get_metric("network_receive_packets");
ASSERT_TRUE(receive_packets != nullptr);
ASSERT_STREQ("65066152", receive_packets->to_string().c_str());
- Metric* send_bytes =
- registry.get_metric("network_send_bytes", MetricLabels().add("device", "xgbe0"));
+ Metric* send_bytes = net_entity->get_metric("network_send_bytes");
ASSERT_TRUE(send_bytes != nullptr);
ASSERT_STREQ("45480856156", send_bytes->to_string().c_str());
- Metric* send_packets =
- registry.get_metric("network_send_packets", MetricLabels().add("device", "xgbe0"));
+ Metric* send_packets = net_entity->get_metric("network_send_packets");
ASSERT_TRUE(send_packets != nullptr);
ASSERT_STREQ("88277614", send_packets->to_string().c_str());
+
// disk
- Metric* bytes_read =
- registry.get_metric("disk_bytes_read", MetricLabels().add("device", "sda"));
+ auto disk_entity = registry.get_entity("disk_metrics.sda");
+ ASSERT_TRUE(disk_entity != nullptr);
+ Metric* bytes_read = disk_entity->get_metric("disk_bytes_read");
ASSERT_TRUE(bytes_read != nullptr);
ASSERT_STREQ("20142745600", bytes_read->to_string().c_str());
- Metric* reads_completed =
- registry.get_metric("disk_reads_completed", MetricLabels().add("device", "sda"));
+ Metric* reads_completed = disk_entity->get_metric("disk_reads_completed");
ASSERT_TRUE(reads_completed != nullptr);
ASSERT_STREQ("759548", reads_completed->to_string().c_str());
- Metric* read_time_ms =
- registry.get_metric("disk_read_time_ms", MetricLabels().add("device", "sda"));
+ Metric* read_time_ms = disk_entity->get_metric("disk_read_time_ms");
ASSERT_TRUE(read_time_ms != nullptr);
ASSERT_STREQ("4308146", read_time_ms->to_string().c_str());
- Metric* bytes_written =
- registry.get_metric("disk_bytes_written", MetricLabels().add("device", "sda"));
+ Metric* bytes_written = disk_entity->get_metric("disk_bytes_written");
ASSERT_TRUE(bytes_written != nullptr);
ASSERT_STREQ("1624753500160", bytes_written->to_string().c_str());
- Metric* writes_completed =
- registry.get_metric("disk_writes_completed", MetricLabels().add("device", "sda"));
+ Metric* writes_completed = disk_entity->get_metric("disk_writes_completed");
ASSERT_TRUE(writes_completed != nullptr);
ASSERT_STREQ("18282936", writes_completed->to_string().c_str());
- Metric* write_time_ms =
- registry.get_metric("disk_write_time_ms", MetricLabels().add("device", "sda"));
+ Metric* write_time_ms = disk_entity->get_metric("disk_write_time_ms");
ASSERT_TRUE(write_time_ms != nullptr);
ASSERT_STREQ("1907755230", write_time_ms->to_string().c_str());
- Metric* io_time_ms =
- registry.get_metric("disk_io_time_ms", MetricLabels().add("device", "sda"));
+ Metric* io_time_ms = disk_entity->get_metric("disk_io_time_ms");
ASSERT_TRUE(io_time_ms != nullptr);
ASSERT_STREQ("19003350", io_time_ms->to_string().c_str());
- Metric* io_time_weigthed =
- registry.get_metric("disk_io_time_weigthed", MetricLabels().add("device", "sda"));
+ Metric* io_time_weigthed = disk_entity->get_metric("disk_io_time_weigthed");
ASSERT_TRUE(write_time_ms != nullptr);
ASSERT_STREQ("1912122964", io_time_weigthed->to_string().c_str());
// fd
- Metric* fd_metric = registry.get_metric("fd_num_limit");
+ Metric* fd_metric = entity->get_metric("fd_num_limit");
ASSERT_TRUE(fd_metric != nullptr);
ASSERT_STREQ("13052138", fd_metric->to_string().c_str());
- fd_metric = registry.get_metric("fd_num_used");
+ fd_metric = entity->get_metric("fd_num_used");
ASSERT_TRUE(fd_metric != nullptr);
ASSERT_STREQ("19520", fd_metric->to_string().c_str());
// net snmp
- Metric* tcp_retrans_segs =
- registry.get_metric("snmp", MetricLabels().add("name", "tcp_retrans_segs"));
+ Metric* tcp_retrans_segs = entity->get_metric("snmp_tcp_retrans_segs");
ASSERT_TRUE(tcp_retrans_segs != nullptr);
- Metric* tcp_in_errs =
- registry.get_metric("snmp", MetricLabels().add("name", "tcp_in_errs"));
+ Metric* tcp_in_errs = entity->get_metric("snmp_tcp_in_errs");
ASSERT_TRUE(tcp_in_errs != nullptr);
ASSERT_STREQ("826271", tcp_retrans_segs->to_string().c_str());
ASSERT_STREQ("12712", tcp_in_errs->to_string().c_str());
}
- {
- TestMetricsVisitor visitor;
- registry.collect(&visitor);
- ASSERT_TRUE(visitor.to_string().empty());
-
- Metric* cpu_idle = registry.get_metric("cpu", MetricLabels().add("mode", "idle"));
- ASSERT_TRUE(cpu_idle == nullptr);
- Metric* cpu_user = registry.get_metric("cpu", MetricLabels().add("mode", "user"));
- ASSERT_TRUE(cpu_user == nullptr);
- Metric* memory_allocated_bytes = registry.get_metric("memory_allocated_bytes");
- ASSERT_TRUE(memory_allocated_bytes == nullptr);
- }
}
TEST_F(SystemMetricsTest, no_proc_file) {
@@ -256,28 +194,28 @@ TEST_F(SystemMetricsTest, no_proc_file) {
disk_devices.emplace("sda");
std::vector<std::string> network_interfaces;
network_interfaces.emplace_back("xgbe0");
- SystemMetrics metrics;
- metrics.install(&registry, disk_devices, network_interfaces);
+ SystemMetrics metrics(&registry, disk_devices, network_interfaces);
- TestMetricsVisitor visitor;
- registry.collect(&visitor);
- LOG(INFO) << "\n" << visitor.to_string();
+ auto entity = registry.get_entity("server");
+ ASSERT_TRUE(entity != nullptr);
// cpu
- Metric* cpu_user = registry.get_metric("cpu", MetricLabels().add("mode", "user"));
+ Metric* cpu_user = entity->get_metric("cpu_user", "cpu");
ASSERT_TRUE(cpu_user != nullptr);
ASSERT_STREQ("0", cpu_user->to_string().c_str());
// memroy
- Metric* memory_allocated_bytes = registry.get_metric("memory_allocated_bytes");
+ Metric* memory_allocated_bytes = entity->get_metric("memory_allocated_bytes");
ASSERT_TRUE(memory_allocated_bytes != nullptr);
// network
- Metric* receive_bytes =
- registry.get_metric("network_receive_bytes", MetricLabels().add("device", "xgbe0"));
+ auto net_entity = registry.get_entity("network_metrics.xgbe0");
+ ASSERT_TRUE(net_entity != nullptr);
+ Metric* receive_bytes = net_entity->get_metric("network_receive_bytes");
ASSERT_TRUE(receive_bytes != nullptr);
ASSERT_STREQ("0", receive_bytes->to_string().c_str());
// disk
- Metric* bytes_read =
- registry.get_metric("disk_bytes_read", MetricLabels().add("device", "sda"));
+ auto disk_entity = registry.get_entity("disk_metrics.sda");
+ ASSERT_TRUE(disk_entity != nullptr);
+ Metric* bytes_read = disk_entity->get_metric("disk_bytes_read");
ASSERT_TRUE(bytes_read != nullptr);
ASSERT_STREQ("0", bytes_read->to_string().c_str());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org