You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/02 12:42:48 UTC

[rocketmq] branch develop updated: implement broker stats metrics and request metrics (#5449)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 350e433ac implement broker stats metrics and request metrics (#5449)
350e433ac is described below

commit 350e433acc19a7ad4e69960063b4c694de9e82c5
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Nov 2 20:42:42 2022 +0800

    implement broker stats metrics and request metrics (#5449)
    
    * implement broker stats metrics and request metrics
    
    * fix bazel
    
    * PopBigMessageIT potentially needs more time
    
    * Fix bazel deps warning
    
    Co-authored-by: Li Zhanhui <li...@gmail.com>
---
 broker/BUILD.bazel                                 |   1 +
 .../apache/rocketmq/broker/BrokerController.java   |  20 ++++
 .../broker/metrics/BrokerMetricsConstant.java      |  15 +++
 .../broker/metrics/BrokerMetricsManager.java       | 117 ++++++++++++++++++++-
 ...kerMetricsConstant.java => NopLongCounter.java} |  24 +++--
 ...rMetricsConstant.java => NopLongHistogram.java} |  24 +++--
 ...ricsConstant.java => NopLongUpDownCounter.java} |  24 +++--
 ...csConstant.java => NopObservableLongGauge.java} |  11 +-
 .../processor/DefaultPullMessageResultHandler.java |  22 +++-
 .../broker/processor/PeekMessageProcessor.java     |  19 ++++
 .../broker/processor/PopMessageProcessor.java      |  18 ++++
 .../broker/processor/PopReviveService.java         |  16 +++
 .../broker/processor/ReplyMessageProcessor.java    |  23 +++-
 .../broker/processor/SendMessageProcessor.java     |  32 ++++--
 .../broker/schedule/ScheduleMessageService.java    |  26 +++++
 .../queue/TransactionalMessageBridge.java          |  17 +++
 .../common/attribute/TopicMessageType.java         |   4 +
 .../test/client/consumer/pop/PopBigMessageIT.java  |   2 +-
 18 files changed, 365 insertions(+), 50 deletions(-)

diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index a537520f8..ea8dd9046 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -41,6 +41,7 @@ java_library(
         "@maven//:io_netty_netty_all",
         "@maven//:io_openmessaging_storage_dledger",
         "@maven//:io_opentelemetry_opentelemetry_api",
+        "@maven//:io_opentelemetry_opentelemetry_context",
         "@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
         "@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
         "@maven//:io_opentelemetry_opentelemetry_sdk",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index d5041c2a7..e35fc7dc1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -2283,4 +2283,24 @@ public class BrokerController {
         return this.topicRouteInfoManager;
     }
 
+    public BlockingQueue<Runnable> getClientManagerThreadPoolQueue() {
+        return clientManagerThreadPoolQueue;
+    }
+
+    public BlockingQueue<Runnable> getConsumerManagerThreadPoolQueue() {
+        return consumerManagerThreadPoolQueue;
+    }
+
+    public BlockingQueue<Runnable> getAsyncPutThreadPoolQueue() {
+        return putThreadPoolQueue;
+    }
+
+    public BlockingQueue<Runnable> getReplyThreadPoolQueue() {
+        return replyThreadPoolQueue;
+    }
+
+    public BlockingQueue<Runnable> getAdminBrokerThreadPoolQueue() {
+        return adminBrokerThreadPoolQueue;
+    }
+
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
index 48c73b1b8..d13516551 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
@@ -18,12 +18,27 @@ package org.apache.rocketmq.broker.metrics;
 
 public class BrokerMetricsConstant {
     public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
+
+    public static final String GAUGE_PROCESSOR_WATERMARK = "rocketmq_processor_watermark";
     public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
 
+    public static final String COUNTER_MESSAGES_IN_TOTAL = "rocketmq_messages_in_total";
+    public static final String COUNTER_MESSAGES_OUT_TOTAL = "rocketmq_messages_out_total";
+    public static final String COUNTER_THROUGHPUT_IN_TOTAL = "rocketmq_throughput_in_total";
+    public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "rocketmq_throughput_out_total";
+    public static final String HISTOGRAM_MESSAGE_SIZE = "rocketmq_message_size";
+
     public static final String LABEL_CLUSTER_NAME = "cluster";
     public static final String LABEL_NODE_TYPE = "node_type";
     public static final String NODE_TYPE_BROKER = "broker";
     public static final String LABEL_NODE_ID = "node_id";
     public static final String LABEL_AGGREGATION = "aggregation";
     public static final String AGGREGATION_DELTA = "delta";
+    public static final String LABEL_PROCESSOR = "processor";
+
+    public static final String LABEL_TOPIC = "topic";
+    public static final String LABEL_IS_RETRY = "is_retry";
+    public static final String LABEL_IS_SYSTEM = "is_system";
+    public static final String LABEL_CONSUMER_GROUP = "consumer_group";
+    public static final String LABEL_MESSAGE_TYPE = "message_type";
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index e6fab145f..d18a1eb25 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -19,18 +19,24 @@ package org.apache.rocketmq.broker.metrics;
 import com.google.common.base.Splitter;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.ObservableLongGauge;
 import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
 import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
 import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
 import io.opentelemetry.sdk.metrics.InstrumentType;
 import io.opentelemetry.sdk.metrics.SdkMeterProvider;
 import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.View;
 import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
 import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
 import io.opentelemetry.sdk.resources.Resource;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,12 +44,24 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.MessageStore;
 
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
@@ -64,7 +82,16 @@ public class BrokerMetricsManager {
     private PrometheusHttpServer prometheusHttpServer;
     private Meter brokerMeter;
 
-    public static ObservableLongGauge brokerPermission = null;
+    // broker stats metrics
+    public static ObservableLongGauge processorWatermark = new NopObservableLongGauge();
+    public static ObservableLongGauge brokerPermission = new NopObservableLongGauge();
+
+    // request metrics
+    public static LongCounter messagesInTotal = new NopLongCounter();
+    public static LongCounter messagesOutTotal = new NopLongCounter();
+    public static LongCounter throughputInTotal = new NopLongCounter();
+    public static LongCounter throughputOutTotal = new NopLongCounter();
+    public static LongHistogram messageSize = new NopLongHistogram();
 
     public BrokerMetricsManager(BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -79,6 +106,30 @@ public class BrokerMetricsManager {
         return attributesBuilder;
     }
 
+    public static boolean isRetryOrDlqTopic(String topic) {
+        if (StringUtils.isBlank(topic)) {
+            return false;
+        }
+        return topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX);
+    }
+
+    public static TopicMessageType getMessageType(SendMessageRequestHeader requestHeader) {
+        Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties());
+        String traFlag = properties.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+        TopicMessageType topicMessageType = TopicMessageType.NORMAL;
+        if (Boolean.parseBoolean(traFlag)) {
+            topicMessageType = TopicMessageType.TRANSACTION;
+        } else if (properties.containsKey(MessageConst.PROPERTY_SHARDING_KEY)) {
+            topicMessageType = TopicMessageType.FIFO;
+        } else if (properties.get("__STARTDELIVERTIME") != null
+            || properties.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
+            || properties.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null
+            || properties.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
+            topicMessageType = TopicMessageType.DELAY;
+        }
+        return topicMessageType;
+    }
+
     public Meter getBrokerMeter() {
         return brokerMeter;
     }
@@ -181,21 +232,85 @@ public class BrokerMetricsManager {
             providerBuilder.registerMetricReader(prometheusHttpServer);
         }
 
+        registerMetricsView(providerBuilder);
+
         brokerMeter = OpenTelemetrySdk.builder()
             .setMeterProvider(providerBuilder.build())
             .build()
             .getMeter(OPEN_TELEMETRY_METER_NAME);
 
         initStatsMetrics();
+        initRequestMetrics();
+    }
+
+    private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
+        // message size buckets, 1k, 4k, 512k, 1M, 2M, 4M
+        List<Double> messageSizeBuckets = Arrays.asList(
+            1d * 1024, //1KB
+            4d * 1024, //4KB
+            512d * 1024, //512KB
+            1d * 1024 * 1024, //1MB
+            2d * 1024 * 1024, //2MB
+            4d * 1024 * 1024 //4MB
+        );
+        InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_MESSAGE_SIZE)
+            .build();
+        View messageSizeView = View.builder()
+            .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets))
+            .build();
+        providerBuilder.registerView(messageSizeSelector, messageSizeView);
     }
 
     private void initStatsMetrics() {
+        processorWatermark = brokerMeter.gaugeBuilder(GAUGE_PROCESSOR_WATERMARK)
+            .setDescription("Request processor watermark")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                measurement.record(brokerController.getSendThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "send").build());
+                measurement.record(brokerController.getAsyncPutThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "async_put").build());
+                measurement.record(brokerController.getPullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "pull").build());
+                measurement.record(brokerController.getAckThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "ack").build());
+                measurement.record(brokerController.getQueryThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "query_message").build());
+                measurement.record(brokerController.getClientManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "client_manager").build());
+                measurement.record(brokerController.getHeartbeatThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "heartbeat").build());
+                measurement.record(brokerController.getLitePullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "lite_pull").build());
+                measurement.record(brokerController.getEndTransactionThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "transaction").build());
+                measurement.record(brokerController.getConsumerManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "consumer_manager").build());
+                measurement.record(brokerController.getAdminBrokerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "admin").build());
+                measurement.record(brokerController.getReplyThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "reply").build());
+            });
+
         brokerPermission = brokerMeter.gaugeBuilder(GAUGE_BROKER_PERMISSION)
             .setDescription("Broker permission")
             .ofLongs()
             .buildWithCallback(measurement -> measurement.record(brokerConfig.getBrokerPermission(), newAttributesBuilder().build()));
     }
 
+    private void initRequestMetrics() {
+        messagesInTotal = brokerMeter.counterBuilder(COUNTER_MESSAGES_IN_TOTAL)
+            .setDescription("Total number of incoming messages")
+            .build();
+
+        messagesOutTotal = brokerMeter.counterBuilder(COUNTER_MESSAGES_OUT_TOTAL)
+            .setDescription("Total number of outgoing messages")
+            .build();
+
+        throughputInTotal = brokerMeter.counterBuilder(COUNTER_THROUGHPUT_IN_TOTAL)
+            .setDescription("Total traffic of incoming messages")
+            .build();
+
+        throughputOutTotal = brokerMeter.counterBuilder(COUNTER_THROUGHPUT_OUT_TOTAL)
+            .setDescription("Total traffic of outgoing messages")
+            .build();
+
+        messageSize = brokerMeter.histogramBuilder(HISTOGRAM_MESSAGE_SIZE)
+            .setDescription("Incoming messages size")
+            .ofLongs()
+            .build();
+    }
+
     public void shutdown() {
         if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
             periodicMetricReader.forceFlush();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
index 48c73b1b8..5f9c558e0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
@@ -16,14 +16,20 @@
  */
 package org.apache.rocketmq.broker.metrics;
 
-public class BrokerMetricsConstant {
-    public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
-    public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.context.Context;
 
-    public static final String LABEL_CLUSTER_NAME = "cluster";
-    public static final String LABEL_NODE_TYPE = "node_type";
-    public static final String NODE_TYPE_BROKER = "broker";
-    public static final String LABEL_NODE_ID = "node_id";
-    public static final String LABEL_AGGREGATION = "aggregation";
-    public static final String AGGREGATION_DELTA = "delta";
+public class NopLongCounter implements LongCounter {
+    @Override public void add(long l) {
+
+    }
+
+    @Override public void add(long l, Attributes attributes) {
+
+    }
+
+    @Override public void add(long l, Attributes attributes, Context context) {
+
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
index 48c73b1b8..582b20daa 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
@@ -16,14 +16,20 @@
  */
 package org.apache.rocketmq.broker.metrics;
 
-public class BrokerMetricsConstant {
-    public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
-    public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.context.Context;
 
-    public static final String LABEL_CLUSTER_NAME = "cluster";
-    public static final String LABEL_NODE_TYPE = "node_type";
-    public static final String NODE_TYPE_BROKER = "broker";
-    public static final String LABEL_NODE_ID = "node_id";
-    public static final String LABEL_AGGREGATION = "aggregation";
-    public static final String AGGREGATION_DELTA = "delta";
+public class NopLongHistogram implements LongHistogram {
+    @Override public void record(long l) {
+
+    }
+
+    @Override public void record(long l, Attributes attributes) {
+
+    }
+
+    @Override public void record(long l, Attributes attributes, Context context) {
+
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
index 48c73b1b8..0752d57a1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
@@ -16,14 +16,20 @@
  */
 package org.apache.rocketmq.broker.metrics;
 
-public class BrokerMetricsConstant {
-    public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
-    public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
+import io.opentelemetry.context.Context;
 
-    public static final String LABEL_CLUSTER_NAME = "cluster";
-    public static final String LABEL_NODE_TYPE = "node_type";
-    public static final String NODE_TYPE_BROKER = "broker";
-    public static final String LABEL_NODE_ID = "node_id";
-    public static final String LABEL_AGGREGATION = "aggregation";
-    public static final String AGGREGATION_DELTA = "delta";
+public class NopLongUpDownCounter implements LongUpDownCounter {
+    @Override public void add(long l) {
+
+    }
+
+    @Override public void add(long l, Attributes attributes) {
+
+    }
+
+    @Override public void add(long l, Attributes attributes, Context context) {
+
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
index 48c73b1b8..442f3697e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
@@ -16,14 +16,7 @@
  */
 package org.apache.rocketmq.broker.metrics;
 
-public class BrokerMetricsConstant {
-    public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
-    public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+import io.opentelemetry.api.metrics.ObservableLongGauge;
 
-    public static final String LABEL_CLUSTER_NAME = "cluster";
-    public static final String LABEL_NODE_TYPE = "node_type";
-    public static final String NODE_TYPE_BROKER = "broker";
-    public static final String LABEL_NODE_ID = "node_id";
-    public static final String LABEL_AGGREGATION = "aggregation";
-    public static final String AGGREGATION_DELTA = "delta";
+public class NopObservableLongGauge implements ObservableLongGauge {
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 2d15139d4..76e1d8b63 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -21,13 +21,19 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.FileRegion;
+import io.opentelemetry.api.common.Attributes;
+import java.nio.ByteBuffer;
+import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.longpolling.PullRequest;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
 import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
@@ -43,13 +49,13 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.BrokerRole;
 
-import java.nio.ByteBuffer;
-import java.util.List;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
 
 public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
 
@@ -87,6 +93,16 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
 
                 this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
 
+                if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+                    Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                        .put(LABEL_TOPIC, requestHeader.getTopic())
+                        .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
+                        .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+                        .build();
+                    BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+                    BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+                }
+
                 if (!channelIsWritable(channel, requestHeader)) {
                     getMessageResult.release();
                     //ignore pull request
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 7cc15f210..2bce7a581 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -22,13 +22,16 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
 
+import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Random;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
 import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
@@ -38,6 +41,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PeekMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -48,6 +52,10 @@ import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
 public class PeekMessageProcessor implements NettyRequestProcessor {
     private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -173,6 +181,17 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
                     getMessageResult.getBufferTotalSize());
 
                 this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+
+                if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+                    Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                        .put(LABEL_TOPIC, requestHeader.getTopic())
+                        .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
+                        .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+                        .build();
+                    BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+                    BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+                }
+
                 if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                     final long beginTimeMills = this.brokerController.getMessageStore().now();
                     final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 1a5401b2c..0b8801ebf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
+import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -37,8 +38,10 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.longpolling.PopRequest;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
 import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.TopicConfig;
@@ -56,6 +59,7 @@ import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -71,6 +75,10 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
 public class PopMessageProcessor implements NettyRequestProcessor {
     private static final InternalLogger POP_LOGGER =
         InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -540,6 +548,17 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                 this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic,
                     getMessageTmpResult.getBufferTotalSize());
 
+                if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+                    Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                        .put(LABEL_TOPIC, requestHeader.getTopic())
+                        .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
+                        .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+                        .build();
+                    // Feed the metrics from getMessageTmpResult, the same result the BrokerStatsManager call above reads.
+                    BrokerMetricsManager.messagesOutTotal.add(getMessageTmpResult.getMessageCount(), attributes);
+                    BrokerMetricsManager.throughputOutTotal.add(getMessageTmpResult.getBufferTotalSize(), attributes);
+                }
+
                 if (isOrder) {
                     this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,
                         requestHeader.getConsumerGroup(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index a992a47da..96ea64aa9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.broker.processor;
 
 import com.alibaba.fastjson.JSON;
+import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -25,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -38,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -48,6 +51,10 @@ import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
 public class PopReviveService extends ServiceThread {
     private static final InternalLogger POP_LOGGER = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
 
@@ -198,6 +205,15 @@ public class PopReviveService extends ServiceThread {
                     brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                     brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
                         brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1).getStoreTimestamp());
+
+                    Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                        .put(LABEL_TOPIC, topic)
+                        .put(LABEL_CONSUMER_GROUP, group)
+                        .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group))
+                        .build();
+                    BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+                    BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+
                     break;
                 case NO_MATCHED_MESSAGE:
                     pullStatus = PullStatus.NO_MATCHED_MSG;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index da4d8db1f..d569b195f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -19,10 +19,13 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.api.common.Attributes;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -35,6 +38,7 @@ import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -47,6 +51,10 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
 public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -147,7 +155,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
 
         if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
             PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
-            this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
+            this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt, BrokerMetricsManager.getMessageType(requestHeader));
         }
 
         return response;
@@ -237,7 +245,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
     private void handlePutMessageResult(PutMessageResult putMessageResult,
         final RemotingCommand request, final MessageExt msg,
         final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext,
-        int queueIdInt) {
+        int queueIdInt, TopicMessageType messageType) {
         if (putMessageResult == null) {
             log.warn("process reply message, store putMessage return null");
             return;
@@ -290,6 +298,17 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
                 putMessageResult.getAppendMessageResult().getWroteBytes());
             this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
 
+            if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
+                Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_TOPIC, msg.getTopic())
+                    .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
+                    .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(msg.getTopic()))
+                    .build();
+                BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
+                BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes);
+                BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
+            }
+
             responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
             responseHeader.setQueueId(queueIdInt);
             responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 267a97b5a..9c5c1e937 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
@@ -25,9 +26,11 @@ import java.util.concurrent.CompletableFuture;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.common.attribute.CleanupPolicy;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.MQVersion;
@@ -65,6 +68,9 @@ import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
 public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@@ -299,7 +305,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
                 RemotingCommand responseFuture =
                     handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
-                        ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
+                        ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
                 if (responseFuture != null) {
                     doResponse(ctx, request, responseFuture);
                 }
@@ -314,16 +320,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             } else {
                 putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
             }
-            handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
+            handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
             sendMessageCallback.onComplete(sendMessageContext, response);
             return response;
         }
     }
 
     private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
-        RemotingCommand request, MessageExt msg,
-        SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
-        int queueIdInt, long beginTimeMillis, TopicQueueMappingContext mappingContext) {
+        RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader,
+        SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt, long beginTimeMillis,
+        TopicQueueMappingContext mappingContext, TopicMessageType messageType) {
         if (putMessageResult == null) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("store putMessage return null");
@@ -422,6 +428,17 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             this.brokerController.getBrokerStatsManager().incTopicPutLatency(msg.getTopic(), queueIdInt,
                 (int) (this.brokerController.getMessageStore().now() - beginTimeMillis));
 
+            if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
+                Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_TOPIC, msg.getTopic())
+                    .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
+                    .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(msg.getTopic()))
+                    .build();
+                BrokerMetricsManager.messagesInTotal.add(putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
+                BrokerMetricsManager.throughputInTotal.add(putMessageResult.getAppendMessageResult().getWroteBytes(), attributes);
+                BrokerMetricsManager.messageSize.record(putMessageResult.getAppendMessageResult().getWroteBytes() / putMessageResult.getAppendMessageResult().getMsgNum(), attributes);
+            }
+
             response.setRemark(null);
 
             responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
@@ -571,7 +588,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
                 RemotingCommand responseFuture =
                     handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader,
-                        sendMessageContext, ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
+                        sendMessageContext, ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
                 if (responseFuture != null) {
                     doResponse(ctx, request, responseFuture);
                 }
@@ -586,7 +603,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             } else {
                 putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
             }
-            handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
+            handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader,
+                sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
             sendMessageCallback.onComplete(sendMessageContext, response);
             return response;
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index a7dfbd1e9..9a8721b04 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.schedule;
 
+import io.opentelemetry.api.common.Attributes;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -34,11 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.topic.TopicValidator;
@@ -57,6 +60,11 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.ReferredIterator;
 
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
 public class ScheduleMessageService extends ConfigManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
@@ -740,9 +748,27 @@ public class ScheduleMessageService extends ConfigManager {
                 ScheduleMessageService.this.brokerController.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes());
                 ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum());
                 ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes());
+
+                Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_TOPIC, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)
+                    .put(LABEL_CONSUMER_GROUP, MixAll.SCHEDULE_CONSUMER_GROUP)
+                    .put(LABEL_IS_SYSTEM, true)
+                    .build();
+                BrokerMetricsManager.messagesOutTotal.add(result.getAppendMessageResult().getMsgNum(), attributes);
+                BrokerMetricsManager.throughputOutTotal.add(result.getAppendMessageResult().getWroteBytes(), attributes);
+
                 ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
                 ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
                 ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+
+                attributes = BrokerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_TOPIC, topic)
+                    .put(LABEL_MESSAGE_TYPE, TopicMessageType.DELAY.getMetricsValue())
+                    .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
+                    .build();
+                BrokerMetricsManager.messagesInTotal.add(result.getAppendMessageResult().getMsgNum(), attributes);
+                BrokerMetricsManager.throughputInTotal.add(result.getAppendMessageResult().getWroteBytes(), attributes);
+                BrokerMetricsManager.messageSize.record(result.getAppendMessageResult().getWroteBytes() / result.getAppendMessageResult().getMsgNum(), attributes);
             }
         }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
index a3bda9b59..1a5ebcf8e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
@@ -16,9 +16,12 @@
  */
 package org.apache.rocketmq.broker.transaction.queue;
 
+import io.opentelemetry.api.common.Attributes;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
@@ -31,6 +34,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InnerLoggerFactory;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -50,6 +54,10 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
 public class TransactionalMessageBridge {
     private static final InternalLogger LOGGER = InnerLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
 
@@ -138,6 +146,15 @@ public class TransactionalMessageBridge {
                     this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
                         this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1)
                             .getStoreTimestamp());
+
+                    Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                        .put(LABEL_TOPIC, topic)
+                        .put(LABEL_CONSUMER_GROUP, group)
+                        .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group))
+                        .build();
+                    BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+                    BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+
                     break;
                 case NO_MATCHED_MESSAGE:
                     pullStatus = PullStatus.NO_MATCHED_MSG;
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 5a091aeb2..8c484da31 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -39,4 +39,14 @@ public enum TopicMessageType {
     public String getValue() {
         return value;
     }
+
+    /**
+     * Returns {@code value} in lower case, as used for the message-type metrics label.
+     * Lower-casing uses {@link java.util.Locale#ROOT} so the result does not depend on the JVM default locale.
+     *
+     * @return the lower-cased value
+     */
+    public String getMetricsValue() {
+        return value.toLowerCase(java.util.Locale.ROOT);
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.java
index 2af8a708c..c8d6da2d3 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.java
@@ -82,7 +82,7 @@ public class PopBigMessageIT extends BasePopNormally {
         });
 
         // no ack, msg will put into pop retry topic
-        await().atMost(Duration.ofSeconds(6)).untilAsserted(() -> {
+        await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
             PopResult retryPopResult = popMessageAsync(Duration.ofSeconds(3).toMillis(), 1, 5000).get();
             assertEquals(PopStatus.FOUND, retryPopResult.getPopStatus());