Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/26 00:31:10 UTC
[doris] branch master updated: [Enhancement](metrics) add more metrics (#11693)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9691db7918 [Enhancement](metrics) add more metrics (#11693)
9691db7918 is described below
commit 9691db791807419e9cd017680e286c6fae65a68d
Author: ccoffline <45...@users.noreply.github.com>
AuthorDate: Wed Oct 26 08:31:03 2022 +0800
[Enhancement](metrics) add more metrics (#11693)
* Add `AutoMappedMetric` to measure dynamic object.
* Add query instance and rpc metrics
* Add thrift rpc metrics
* Add txn metrics
* Reorganize metrics init routine.
Co-authored-by: 迟成 <ch...@meituan.com>
---
.../maint-monitor/monitor-metrics/metrics.md | 13 +-
.../org/apache/doris/common/ThreadPoolManager.java | 53 +++++---
.../org/apache/doris/metric/AutoMappedMetric.java | 37 ++++++
.../java/org/apache/doris/metric/MetricRepo.java | 144 +++++++++++++++------
.../org/apache/doris/planner/OlapTableSink.java | 11 ++
.../java/org/apache/doris/qe/ConnectProcessor.java | 1 +
.../main/java/org/apache/doris/qe/Coordinator.java | 3 +
.../java/org/apache/doris/qe/QeProcessorImpl.java | 2 +
.../org/apache/doris/rpc/BackendServiceProxy.java | 3 +
.../java/org/apache/doris/service/FeServer.java | 18 ++-
.../doris/transaction/DatabaseTransactionMgr.java | 39 +++++-
.../doris/transaction/GlobalTransactionMgr.java | 28 ++++
.../doris/transaction/PublishVersionDaemon.java | 5 +
.../apache/doris/transaction/TransactionState.java | 9 ++
.../apache/doris/common/ThreadPoolManagerTest.java | 7 -
15 files changed, 303 insertions(+), 70 deletions(-)
diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 12996ef240..206f021e2e 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -96,6 +96,7 @@ curl http://be_host:webserver_port/metrics?type=json
|`doris_fe_query_err`| | Num | Cumulative count of error queries | |
|`doris_fe_query_err_rate`| | Num/Sec| Number of error queries per second | Watch whether the cluster is producing query errors | P0 |
|`doris_fe_query_latency_ms`| | ms | Percentile statistics of query request latency. E.g. {quantile="0.75"} is the 75th-percentile query latency | Observe query latency at each percentile in detail | P0 |
+|| | ms | Per-DB percentile statistics of query request latency. E.g. {quantile="0.75",db="test"} is the 75th-percentile query latency of DB test | Observe per-DB query latency at each percentile in detail | P0 |
|`doris_fe_query_olap_table`| | Num| Number of query requests against internal tables (OlapTable) | |
|`doris_fe_query_total`| | Num | Cumulative count of all query requests | |
|`doris_fe_report_queue_size`| | Num | Queue length on the FE of the various periodic report tasks from BEs | Reflects how backlogged report tasks are on the Master FE node; the larger the value, the weaker the FE's processing capacity | P0|
@@ -131,7 +132,17 @@ curl http://be_host:webserver_port/metrics?type=json
|| {type="reject"} | Num| Number of rejected transactions. (E.g. new transactions are rejected when the number of running transactions exceeds the threshold)| |
|| {type="success"} | Num| Number of successful transactions| |
|`doris_fe_txn_status`| | Num | Number of load transactions currently in each state. E.g. {type="committed"} is the number of transactions in the committed state | Watch per-state load transaction counts to spot backlogs | P0 |
-|`doris_fe_max_instances_num_per_user`|| Num| Among currently connected users, the instance count of the user running the most fragment instances |Can be used to observe whether any user is taking too many query resources| P0 |
+|`doris_fe_query_instance_num`|| Num| Number of fragment instances the given user is currently requesting. E.g. {user="test_u"} is the number of instances user test_u is currently requesting |Can be used to observe whether the given user is taking too many query resources| P0 |
+|`doris_fe_query_instance_begin`|| Num| Number of fragment instances the given user has started. E.g. {user="test_u"} is the number of instances user test_u has started |Can be used to observe whether the given user has submitted too many queries| P0 |
+|`doris_fe_query_rpc_total`|| Num| Number of RPCs sent to the given BE. E.g. {be="192.168.10.1"} is the number of RPCs sent to the BE with IP 192.168.10.1 |Can be used to observe whether too many RPCs are being sent to a BE| |
+|`doris_fe_query_rpc_failed`|| Num| Number of failed RPCs sent to the given BE. E.g. {be="192.168.10.1"} is the number of failed RPCs sent to the BE with IP 192.168.10.1 |Can be used to observe whether a BE has RPC problems| |
+|`doris_fe_query_rpc_size`|| Num| RPC data size for the given BE. E.g. {be="192.168.10.1"} is the number of RPC data bytes sent to the BE with IP 192.168.10.1 |Can be used to observe whether overly large RPCs are being sent to a BE| |
+|`doris_fe_txn_exec_latency_ms`| | ms | Percentile statistics of transaction execution time. E.g. {quantile="0.75"} is the 75th-percentile transaction execution time | Observe transaction execution time at each percentile in detail | P0 |
+|`doris_fe_txn_publish_latency_ms`| | ms | Percentile statistics of transaction publish time. E.g. {quantile="0.75"} is the 75th-percentile transaction publish time | Observe transaction publish time at each percentile in detail | P0 |
+|`doris_fe_txn_num`|| Num| Number of transactions running in the given DB. E.g. {db="test"} is the number of transactions DB test is currently running |Can be used to observe whether a DB is submitting a large number of transactions| P0 |
+|`doris_fe_txn_replica_num`|| Num| Number of replicas opened by the running transactions of the given DB. E.g. {db="test"} is the number of replicas opened by DB test's running transactions |Can be used to observe whether a DB opens too many replicas, which may affect other transactions| P0 |
+|`doris_fe_thrift_rpc_total`|| Num| Number of RPC requests received by each method of the FE thrift interface. E.g. {method="report"} is the number of RPC requests received by the report method |Can be used to observe the load on a given thrift RPC method| |
+|`doris_fe_thrift_rpc_latency_ms`|| ms | Time spent on RPC requests received by each method of the FE thrift interface. E.g. {method="report"} is the time spent on RPC requests received by the report method |Can be used to observe the load on a given thrift RPC method| |
### JVM Monitoring
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index 96e4b114d9..cbd3312b65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -17,7 +17,7 @@
package org.apache.doris.common;
-import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
@@ -37,6 +37,8 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
/**
* ThreadPoolManager is a helper class for constructing daemon thread pools with limited thread and memory resources.
@@ -75,29 +77,35 @@ public class ThreadPoolManager {
}
public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) {
- for (String poolMetricType : poolMetricTypes) {
- GaugeMetric<Integer> gauge = new GaugeMetric<Integer>(
- "thread_pool", MetricUnit.NOUNIT, "thread_pool statistics") {
- @Override
- public Integer getValue() {
- String metricType = this.getLabels().get(1).getValue();
- switch (metricType) {
- case "pool_size":
- return threadPool.getPoolSize();
- case "active_thread_num":
- return threadPool.getActiveCount();
- case "task_in_queue":
- return threadPool.getQueue().size();
- default:
- return 0;
- }
- }
- };
- gauge.addLabel(new MetricLabel("name", poolName)).addLabel(new MetricLabel("type", poolMetricType));
- MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
+ Metric.MetricType gauge = Metric.MetricType.GAUGE;
+ Metric.MetricType counter = Metric.MetricType.COUNTER;
+ MetricUnit nounit = MetricUnit.NOUNIT;
+ registerMetric(poolName, "pool_size", gauge, nounit, threadPool::getPoolSize);
+ registerMetric(poolName, "active_thread_num", gauge, nounit, threadPool::getActiveCount);
+ registerMetric(poolName, "active_thread_pct", gauge, MetricUnit.PERCENT,
+ () -> 1.0 * threadPool.getActiveCount() / threadPool.getMaximumPoolSize());
+ registerMetric(poolName, "task_in_queue", gauge, nounit, () -> threadPool.getQueue().size());
+ registerMetric(poolName, "task_count", counter, nounit, threadPool::getTaskCount);
+ registerMetric(poolName, "completed_task_count", counter, nounit, threadPool::getCompletedTaskCount);
+ RejectedExecutionHandler rejectedHandler = threadPool.getRejectedExecutionHandler();
+ if (rejectedHandler instanceof LogDiscardPolicy) {
+ registerMetric(poolName, "task_rejected", counter, nounit,
+ ((LogDiscardPolicy) rejectedHandler).rejectedNum::get);
}
}
+ private static <T> void registerMetric(String poolName, String metricName,
+ Metric.MetricType type, MetricUnit unit, Supplier<T> supplier) {
+ Metric<T> gauge = new Metric<T>("thread_pool", type, unit, "thread_pool statistics") {
+ @Override
+ public T getValue() {
+ return supplier.get();
+ }
+ };
+ gauge.addLabel(new MetricLabel("name", poolName)).addLabel(new MetricLabel("type", metricName));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
+ }
+
public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread,
String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME,
@@ -165,14 +173,17 @@ public class ThreadPoolManager {
private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);
private String threadPoolName;
+ private AtomicLong rejectedNum;
public LogDiscardPolicy(String threadPoolName) {
this.threadPoolName = threadPoolName;
+ this.rejectedNum = new AtomicLong(0);
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
+ this.rejectedNum.incrementAndGet();
}
}
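The refactor above replaces a per-label `switch` inside an anonymous `GaugeMetric` with a generic `registerMetric(...)` that accepts a `Supplier`, so each statistic is one registration call. A self-contained sketch of that pattern, using a hypothetical `PoolMetric` record and a plain list in place of Doris' `Metric` hierarchy and `DORIS_METRIC_REGISTER`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class PoolMetricsSketch {
    // Hypothetical stand-in for a metric: a name plus a Supplier that
    // reads the live value on each scrape.
    record PoolMetric<T>(String name, Supplier<T> value) {}

    static final List<PoolMetric<?>> REGISTRY = new ArrayList<>();

    static <T> void registerMetric(String poolName, String metricName, Supplier<T> supplier) {
        REGISTRY.add(new PoolMetric<>(poolName + "." + metricName, supplier));
    }

    // Mirrors registerThreadPoolMetric: one call per statistic instead of
    // one anonymous class switching over a label value.
    static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor pool) {
        registerMetric(poolName, "pool_size", pool::getPoolSize);
        registerMetric(poolName, "active_thread_num", pool::getActiveCount);
        registerMetric(poolName, "task_in_queue", () -> pool.getQueue().size());
        registerMetric(poolName, "completed_task_count", pool::getCompletedTaskCount);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        registerThreadPoolMetric("demo", pool);
        // Values are read lazily, so the registry always reflects current pool state.
        assert REGISTRY.size() == 4;
        assert REGISTRY.get(0).name().equals("demo.pool_size");
        pool.shutdown();
    }
}
```

The method references (`pool::getPoolSize`) capture the executor once, which is why the registration no longer needs to recover the metric type from its own labels.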
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java
new file mode 100644
index 0000000000..17b7e1a104
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/AutoMappedMetric.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.metric;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+public class AutoMappedMetric<M> {
+
+ private final Map<String, M> nameToMetric = new ConcurrentHashMap<>();
+ private final Function<String, M> metricSupplier;
+
+ public AutoMappedMetric(Function<String, M> metricSupplier) {
+ this.metricSupplier = metricSupplier;
+ }
+
+ public M getOrAdd(String name) {
+ return nameToMetric.computeIfAbsent(name, metricSupplier);
+ }
+
+}
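`AutoMappedMetric` is a thin concurrent cache keyed by label value: the first `getOrAdd` for a key builds and registers the metric, later calls return the same instance. A minimal standalone sketch, reproducing the class so it runs on its own and using `AtomicLong` as a stand-in for `LongCounterMetric`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class AutoMappedMetricDemo {
    // Same shape as the new AutoMappedMetric class above.
    static class AutoMappedMetric<M> {
        private final Map<String, M> nameToMetric = new ConcurrentHashMap<>();
        private final Function<String, M> metricSupplier;

        AutoMappedMetric(Function<String, M> metricSupplier) {
            this.metricSupplier = metricSupplier;
        }

        M getOrAdd(String name) {
            // computeIfAbsent makes first-touch creation race-free.
            return nameToMetric.computeIfAbsent(name, metricSupplier);
        }
    }

    public static void main(String[] args) {
        AutoMappedMetric<AtomicLong> perUser = new AutoMappedMetric<>(user -> new AtomicLong());
        perUser.getOrAdd("alice").addAndGet(3);
        perUser.getOrAdd("alice").addAndGet(2);   // same counter instance as above
        perUser.getOrAdd("bob").incrementAndGet();
        assert perUser.getOrAdd("alice").get() == 5;
        assert perUser.getOrAdd("bob").get() == 1;
    }
}
```

In `MetricRepo`, `addLabeledMetrics` plugs registration into the supplier function, so a metric is both created and added to `DORIS_METRIC_REGISTER` exactly once per label value.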
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index c12fb11d75..9991c651d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -51,7 +51,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
public final class MetricRepo {
private static final Logger LOG = LogManager.getLogger(MetricRepo.class);
@@ -71,6 +71,13 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_QUERY_ERR;
public static LongCounterMetric COUNTER_QUERY_TABLE;
public static LongCounterMetric COUNTER_QUERY_OLAP_TABLE;
+ public static Histogram HISTO_QUERY_LATENCY;
+ public static AutoMappedMetric<Histogram> DB_HISTO_QUERY_LATENCY;
+ public static AutoMappedMetric<GaugeMetricImpl<Long>> USER_GAUGE_QUERY_INSTANCE_NUM;
+ public static AutoMappedMetric<LongCounterMetric> USER_COUNTER_QUERY_INSTANCE_BEGIN;
+ public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_ALL;
+ public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_FAILED;
+ public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_SIZE;
public static LongCounterMetric COUNTER_CACHE_ADDED_SQL;
public static LongCounterMetric COUNTER_CACHE_ADDED_PARTITION;
@@ -80,27 +87,33 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES;
+ public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS;
+ public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED;
+ public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
+
public static LongCounterMetric COUNTER_IMAGE_WRITE_SUCCESS;
public static LongCounterMetric COUNTER_IMAGE_WRITE_FAILED;
public static LongCounterMetric COUNTER_IMAGE_PUSH_SUCCESS;
public static LongCounterMetric COUNTER_IMAGE_PUSH_FAILED;
public static LongCounterMetric COUNTER_IMAGE_CLEAN_SUCCESS;
public static LongCounterMetric COUNTER_IMAGE_CLEAN_FAILED;
- public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS;
- public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED;
public static LongCounterMetric COUNTER_TXN_REJECT;
public static LongCounterMetric COUNTER_TXN_BEGIN;
public static LongCounterMetric COUNTER_TXN_FAILED;
public static LongCounterMetric COUNTER_TXN_SUCCESS;
+ public static Histogram HISTO_TXN_EXEC_LATENCY;
+ public static Histogram HISTO_TXN_PUBLISH_LATENCY;
+ public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_NUM;
+ public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_REPLICA_NUM;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;
- public static Histogram HISTO_QUERY_LATENCY;
- public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
+ public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
+ public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_LATENCY;
// following metrics will be updated by metric calculator
public static GaugeMetricImpl<Double> GAUGE_QUERY_PER_SECOND;
@@ -118,7 +131,6 @@ public final class MetricRepo {
return;
}
- // 1. gauge
// load jobs
LoadManager loadManger = Env.getCurrentEnv().getLoadManager();
for (EtlJobType jobType : EtlJobType.values()) {
@@ -228,21 +240,6 @@ public final class MetricRepo {
};
DORIS_METRIC_REGISTER.addMetrics(scheduledTabletNum);
- GaugeMetric<Long> maxInstanceNum = new GaugeMetric<Long>("max_instances_num_per_user",
- MetricUnit.NOUNIT, "max instances num of all current users") {
- @Override
- public Long getValue() {
- try {
- return ((QeProcessorImpl) QeProcessorImpl.INSTANCE).getInstancesNumPerUser().values().stream()
- .reduce(-1, BinaryOperator.maxBy(Integer::compareTo)).longValue();
- } catch (Throwable ex) {
- LOG.warn("Get max_instances_num_per_user error", ex);
- return -2L;
- }
- }
- };
- DORIS_METRIC_REGISTER.addMetrics(maxInstanceNum);
-
// txn status
for (TransactionStatus status : TransactionStatus.values()) {
GaugeMetric<Long> gauge = new GaugeMetric<Long>("txn_status", MetricUnit.NOUNIT, "txn statistics") {
@@ -274,19 +271,53 @@ public final class MetricRepo {
DORIS_METRIC_REGISTER.addMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE);
GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L);
- // 2. counter
+ // query
COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", MetricUnit.REQUESTS, "total request");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_REQUEST_ALL);
COUNTER_QUERY_ALL = new LongCounterMetric("query_total", MetricUnit.REQUESTS, "total query");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_ALL);
COUNTER_QUERY_ERR = new LongCounterMetric("query_err", MetricUnit.REQUESTS, "total error query");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_ERR);
-
COUNTER_QUERY_TABLE = new LongCounterMetric("query_table", MetricUnit.REQUESTS, "total query from table");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_TABLE);
COUNTER_QUERY_OLAP_TABLE = new LongCounterMetric("query_olap_table", MetricUnit.REQUESTS,
"total query from olap table");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_OLAP_TABLE);
+ HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(
+ MetricRegistry.name("query", "latency", "ms"));
+ DB_HISTO_QUERY_LATENCY = new AutoMappedMetric<>(name -> {
+ String metricName = MetricRegistry.name("query", "latency", "ms", "db=" + name);
+ return METRIC_REGISTER.histogram(metricName);
+ });
+ USER_COUNTER_QUERY_INSTANCE_BEGIN = addLabeledMetrics("user", () ->
+ new LongCounterMetric("query_instance_begin", MetricUnit.NOUNIT,
+ "number of query instance begin"));
+ USER_GAUGE_QUERY_INSTANCE_NUM = addLabeledMetrics("user", () ->
+ new GaugeMetricImpl<>("query_instance_num", MetricUnit.NOUNIT,
+ "number of running query instances of current user"));
+ GaugeMetric<Long> queryInstanceNum = new GaugeMetric<Long>("query_instance_num",
+ MetricUnit.NOUNIT, "number of query instances of all current users") {
+ @Override
+ public Long getValue() {
+ QeProcessorImpl qe = ((QeProcessorImpl) QeProcessorImpl.INSTANCE);
+ long totalInstanceNum = 0;
+ for (Map.Entry<String, Integer> e : qe.getInstancesNumPerUser().entrySet()) {
+ long value = e.getValue() == null ? 0L : e.getValue().longValue();
+ totalInstanceNum += value;
+ USER_GAUGE_QUERY_INSTANCE_NUM.getOrAdd(e.getKey()).setValue(value);
+ }
+ return totalInstanceNum;
+ }
+ };
+ DORIS_METRIC_REGISTER.addMetrics(queryInstanceNum);
+ BE_COUNTER_QUERY_RPC_ALL = addLabeledMetrics("be", () ->
+ new LongCounterMetric("query_rpc_total", MetricUnit.NOUNIT, ""));
+ BE_COUNTER_QUERY_RPC_FAILED = addLabeledMetrics("be", () ->
+ new LongCounterMetric("query_rpc_failed", MetricUnit.NOUNIT, ""));
+ BE_COUNTER_QUERY_RPC_SIZE = addLabeledMetrics("be", () ->
+ new LongCounterMetric("query_rpc_size", MetricUnit.BYTES, ""));
+
+ // cache
COUNTER_CACHE_ADDED_SQL = new LongCounterMetric("cache_added", MetricUnit.REQUESTS,
"Number of SQL mode cache added");
COUNTER_CACHE_ADDED_SQL.addLabel(new MetricLabel("type", "sql"));
@@ -304,6 +335,7 @@ public final class MetricRepo {
COUNTER_CACHE_HIT_PARTITION.addLabel(new MetricLabel("type", "partition"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_CACHE_HIT_PARTITION);
+ // edit log
COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log", MetricUnit.OPERATIONS,
"counter of edit log write into bdbje");
COUNTER_EDIT_LOG_WRITE.addLabel(new MetricLabel("type", "write"));
@@ -315,6 +347,18 @@ public final class MetricRepo {
COUNTER_EDIT_LOG_SIZE_BYTES = new LongCounterMetric("edit_log", MetricUnit.BYTES, "size of edit log");
COUNTER_EDIT_LOG_SIZE_BYTES.addLabel(new MetricLabel("type", "bytes"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_SIZE_BYTES);
+ HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(
+ MetricRegistry.name("editlog", "write", "latency", "ms"));
+
+ // edit log clean
+ COUNTER_EDIT_LOG_CLEAN_SUCCESS = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
+ "counter of edit log succeed in cleaning");
+ COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type", "success"));
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS);
+ COUNTER_EDIT_LOG_CLEAN_FAILED = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
+ "counter of edit log failed to clean");
+ COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED);
// image generate
COUNTER_IMAGE_WRITE_SUCCESS = new LongCounterMetric("image_write", MetricUnit.OPERATIONS,
@@ -345,16 +389,7 @@ public final class MetricRepo {
COUNTER_IMAGE_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_CLEAN_FAILED);
- // edit log clean
- COUNTER_EDIT_LOG_CLEAN_SUCCESS = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
- "counter of edit log succeed in cleaning");
- COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type", "success"));
- DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS);
- COUNTER_EDIT_LOG_CLEAN_FAILED = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
- "counter of edit log failed to clean");
- COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
- DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED);
-
+ // txn
COUNTER_TXN_REJECT = new LongCounterMetric("txn_counter", MetricUnit.REQUESTS,
"counter of rejected transactions");
COUNTER_TXN_REJECT.addLabel(new MetricLabel("type", "reject"));
@@ -371,6 +406,30 @@ public final class MetricRepo {
"counter of failed transactions");
COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED);
+ HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram(
+ MetricRegistry.name("txn", "exec", "latency", "ms"));
+ HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram(
+ MetricRegistry.name("txn", "publish", "latency", "ms"));
+ GaugeMetric<Long> txnNum = new GaugeMetric<Long>("txn_num", MetricUnit.NOUNIT,
+ "number of running transactions") {
+ @Override
+ public Long getValue() {
+ return Env.getCurrentGlobalTransactionMgr().getAllRunningTxnNum();
+ }
+ };
+ DORIS_METRIC_REGISTER.addMetrics(txnNum);
+ DB_GAUGE_TXN_NUM = addLabeledMetrics("db", () ->
+ new GaugeMetricImpl<>("txn_num", MetricUnit.NOUNIT, "number of running transactions"));
+ GaugeMetric<Long> txnReplicaNum = new GaugeMetric<Long>("txn_replica_num", MetricUnit.NOUNIT,
+ "number of writing tablets in all running transactions") {
+ @Override
+ public Long getValue() {
+ return Env.getCurrentGlobalTransactionMgr().getAllRunningTxnReplicaNum();
+ }
+ };
+ DORIS_METRIC_REGISTER.addMetrics(txnReplicaNum);
+ DB_GAUGE_TXN_REPLICA_NUM = addLabeledMetrics("db", () -> new GaugeMetricImpl<>("txn_replica_num",
+ MetricUnit.NOUNIT, "number of writing tablets in all running transactions"));
COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", MetricUnit.ROWS,
"total rows of routine load");
@@ -385,11 +444,11 @@ public final class MetricRepo {
COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
"total hit sql block rule query");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_HIT_SQL_BLOCK_RULE);
- // 3. histogram
- HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(
- MetricRegistry.name("query", "latency", "ms"));
- HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(
- MetricRegistry.name("editlog", "write", "latency", "ms"));
+
+ THRIFT_COUNTER_RPC_ALL = addLabeledMetrics("method", () ->
+ new LongCounterMetric("thrift_rpc_total", MetricUnit.NOUNIT, ""));
+ THRIFT_COUNTER_RPC_LATENCY = addLabeledMetrics("method", () ->
+ new LongCounterMetric("thrift_rpc_latency_ms", MetricUnit.MILLISECONDS, ""));
// init system metrics
initSystemMetrics();
@@ -587,6 +646,15 @@ public final class MetricRepo {
return sb.toString();
}
+ public static <M extends Metric<?>> AutoMappedMetric<M> addLabeledMetrics(String label, Supplier<M> metric) {
+ return new AutoMappedMetric<>(value -> {
+ M m = metric.get();
+ m.addLabel(new MetricLabel(label, value));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(m);
+ return m;
+ });
+ }
+
// update some metrics to make them ready to be visited
private static void updateMetrics() {
SYSTEM_METRICS.update();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index ef470cf711..5a2eddb17a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -62,6 +62,7 @@ import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TTabletLocation;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
@@ -346,6 +347,7 @@ public class OlapTableSink extends DataSink {
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
// BE id -> path hash
Multimap<Long, Long> allBePathsMap = HashMultimap.create();
+ int replicaNum = 0;
for (Long partitionId : partitionIds) {
Partition partition = table.getPartition(partitionId);
int quorum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1;
@@ -375,6 +377,7 @@ public class OlapTableSink extends DataSink {
Lists.newArrayList(bePathsMap.keySet())));
}
allBePathsMap.putAll(bePathsMap);
+ replicaNum += bePathsMap.size();
}
}
}
@@ -385,6 +388,14 @@ public class OlapTableSink extends DataSink {
if (!st.ok()) {
throw new DdlException(st.getErrorMsg());
}
+ long dbId = tDataSink.getOlapTableSink().getDbId();
+ long txnId = tDataSink.getOlapTableSink().getTxnId();
+ try {
+ DatabaseTransactionMgr mgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
+ mgr.registerTxnReplicas(txnId, replicaNum);
+ } catch (Exception e) {
+ LOG.error("register txn replica failed, txnId={}, dbId={}", txnId, dbId, e);
+ }
return Arrays.asList(locationParam, slaveLocationParam);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index f424adf7b1..847bb5baf0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -192,6 +192,7 @@ public class ConnectProcessor {
} else if (ctx.getState().getStateType() == MysqlStateType.OK) {
// ok query
MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
+ MetricRepo.DB_HISTO_QUERY_LATENCY.getOrAdd(ctx.getDatabase()).update(elapseMs);
if (elapseMs > Config.qe_slow_log_ms) {
String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest());
ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 13809d5633..7ac58111aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
@@ -724,8 +725,10 @@ public class Coordinator {
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
case TIMEOUT:
+ MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(pair.first.brpcAddr.hostname).increase(1L);
throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
case THRIFT_RPC_ERROR:
+ MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(pair.first.brpcAddr.hostname).increase(1L);
SimpleScheduler.addToBlacklist(pair.first.beId, errMsg);
throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index fbf86d4347..1bf11ece97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileWriter;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
@@ -113,6 +114,7 @@ public final class QeProcessorImpl implements QeProcessor {
}
queryToInstancesNum.put(queryId, instancesNum);
userToInstancesCount.computeIfAbsent(user, ignored -> new AtomicInteger(0)).addAndGet(instancesNum);
+ MetricRepo.USER_COUNTER_QUERY_INSTANCE_BEGIN.getOrAdd(user).increase(instancesNum.longValue());
}
}
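The `QeProcessorImpl` change hangs the new `query_instance_begin` counter off the existing per-user accounting, which pairs `computeIfAbsent` with `AtomicInteger` so concurrent query registrations stay consistent. A self-contained sketch of that accounting (method names here are illustrative, not the real `QeProcessorImpl` API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class InstanceCountSketch {
    // user -> number of currently running fragment instances
    static final Map<String, AtomicInteger> userToInstancesCount = new ConcurrentHashMap<>();

    static void register(String user, int instancesNum) {
        userToInstancesCount.computeIfAbsent(user, ignored -> new AtomicInteger(0))
                .addAndGet(instancesNum);
        // The real code also bumps the cumulative counter here:
        // MetricRepo.USER_COUNTER_QUERY_INSTANCE_BEGIN.getOrAdd(user).increase(...)
    }

    static void unregister(String user, int instancesNum) {
        AtomicInteger count = userToInstancesCount.get(user);
        if (count != null) {
            count.addAndGet(-instancesNum);
        }
    }

    public static void main(String[] args) {
        register("test_u", 4);
        register("test_u", 2);
        unregister("test_u", 3);
        // Gauge reads see the net running count; the begin counter only ever grows.
        assert userToInstancesCount.get("test_u").get() == 3;
    }
}
```

This split is why the doc adds both `doris_fe_query_instance_num` (a gauge, net running) and `doris_fe_query_instance_begin` (a counter, cumulative starts).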
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 7316782251..c58defa488 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -18,6 +18,7 @@
package org.apache.doris.rpc;
import org.apache.doris.common.Config;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
import org.apache.doris.proto.Types;
@@ -122,6 +123,8 @@ public class BackendServiceProxy {
builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_2);
final InternalService.PExecPlanFragmentRequest pRequest = builder.build();
+ MetricRepo.BE_COUNTER_QUERY_RPC_ALL.getOrAdd(address.hostname).increase(1L);
+ MetricRepo.BE_COUNTER_QUERY_RPC_SIZE.getOrAdd(address.hostname).increase((long) pRequest.getSerializedSize());
try {
final BackendServiceClient client = getProxy(address);
if (twoPhaseExecution) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java b/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java
index 0c64799a3c..dd7b203833 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java
@@ -18,6 +18,7 @@
package org.apache.doris.service;
import org.apache.doris.common.ThriftServer;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.thrift.FrontendService;
import org.apache.logging.log4j.LogManager;
@@ -25,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TProcessor;
import java.io.IOException;
+import java.lang.reflect.Proxy;
/**
* Doris frontend thrift server
@@ -40,9 +42,21 @@ public class FeServer {
}
public void start() throws IOException {
+ FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
+ FrontendService.Iface instance = (FrontendService.Iface) Proxy.newProxyInstance(
+ FrontendServiceImpl.class.getClassLoader(),
+ FrontendServiceImpl.class.getInterfaces(),
+ (proxy, method, args) -> {
+ long begin = System.currentTimeMillis();
+ String name = method.getName();
+ MetricRepo.THRIFT_COUNTER_RPC_ALL.getOrAdd(name).increase(1L);
+ Object r = method.invoke(service, args);
+ long end = System.currentTimeMillis();
+ MetricRepo.THRIFT_COUNTER_RPC_LATENCY.getOrAdd(name).increase(end - begin);
+ return r;
+ });
// setup frontend server
- TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>(
- new FrontendServiceImpl(ExecuteEnv.getInstance()));
+ TProcessor tprocessor = new FrontendService.Processor<>(instance);
server = new ThriftServer(port, tprocessor);
server.start();
LOG.info("thrift server started.");
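The `FeServer` change wraps the thrift handler in a JDK dynamic proxy so every interface method is counted and timed without touching the handler itself. A runnable sketch of the same technique against a tiny hypothetical `Service` interface (the real code proxies `FrontendService.Iface`):

```java
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class RpcMetricsProxyDemo {
    // Any interface works with Proxy.newProxyInstance; this one stands in
    // for the generated thrift Iface.
    interface Service {
        String report(String payload);
    }

    static final Map<String, AtomicLong> CALLS = new ConcurrentHashMap<>();

    static Service instrument(Service target) {
        return (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(),
                new Class<?>[] {Service.class},
                (proxy, method, args) -> {
                    long begin = System.currentTimeMillis();
                    // Per-method call counter, keyed by the invoked method name.
                    CALLS.computeIfAbsent(method.getName(), k -> new AtomicLong()).incrementAndGet();
                    try {
                        return method.invoke(target, args);
                    } finally {
                        // A real implementation would feed this into a latency metric,
                        // as THRIFT_COUNTER_RPC_LATENCY does above.
                        long elapsedMs = System.currentTimeMillis() - begin;
                    }
                });
    }

    public static void main(String[] args) {
        Service raw = payload -> "ack:" + payload;
        Service wrapped = instrument(raw);
        wrapped.report("a");
        wrapped.report("b");
        assert CALLS.get("report").get() == 2;
        assert wrapped.report("x").equals("ack:x");
    }
}
```

One caveat of this pattern: `method.invoke` wraps exceptions thrown by the handler in `InvocationTargetException`, so a production proxy would typically unwrap and rethrow the cause to preserve thrift error behavior.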
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 843df334ee..85f0a32892 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -128,6 +128,7 @@ public class DatabaseTransactionMgr {
// count the number of running txns of database, except for the routine load txn
private volatile int runningTxnNums = 0;
+ private volatile int runningTxnReplicaNums = 0;
// count only the number of running routine load txns of database
private volatile int runningRoutineLoadTxnNums = 0;
@@ -984,7 +985,11 @@ public class DatabaseTransactionMgr {
return;
}
// update transaction state version
- transactionState.setCommitTime(System.currentTimeMillis());
+ long commitTime = System.currentTimeMillis();
+ transactionState.setCommitTime(commitTime);
+ if (MetricRepo.isInit) {
+ MetricRepo.HISTO_TXN_EXEC_LATENCY.update(commitTime - transactionState.getPrepareTime());
+ }
transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
transactionState.setErrorReplicas(errorReplicaIds);
for (long tableId : tableToPartition.keySet()) {
@@ -1095,6 +1100,38 @@ public class DatabaseTransactionMgr {
updateTxnLabels(transactionState);
}
+ public void registerTxnReplicas(long txnId, int replicaNum) throws UserException {
+ writeLock();
+ try {
+ TransactionState transactionState = idToRunningTransactionState.get(txnId);
+ if (transactionState == null) {
+ throw new UserException("running transaction not found, txnId=" + txnId);
+ }
+ transactionState.setReplicaNum(replicaNum);
+ runningTxnReplicaNums += replicaNum;
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public int getRunningTxnNum() {
+ readLock();
+ try {
+ return runningTxnNums;
+ } finally {
+ readUnlock();
+ }
+ }
+
+ public int getRunningTxnReplicaNum() {
+ readLock();
+ try {
+ return runningTxnReplicaNums;
+ } finally {
+ readUnlock();
+ }
+ }
+
private void updateTxnLabels(TransactionState transactionState) {
Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
if (txnIds == null) {
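The new `registerTxnReplicas` / `getRunningTxnReplicaNum` methods follow the same pattern as the existing `runningTxnNums`: mutations under the write lock, reads under the read lock. A minimal self-contained sketch of that pattern, assuming a plain `ReentrantReadWriteLock` (the class and method names here are illustrative, not the Doris ones):

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the counter-under-lock pattern used by
// registerTxnReplicas / getRunningTxnReplicaNum.
public class TxnReplicaCounter {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private int runningTxnReplicaNums = 0;

    public void register(int replicaNum) {
        lock.writeLock().lock();
        try {
            runningTxnReplicaNums += replicaNum;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public int get() {
        lock.readLock().lock();
        try {
            return runningTxnReplicaNums;
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        TxnReplicaCounter c = new TxnReplicaCounter();
        c.register(3);
        c.register(2);
        System.out.println(c.get()); // 5
    }
}
```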
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 4eae5c817a..dccdd5903c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.thrift.TStatus;
@@ -640,4 +641,31 @@ public class GlobalTransactionMgr implements Writable {
}
throw new TimeoutException("Operation is timeout");
}
+
+ public long getAllRunningTxnNum() {
+ long total = 0;
+ for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) {
+ long num = mgr.getRunningTxnNum();
+ total += num;
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(mgr.getDbId());
+ if (db != null) {
+ MetricRepo.DB_GAUGE_TXN_NUM.getOrAdd(db.getFullName()).setValue(num);
+ }
+ }
+ return total;
+ }
+
+ public long getAllRunningTxnReplicaNum() {
+ long total = 0;
+ for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) {
+ long num = mgr.getRunningTxnReplicaNum();
+ total += num;
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(mgr.getDbId());
+ if (db != null) {
+ MetricRepo.DB_GAUGE_TXN_REPLICA_NUM.getOrAdd(db.getFullName()).setValue(num);
+ }
+ }
+ return total;
+ }
+
}
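`getAllRunningTxnNum` and `getAllRunningTxnReplicaNum` above both iterate the per-database managers, summing a global total while also publishing each database's value as a per-db gauge. A toy sketch of that aggregate-and-export shape, with plain maps standing in for the database managers and the `DB_GAUGE_TXN_NUM` metric (all names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the getAllRunningTxnNum aggregation: sum per-database
// counts and export each one as a per-db gauge value.
public class TxnAggregationSketch {
    // db name -> running txn count, standing in for DatabaseTransactionMgr
    static final Map<String, Long> DB_TXN = new HashMap<>();
    // db name -> exported gauge value, like DB_GAUGE_TXN_NUM.getOrAdd(db).setValue(num)
    static final Map<String, Long> GAUGES = new ConcurrentHashMap<>();

    static long allRunningTxnNum() {
        long total = 0;
        for (Map.Entry<String, Long> e : DB_TXN.entrySet()) {
            total += e.getValue();
            GAUGES.put(e.getKey(), e.getValue()); // per-db gauge side effect
        }
        return total;
    }

    public static void main(String[] args) {
        DB_TXN.put("db1", 3L);
        DB_TXN.put("db2", 5L);
        System.out.println(allRunningTxnNum()); // 8
    }
}
```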
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index e3ec3019ea..c1d894cf56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
@@ -255,6 +256,10 @@ public class PublishVersionDaemon extends MasterDaemon {
for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
+ if (MetricRepo.isInit) {
+ long publishTime = transactionState.getPublishVersionTime() - transactionState.getCommitTime();
+ MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
+ }
}
} // end for readyTransactionStates
}
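The PublishVersionDaemon hunk records publish latency (publish time minus commit time) into a histogram, but only after checking `MetricRepo.isInit`, since the daemon can run before metrics are initialized. A minimal sketch of that guarded-update shape, with a plain sample list standing in for `HISTO_TXN_PUBLISH_LATENCY` (the names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the guarded histogram update in PublishVersionDaemon.
public class PublishLatencySketch {
    static volatile boolean metricsInit = true; // stands in for MetricRepo.isInit
    static final List<Long> SAMPLES = Collections.synchronizedList(new ArrayList<>());

    static void recordPublish(long commitTimeMs, long publishVersionTimeMs) {
        if (!metricsInit) {
            return; // metrics not ready yet, e.g. during FE startup
        }
        SAMPLES.add(publishVersionTimeMs - commitTimeMs);
    }

    public static void main(String[] args) {
        recordPublish(1000L, 1250L);
        System.out.println(SAMPLES.get(0)); // 250
    }
}
```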
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 3a59fac9d9..d15f070e6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -171,6 +171,7 @@ public class TransactionState implements Writable {
private long dbId;
private List<Long> tableIdList;
+ private int replicaNum = 0;
private long transactionId;
private String label;
// requestId is used to judge whether a begin request is a internal retry request.
@@ -493,6 +494,14 @@ public class TransactionState implements Writable {
return tableIdList;
}
+ public int getReplicaNum() {
+ return replicaNum;
+ }
+
+ public void setReplicaNum(int replicaNum) {
+ this.replicaNum = replicaNum;
+ }
+
public Map<Long, TableCommitInfo> getIdToTableCommitInfos() {
return idToTableCommitInfos;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java
index 15c62fb191..2ed1ddd67e 100755
--- a/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java
@@ -17,13 +17,9 @@
package org.apache.doris.common;
-import org.apache.doris.metric.Metric;
-import org.apache.doris.metric.MetricRepo;
-
import org.junit.Assert;
import org.junit.Test;
-import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
public class ThreadPoolManagerTest {
@@ -37,9 +33,6 @@ public class ThreadPoolManagerTest {
ThreadPoolManager.registerThreadPoolMetric("test_cache_pool", testCachedPool);
ThreadPoolManager.registerThreadPoolMetric("test_fixed_thread_pool", testFixedThreaddPool);
- List<Metric> metricList = MetricRepo.getMetricsByName("thread_pool");
-
- Assert.assertEquals(6, metricList.size());
Assert.assertEquals(ThreadPoolManager.LogDiscardPolicy.class,
testCachedPool.getRejectedExecutionHandler().getClass());
Assert.assertEquals(ThreadPoolManager.BlockedPolicy.class,