You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/19 07:15:26 UTC

[doris] 02/08: [enhancement](metric)add one metric for the publish num per db (#14942)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 333c68a4fcb35aaec32b047367f11670fb017b59
Author: AlexYue <yj...@gmail.com>
AuthorDate: Mon Dec 19 14:18:11 2022 +0800

    [enhancement](metric)add one metric for the publish num per db (#14942)
    
    Add a metric that reports the number of transactions being published per db. Users can gauge the relative transaction-processing speed of each db by comparing this metric with doris_fe_txn_num.
---
 .../maint-monitor/monitor-metrics/metrics.md       |  1 +
 .../java/org/apache/doris/metric/MetricRepo.java   | 14 ++++++-
 .../doris/transaction/GlobalTransactionMgr.java    | 43 +++++++++++++---------
 .../doris/transaction/PublishVersionDaemon.java    |  3 +-
 4 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 96ede4e7d5..a55cff98f7 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -140,6 +140,7 @@ curl http://be_host:webserver_port/metrics?type=json
 |`doris_fe_txn_exec_latency_ms`| | 毫秒| 事务执行耗时的百分位统计。如 {quantile="0.75"} 表示 75 分位的事务执行耗时 | 详细观察各分位事务执行耗时 | P0 |
 |`doris_fe_txn_publish_latency_ms`| | 毫秒| 事务publish耗时的百分位统计。如 {quantile="0.75"} 表示 75 分位的事务publish耗时 | 详细观察各分位事务publish耗时 | P0 |
 |`doris_fe_txn_num`|| Num| 指定DB正在执行的事务数。如 {db="test"} 表示DB test 当前正在执行的事务数 |该数值可以观测某个DB是否提交了大量事务| P0 |
+|`doris_fe_publish_txn_num`|| Num| 指定DB正在publish的事务数。如 {db="test"} 表示DB test 当前正在publish的事务数 |该数值可以观测某个DB的publish事务数量| P0 |
 |`doris_fe_txn_replica_num`|| Num| 指定DB正在执行的事务打开的副本数。如 {db="test"} 表示DB test 当前正在执行的事务打开的副本数 |该数值可以观测某个DB是否打开了过多的副本,可能会影响其他事务执行| P0 |
 |`doris_fe_thrift_rpc_total`|| Num| FE thrift接口各个方法接收的RPC请求次数。如 {method="report"} 表示 report 方法接收的RPC请求次数 |该数值可以观测某个thrift rpc方法的负载| |
 |`doris_fe_thrift_rpc_latency_ms`|| 毫秒| FE thrift接口各个方法接收的RPC请求耗时。如 {method="report"} 表示 report 方法接收的RPC请求耗时 |该数值可以观测某个thrift rpc方法的负载| |
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 926174092b..85c4479fbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -105,6 +105,7 @@ public final class MetricRepo {
     public static Histogram HISTO_TXN_EXEC_LATENCY;
     public static Histogram HISTO_TXN_PUBLISH_LATENCY;
     public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_NUM;
+    public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_PUBLISH_TXN_NUM;
     public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_REPLICA_NUM;
 
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
@@ -419,7 +420,18 @@ public final class MetricRepo {
         };
         DORIS_METRIC_REGISTER.addMetrics(txnNum);
         DB_GAUGE_TXN_NUM = addLabeledMetrics("db", () ->
-            new GaugeMetricImpl<>("txn_num", MetricUnit.NOUNIT, "number of running transactions"));
+                new GaugeMetricImpl<>("txn_num", MetricUnit.NOUNIT, "number of running transactions"));
+        GaugeMetric<Long> publishTxnNum = new GaugeMetric<Long>("publish_txn_num", MetricUnit.NOUNIT,
+                "number of publish transactions") {
+            @Override
+            public Long getValue() {
+                return Env.getCurrentGlobalTransactionMgr().getAllPublishTxnNum();
+            }
+        };
+        DORIS_METRIC_REGISTER.addMetrics(publishTxnNum);
+        DB_GAUGE_PUBLISH_TXN_NUM = addLabeledMetrics("db",
+                () -> new GaugeMetricImpl<>("publish_txn_num", MetricUnit.NOUNIT, "number of publish transactions"));
+
         GaugeMetric<Long> txnReplicaNum = new GaugeMetric<Long>("txn_replica_num", MetricUnit.NOUNIT,
                 "number of writing tablets in all running transactions") {
             @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index dccdd5903c..691b0eb249 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -30,6 +30,8 @@ import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.metric.AutoMappedMetric;
+import org.apache.doris.metric.GaugeMetricImpl;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.persist.BatchRemoveTransactionsOperation;
 import org.apache.doris.persist.EditLog;
@@ -56,6 +58,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 /**
  * Transaction Manager
@@ -113,7 +116,7 @@ public class GlobalTransactionMgr implements Writable {
 
     /**
      * the app could specify the transaction id
-     *
+     * <p>
      * requestId is used to judge that whether the request is a internal retry request
      * if label already exist, and requestId are equal, we return the exist tid, and consider this 'begin'
      * as success.
@@ -178,8 +181,8 @@ public class GlobalTransactionMgr implements Writable {
     }
 
     public void preCommitTransaction2PC(Database db, List<Table> tableList, long transactionId,
-                                               List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
-                                               TxnCommitAttachment txnCommitAttachment)
+            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
+            TxnCommitAttachment txnCommitAttachment)
             throws UserException {
         if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) {
             throw new UserException("get tableList write lock timeout, tableList=("
@@ -242,14 +245,14 @@ public class GlobalTransactionMgr implements Writable {
     }
 
     public boolean commitAndPublishTransaction(Database db, List<Table> tableList, long transactionId,
-                                               List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
+            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
             throws UserException {
         return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null);
     }
 
     public boolean commitAndPublishTransaction(Database db, List<Table> tableList, long transactionId,
-                                               List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
-                                               TxnCommitAttachment txnCommitAttachment)
+            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
+            TxnCommitAttachment txnCommitAttachment)
             throws UserException {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
@@ -643,26 +646,30 @@ public class GlobalTransactionMgr implements Writable {
     }
 
     public long getAllRunningTxnNum() {
-        long total = 0;
-        for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) {
-            long num = mgr.getRunningTxnNum();
-            total += num;
-            Database db = Env.getCurrentInternalCatalog().getDbNullable(mgr.getDbId());
-            if (db != null) {
-                MetricRepo.DB_GAUGE_TXN_NUM.getOrAdd(db.getFullName()).setValue(num);
-            }
-        }
-        return total;
+        return updateTxnMetric(databaseTransactionMgr -> Long.valueOf(databaseTransactionMgr.getRunningTxnNum()),
+                MetricRepo.DB_GAUGE_TXN_NUM);
     }
 
     public long getAllRunningTxnReplicaNum() {
+        return updateTxnMetric(databaseTransactionMgr -> Long.valueOf(databaseTransactionMgr.getRunningTxnReplicaNum()),
+                MetricRepo.DB_GAUGE_TXN_REPLICA_NUM);
+    }
+
+    public long getAllPublishTxnNum() {
+        return updateTxnMetric(
+                databaseTransactionMgr -> Long.valueOf(databaseTransactionMgr.getCommittedTxnList().size()),
+                MetricRepo.DB_GAUGE_PUBLISH_TXN_NUM);
+    }
+
+    private long updateTxnMetric(Function<DatabaseTransactionMgr, Long> metricSupplier,
+            AutoMappedMetric<GaugeMetricImpl<Long>> metric) {
         long total = 0;
         for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) {
-            long num = mgr.getRunningTxnReplicaNum();
+            long num = metricSupplier.apply(mgr).longValue();
             total += num;
             Database db = Env.getCurrentInternalCatalog().getDbNullable(mgr.getDbId());
             if (db != null) {
-                MetricRepo.DB_GAUGE_TXN_REPLICA_NUM.getOrAdd(db.getFullName()).setValue(num);
+                metric.getOrAdd(db.getFullName()).setValue(num);
             }
         }
         return total;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index c1d894cf56..45d5ee5847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -131,7 +131,8 @@ public class PublishVersionDaemon extends MasterDaemon {
                 transactionState.addPublishVersionTask(backendId, task);
             }
             transactionState.setHasSendTask(true);
-            LOG.info("send publish tasks for transaction: {}", transactionState.getTransactionId());
+            LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(),
+                    transactionState.getDbId());
         }
         if (!batchTask.getAllTasks().isEmpty()) {
             AgentTaskExecutor.submit(batchTask);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org