You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@rocketmq.apache.org by fu...@apache.org on 2022/11/23 09:14:26 UTC
[rocketmq] branch develop updated: optimize metrics in pop processor (#5580)
This is an automated email from the ASF dual-hosted git repository.
fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a9e1bcbe5 optimize metrics in pop processor (#5580)
a9e1bcbe5 is described below
commit a9e1bcbe5b2d7e78bc7598d7139b6876d8502e72
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Nov 23 17:14:03 2022 +0800
optimize metrics in pop processor (#5580)
---
.../broker/processor/PeekMessageProcessor.java | 20 ++++++++++----------
.../broker/processor/PopMessageProcessor.java | 2 +-
2 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 020ee194d..12036666b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -184,16 +184,6 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
- if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
- Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
- .put(LABEL_TOPIC, requestHeader.getTopic())
- .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
- .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
- .build();
- BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
- BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
- }
-
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
@@ -253,6 +243,16 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), null);
}
if (getMessageTmpResult != null) {
+ if (!getMessageTmpResult.getMessageMapedList().isEmpty() && !isRetry) {
+ Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC, requestHeader.getTopic())
+ .put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())
+ .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(requestHeader.getTopic()) || MixAll.isSysConsumerGroup(requestHeader.getConsumerGroup()))
+ .build();
+ BrokerMetricsManager.messagesOutTotal.add(getMessageResult.getMessageCount(), attributes);
+ BrokerMetricsManager.throughputOutTotal.add(getMessageResult.getBufferTotalSize(), attributes);
+ }
+
for (SelectMappedBufferResult mapedBuffer : getMessageTmpResult.getMessageMapedList()) {
getMessageResult.addMessage(mapedBuffer);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 481fdcab7..25beddb6d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -548,7 +548,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), topic,
getMessageTmpResult.getBufferTotalSize());
- if (!BrokerMetricsManager.isRetryOrDlqTopic(requestHeader.getTopic())) {
+ if (!isRetry) {
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, requestHeader.getTopic())
.put(LABEL_CONSUMER_GROUP, requestHeader.getConsumerGroup())