You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/02 12:42:48 UTC
[rocketmq] branch develop updated: implement broker stats metrics and request metrics (#5449)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 350e433ac implement broker stats metrics and request metrics (#5449)
350e433ac is described below
commit 350e433acc19a7ad4e69960063b4c694de9e82c5
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Nov 2 20:42:42 2022 +0800
implement broker stats metrics and request metrics (#5449)
* implement broker stats metrics and request metrics
* fix bazel
* PopBigMessageIT potentially needs more time
* Fix bazel deps warning
Co-authored-by: Li Zhanhui <li...@gmail.com>
---
broker/BUILD.bazel | 1 +
.../apache/rocketmq/broker/BrokerController.java | 20 ++++
.../broker/metrics/BrokerMetricsConstant.java | 15 +++
.../broker/metrics/BrokerMetricsManager.java | 117 ++++++++++++++++++++-
...kerMetricsConstant.java => NopLongCounter.java} | 24 +++--
...rMetricsConstant.java => NopLongHistogram.java} | 24 +++--
...ricsConstant.java => NopLongUpDownCounter.java} | 24 +++--
...csConstant.java => NopObservableLongGauge.java} | 11 +-
.../processor/DefaultPullMessageResultHandler.java | 22 +++-
.../broker/processor/PeekMessageProcessor.java | 19 ++++
.../broker/processor/PopMessageProcessor.java | 18 ++++
.../broker/processor/PopReviveService.java | 16 +++
.../broker/processor/ReplyMessageProcessor.java | 23 +++-
.../broker/processor/SendMessageProcessor.java | 32 ++++--
.../broker/schedule/ScheduleMessageService.java | 26 +++++
.../queue/TransactionalMessageBridge.java | 17 +++
.../common/attribute/TopicMessageType.java | 4 +
.../test/client/consumer/pop/PopBigMessageIT.java | 2 +-
18 files changed, 365 insertions(+), 50 deletions(-)
diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index a537520f8..ea8dd9046 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -41,6 +41,7 @@ java_library(
"@maven//:io_netty_netty_all",
"@maven//:io_openmessaging_storage_dledger",
"@maven//:io_opentelemetry_opentelemetry_api",
+ "@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
"@maven//:io_opentelemetry_opentelemetry_sdk",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index d5041c2a7..e35fc7dc1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -2283,4 +2283,24 @@ public class BrokerController {
return this.topicRouteInfoManager;
}
+ public BlockingQueue<Runnable> getClientManagerThreadPoolQueue() {
+ return clientManagerThreadPoolQueue;
+ }
+
+ public BlockingQueue<Runnable> getConsumerManagerThreadPoolQueue() {
+ return consumerManagerThreadPoolQueue;
+ }
+
+ public BlockingQueue<Runnable> getAsyncPutThreadPoolQueue() {
+ return putThreadPoolQueue;
+ }
+
+ public BlockingQueue<Runnable> getReplyThreadPoolQueue() {
+ return replyThreadPoolQueue;
+ }
+
+ public BlockingQueue<Runnable> getAdminBrokerThreadPoolQueue() {
+ return adminBrokerThreadPoolQueue;
+ }
+
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
index 48c73b1b8..d13516551 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
@@ -18,12 +18,27 @@ package org.apache.rocketmq.broker.metrics;
public class BrokerMetricsConstant {
public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
+
+ public static final String GAUGE_PROCESSOR_WATERMARK = "rocketmq_processor_watermark";
public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+ public static final String COUNTER_MESSAGES_IN_TOTAL = "rocketmq_messages_in_total";
+ public static final String COUNTER_MESSAGES_OUT_TOTAL = "rocketmq_messages_out_total";
+ public static final String COUNTER_THROUGHPUT_IN_TOTAL = "rocketmq_throughput_in_total";
+ public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "rocketmq_throughput_out_total";
+ public static final String HISTOGRAM_MESSAGE_SIZE = "rocketmq_message_size";
+
public static final String LABEL_CLUSTER_NAME = "cluster";
public static final String LABEL_NODE_TYPE = "node_type";
public static final String NODE_TYPE_BROKER = "broker";
public static final String LABEL_NODE_ID = "node_id";
public static final String LABEL_AGGREGATION = "aggregation";
public static final String AGGREGATION_DELTA = "delta";
+ public static final String LABEL_PROCESSOR = "processor";
+
+ public static final String LABEL_TOPIC = "topic";
+ public static final String LABEL_IS_RETRY = "is_retry";
+ public static final String LABEL_IS_SYSTEM = "is_system";
+ public static final String LABEL_CONSUMER_GROUP = "consumer_group";
+ public static final String LABEL_MESSAGE_TYPE = "message_type";
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index e6fab145f..d18a1eb25 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -19,18 +19,24 @@ package org.apache.rocketmq.broker.metrics;
import com.google.common.base.Splitter;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,12 +44,24 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.MessageStore;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
@@ -64,7 +82,16 @@ public class BrokerMetricsManager {
private PrometheusHttpServer prometheusHttpServer;
private Meter brokerMeter;
- public static ObservableLongGauge brokerPermission = null;
+ // broker stats metrics
+ public static ObservableLongGauge processorWatermark = new NopObservableLongGauge();
+ public static ObservableLongGauge brokerPermission = new NopObservableLongGauge();
+
+ // request metrics
+ public static LongCounter messagesInTotal = new NopLongCounter();
+ public static LongCounter messagesOutTotal = new NopLongCounter();
+ public static LongCounter throughputInTotal = new NopLongCounter();
+ public static LongCounter throughputOutTotal = new NopLongCounter();
+ public static LongHistogram messageSize = new NopLongHistogram();
public BrokerMetricsManager(BrokerController brokerController) {
this.brokerController = brokerController;
@@ -79,6 +106,30 @@ public class BrokerMetricsManager {
return attributesBuilder;
}
+ public static boolean isRetryOrDlqTopic(String topic) {
+ if (StringUtils.isBlank(topic)) {
+ return false;
+ }
+ return topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX);
+ }
+
+ public static TopicMessageType getMessageType(SendMessageRequestHeader requestHeader) {
+ Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties());
+ String traFlag = properties.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+ TopicMessageType topicMessageType = TopicMessageType.NORMAL;
+ if (Boolean.parseBoolean(traFlag)) {
+ topicMessageType = TopicMessageType.TRANSACTION;
+ } else if (properties.containsKey(MessageConst.PROPERTY_SHARDING_KEY)) {
+ topicMessageType = TopicMessageType.FIFO;
+ } else if (properties.get("__STARTDELIVERTIME") != null
+ || properties.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null
+ || properties.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null
+ || properties.get(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
+ topicMessageType = TopicMessageType.DELAY;
+ }
+ return topicMessageType;
+ }
+
public Meter getBrokerMeter() {
return brokerMeter;
}
@@ -181,21 +232,85 @@ public class BrokerMetricsManager {
providerBuilder.registerMetricReader(prometheusHttpServer);
}
+ registerMetricsView(providerBuilder);
+
brokerMeter = OpenTelemetrySdk.builder()
.setMeterProvider(providerBuilder.build())
.build()
.getMeter(OPEN_TELEMETRY_METER_NAME);
initStatsMetrics();
+ initRequestMetrics();
+ }
+
+ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
+ // message size buckets, 1k, 4k, 512k, 1M, 2M, 4M
+ List<Double> messageSizeBuckets = Arrays.asList(
+ 1d * 1024, //1KB
+ 4d * 1024, //4KB
+ 512d * 1024, //512KB
+ 1d * 1024 * 1024, //1MB
+ 2d * 1024 * 1024, //2MB
+ 4d * 1024 * 1024 //4MB
+ );
+ InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_MESSAGE_SIZE)
+ .build();
+ View messageSizeView = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets))
+ .build();
+ providerBuilder.registerView(messageSizeSelector, messageSizeView);
}
private void initStatsMetrics() {
+ processorWatermark = brokerMeter.gaugeBuilder(GAUGE_PROCESSOR_WATERMARK)
+ .setDescription("Request processor watermark")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ measurement.record(brokerController.getSendThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "send").build());
+ measurement.record(brokerController.getAsyncPutThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "async_put").build());
+ measurement.record(brokerController.getPullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "pull").build());
+ measurement.record(brokerController.getAckThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "ack").build());
+ measurement.record(brokerController.getQueryThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "query_message").build());
+ measurement.record(brokerController.getClientManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "client_manager").build());
+ measurement.record(brokerController.getHeartbeatThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "heartbeat").build());
+ measurement.record(brokerController.getLitePullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "lite_pull").build());
+ measurement.record(brokerController.getEndTransactionThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "transaction").build());
+ measurement.record(brokerController.getConsumerManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "consumer_manager").build());
+ measurement.record(brokerController.getAdminBrokerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "admin").build());
+ measurement.record(brokerController.getReplyThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "reply").build());
+ });
+
brokerPermission = brokerMeter.gaugeBuilder(GAUGE_BROKER_PERMISSION)
.setDescription("Broker permission")
.ofLongs()
.buildWithCallback(measurement -> measurement.record(brokerConfig.getBrokerPermission(), newAttributesBuilder().build()));
}
+ private void initRequestMetrics() {
+ messagesInTotal = brokerMeter.counterBuilder(COUNTER_MESSAGES_IN_TOTAL)
+ .setDescription("Total number of incoming messages")
+ .build();
+
+ messagesOutTotal = brokerMeter.counterBuilder(COUNTER_MESSAGES_OUT_TOTAL)
+ .setDescription("Total number of outgoing messages")
+ .build();
+
+ throughputInTotal = brokerMeter.counterBuilder(COUNTER_THROUGHPUT_IN_TOTAL)
+ .setDescription("Total traffic of incoming messages")
+ .build();
+
+ throughputOutTotal = brokerMeter.counterBuilder(COUNTER_THROUGHPUT_OUT_TOTAL)
+ .setDescription("Total traffic of outgoing messages")
+ .build();
+
+ messageSize = brokerMeter.histogramBuilder(HISTOGRAM_MESSAGE_SIZE)
+ .setDescription("Incoming messages size")
+ .ofLongs()
+ .build();
+ }
+
public void shutdown() {
if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
periodicMetricReader.forceFlush();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
index 48c73b1b8..5f9c558e0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
@@ -16,14 +16,20 @@
*/
package org.apache.rocketmq.broker.metrics;
-public class BrokerMetricsConstant {
- public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
- public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.context.Context;
- public static final String LABEL_CLUSTER_NAME = "cluster";
- public static final String LABEL_NODE_TYPE = "node_type";
- public static final String NODE_TYPE_BROKER = "broker";
- public static final String LABEL_NODE_ID = "node_id";
- public static final String LABEL_AGGREGATION = "aggregation";
- public static final String AGGREGATION_DELTA = "delta";
+public class NopLongCounter implements LongCounter {
+ @Override public void add(long l) {
+
+ }
+
+ @Override public void add(long l, Attributes attributes) {
+
+ }
+
+ @Override public void add(long l, Attributes attributes, Context context) {
+
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
index 48c73b1b8..582b20daa 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
@@ -16,14 +16,20 @@
*/
package org.apache.rocketmq.broker.metrics;
-public class BrokerMetricsConstant {
- public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
- public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.context.Context;
- public static final String LABEL_CLUSTER_NAME = "cluster";
- public static final String LABEL_NODE_TYPE = "node_type";
- public static final String NODE_TYPE_BROKER = "broker";
- public static final String LABEL_NODE_ID = "node_id";
- public static final String LABEL_AGGREGATION = "aggregation";
- public static final String AGGREGATION_DELTA = "delta";
+public class NopLongHistogram implements LongHistogram {
+ @Override public void record(long l) {
+
+ }
+
+ @Override public void record(long l, Attributes attributes) {
+
+ }
+
+ @Override public void record(long l, Attributes attributes, Context context) {
+
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
index 48c73b1b8..0752d57a1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
@@ -16,14 +16,20 @@
*/
package org.apache.rocketmq.broker.metrics;
-public class BrokerMetricsConstant {
- public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
- public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
+import io.opentelemetry.context.Context;
- public static final String LABEL_CLUSTER_NAME = "cluster";
- public static final String LABEL_NODE_TYPE = "node_type";
- public static final String NODE_TYPE_BROKER = "broker";
- public static final String LABEL_NODE_ID = "node_id";
- public static final String LABEL_AGGREGATION = "aggregation";
- public static final String AGGREGATION_DELTA = "delta";
+public class NopLongUpDownCounter implements LongUpDownCounter {
+ @Override public void add(long l) {
+
+ }
+
+ @Override public void add(long l, Attributes attributes) {
+
+ }
+
+ @Override public void add(long l, Attributes attributes, Context context) {
+
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
index 48c73b1b8..442f3697e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
@@ -16,14 +16,7 @@
*/
package org.apache.rocketmq.broker.metrics;
-public class BrokerMetricsConstant {
- public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
- public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
+import io.opentelemetry.api.metrics.ObservableLongGauge;
- public static final String LABEL_CLUSTER_NAME = "cluster";
- public static final String LABEL_NODE_TYPE = "node_type";
- public static final String NODE_TYPE_BROKER = "broker";
- public static final String LABEL_NODE_ID = "node_id";
- public static final String LABEL_AGGREGATION = "aggregation";
- public static final String AGGREGATION_DELTA = "delta";
+public class NopObservableLongGauge implements ObservableLongGauge {
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 2d15139d4..76e1d8b63 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -21,13 +21,19 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
+import io.opentelemetry.api.common.Attributes;
+import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.longpolling.PullRequest;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
@@ -43,13 +49,13 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
-import java.nio.ByteBuffer;
-import java.util.List;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
@@ -87,6 +93,16 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+ if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+ Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC, requestHeader.getTopic())
+ .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
+ .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+ .build();
+ BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+ BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+ }
+
if (!channelIsWritable(channel, requestHeader)) {
getMessageResult.release();
//ignore pull request
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 7cc15f210..2bce7a581 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -22,13 +22,16 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
+import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
@@ -38,6 +41,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PeekMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -48,6 +52,10 @@ import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
public class PeekMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -173,6 +181,17 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+
+ if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+ Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC, requestHeader.getTopic())
+ .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
+ .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+ .build();
+ BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+ BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+ }
+
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 1a5401b2c..0b8801ebf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
+import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
@@ -37,8 +38,10 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PopRequest;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
@@ -56,6 +59,7 @@ import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -71,6 +75,10 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
public class PopMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger POP_LOGGER =
InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -540,6 +548,16 @@ public class PopMessageProcessor implements NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic,
getMessageTmpResult.getBufferTotalSize());
+ if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+ Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC, requestHeader.getTopic())
+ .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
+ .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+ .build();
+ // Counters take increments: record only this fetch (getMessageTmpResult), matching the BrokerStatsManager calls above, not the aggregate getMessageResult.
+ BrokerMetricsManager.messagesOutTotal.add(getMessageTmpResult.getMessageCount(), attributes);
+ BrokerMetricsManager.throughputOutTotal.add(getMessageTmpResult.getBufferTotalSize(), attributes);
+ }
+
if (isOrder) {
this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,
requestHeader.getConsumerGroup(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index a992a47da..96ea64aa9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
+import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -25,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.KeyBuilder;
@@ -38,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -48,6 +51,10 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
public class PopReviveService extends ServiceThread {
private static final InternalLogger POP_LOGGER = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -198,6 +205,15 @@ public class PopReviveService extends ServiceThread {
brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1).getStoreTimestamp());
+
+ // Mirror the fetch into the metrics instruments, labelled by topic and consumer group.
+ // The sample is marked as system traffic if either the topic or the group is a system one.
+ final boolean reviveReadIsSystem = TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group);
+ Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+     .put(LABEL_TOPIC, topic)
+     .put(LABEL_CONSUMER_GROUP, group)
+     .put(LABEL_IS_SYSTEM, reviveReadIsSystem)
+     .build();
+ // Bytes and message count of the result that was just read.
+ BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+ BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+
break;
case NO_MATCHED_MESSAGE:
pullStatus = PullStatus.NO_MATCHED_MSG;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index da4d8db1f..d569b195f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -19,10 +19,13 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.api.common.Attributes;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -35,6 +38,7 @@ import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -47,6 +51,10 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -147,7 +155,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
- this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
+ this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt, BrokerMetricsManager.getMessageType(requestHeader));
}
return response;
@@ -237,7 +245,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
private void handlePutMessageResult(PutMessageResult putMessageResult,
final RemotingCommand request, final MessageExt msg,
final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext,
- int queueIdInt) {
+ int queueIdInt, TopicMessageType messageType) {
if (putMessageResult == null) {
log.warn("process reply message, store putMessage return null");
return;
@@ -290,6 +298,17 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+ if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
+     // Inbound metrics are labelled by topic, message type and whether the topic is a system topic.
+     Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+         .put(LABEL_TOPIC, msg.getTopic())
+         .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
+         .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(msg.getTopic()))
+         .build();
+     // Read the append figures once; they feed all three instruments below.
+     final long storedMsgNum = putMessageResult.getAppendMessageResult().getMsgNum();
+     final long storedBytes = putMessageResult.getAppendMessageResult().getWroteBytes();
+     BrokerMetricsManager.messagesInTotal.add(storedMsgNum, attributes);
+     BrokerMetricsManager.throughputInTotal.add(storedBytes, attributes);
+     // Integer division: written bytes per message of this append.
+     BrokerMetricsManager.messageSize.record(storedBytes / storedMsgNum, attributes);
+ }
+
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 267a97b5a..9c5c1e937 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.processor;
+import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
@@ -25,9 +26,11 @@ import java.util.concurrent.CompletableFuture;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.attribute.CleanupPolicy;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.MQVersion;
@@ -65,6 +68,9 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@@ -299,7 +305,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
RemotingCommand responseFuture =
handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
- ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
+ ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
if (responseFuture != null) {
doResponse(ctx, request, responseFuture);
}
@@ -314,16 +320,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
- handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
+ handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
}
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
- RemotingCommand request, MessageExt msg,
- SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
- int queueIdInt, long beginTimeMillis, TopicQueueMappingContext mappingContext) {
+ RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader,
+ SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt, long beginTimeMillis,
+ TopicQueueMappingContext mappingContext, TopicMessageType messageType) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
@@ -422,6 +428,17 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this.brokerController.getBrokerStatsManager().incTopicPutLatency(msg.getTopic(), queueIdInt,
(int) (this.brokerController.getMessageStore().now() - beginTimeMillis));
+ if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
+     // Labels for the inbound sample: target topic, message type of the request and
+     // a flag telling whether the topic is a system topic.
+     Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+         .put(LABEL_TOPIC, msg.getTopic())
+         .put(LABEL_MESSAGE_TYPE, messageType.getMetricsValue())
+         .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(msg.getTopic()))
+         .build();
+     // Message count and written bytes reported by the store for this put.
+     final long putMsgNum = putMessageResult.getAppendMessageResult().getMsgNum();
+     final long putBytes = putMessageResult.getAppendMessageResult().getWroteBytes();
+     BrokerMetricsManager.messagesInTotal.add(putMsgNum, attributes);
+     BrokerMetricsManager.throughputInTotal.add(putBytes, attributes);
+     // Size histogram gets the per-message share of the written bytes (integer division).
+     BrokerMetricsManager.messageSize.record(putBytes / putMsgNum, attributes);
+ }
+
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
@@ -571,7 +588,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
RemotingCommand responseFuture =
handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader,
- sendMessageContext, ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
+ sendMessageContext, ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
if (responseFuture != null) {
doResponse(ctx, request, responseFuture);
}
@@ -586,7 +603,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} else {
putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
}
- handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
+ handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader,
+ sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index a7dfbd1e9..9a8721b04 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.schedule;
+import io.opentelemetry.api.common.Attributes;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -34,11 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
@@ -57,6 +60,11 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
public class ScheduleMessageService extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -740,9 +748,27 @@ public class ScheduleMessageService extends ConfigManager {
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum());
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes());
+
+ // Out side: the delivered message is counted as consumed from the schedule topic by the
+ // schedule consumer group, always flagged as system traffic.
+ Attributes scheduleOutAttributes = BrokerMetricsManager.newAttributesBuilder()
+     .put(LABEL_TOPIC, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)
+     .put(LABEL_CONSUMER_GROUP, MixAll.SCHEDULE_CONSUMER_GROUP)
+     .put(LABEL_IS_SYSTEM, true)
+     .build();
+ BrokerMetricsManager.messagesOutTotal.add(result.getAppendMessageResult().getMsgNum(), scheduleOutAttributes);
+ BrokerMetricsManager.throughputOutTotal.add(result.getAppendMessageResult().getWroteBytes(), scheduleOutAttributes);
+
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+
+ // In side: the same append is counted as a DELAY-type message arriving at the target topic.
+ // NOTE(review): unlike SendMessageProcessor, no isRetryOrDlqTopic filter is applied here,
+ // so retry/DLQ targets would be counted too - confirm this is intended.
+ Attributes deliverInAttributes = BrokerMetricsManager.newAttributesBuilder()
+     .put(LABEL_TOPIC, topic)
+     .put(LABEL_MESSAGE_TYPE, TopicMessageType.DELAY.getMetricsValue())
+     .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
+     .build();
+ final long deliveredMsgNum = result.getAppendMessageResult().getMsgNum();
+ final long deliveredBytes = result.getAppendMessageResult().getWroteBytes();
+ BrokerMetricsManager.messagesInTotal.add(deliveredMsgNum, deliverInAttributes);
+ BrokerMetricsManager.throughputInTotal.add(deliveredBytes, deliverInAttributes);
+ // Integer division: written bytes per delivered message.
+ BrokerMetricsManager.messageSize.record(deliveredBytes / deliveredMsgNum, deliverInAttributes);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
index a3bda9b59..1a5ebcf8e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
@@ -16,9 +16,12 @@
*/
package org.apache.rocketmq.broker.transaction.queue;
+import io.opentelemetry.api.common.Attributes;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
@@ -31,6 +34,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InnerLoggerFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -50,6 +54,10 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
public class TransactionalMessageBridge {
private static final InternalLogger LOGGER = InnerLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
@@ -138,6 +146,15 @@ public class TransactionalMessageBridge {
this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1)
.getStoreTimestamp());
+
+ // Record the read in the out-bound metrics, labelled by topic and consumer group; the
+ // sample counts as system traffic when the topic or the group is a system one.
+ final boolean txReadIsSystem = TopicValidator.isSystemTopic(topic) || MixAll.isSysConsumerGroup(group);
+ Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+     .put(LABEL_TOPIC, topic)
+     .put(LABEL_CONSUMER_GROUP, group)
+     .put(LABEL_IS_SYSTEM, txReadIsSystem)
+     .build();
+ // Total bytes first, then the number of messages, both taken from the fetched result.
+ BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+ BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+
break;
case NO_MATCHED_MESSAGE:
pullStatus = PullStatus.NO_MATCHED_MSG;
diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 5a091aeb2..8c484da31 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -39,4 +39,8 @@ public enum TopicMessageType {
public String getValue() {
return value;
}
+
+ /**
+  * Returns the lower-case form of {@link #getValue()}, used as the value of the
+  * message-type label on broker metrics.
+  * <p>
+  * The conversion uses {@link java.util.Locale#ROOT} so the label does not depend on the
+  * JVM default locale (under a Turkish locale, for example, 'I' lower-cases to a dotless 'i').
+  *
+  * @return the locale-independent lower-case message type value
+  */
+ public String getMetricsValue() {
+     return value.toLowerCase(java.util.Locale.ROOT);
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.java
index 2af8a708c..c8d6da2d3 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopBigMessageIT.java
@@ -82,7 +82,7 @@ public class PopBigMessageIT extends BasePopNormally {
});
// no ack, msg will put into pop retry topic
- await().atMost(Duration.ofSeconds(6)).untilAsserted(() -> {
+ await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
PopResult retryPopResult = popMessageAsync(Duration.ofSeconds(3).toMillis(), 1, 5000).get();
assertEquals(PopStatus.FOUND, retryPopResult.getPopStatus());