You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/09 11:44:47 UTC
[rocketmq] branch develop updated: [ISSUE #5449] [RIP-46] implement remoting stats metrics (#5487)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ae1bd2af [ISSUE #5449] [RIP-46] implement remoting stats metrics (#5487)
0ae1bd2af is described below
commit 0ae1bd2af69f475aba03ab7a27df0bd7cda15359
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Nov 9 19:44:41 2022 +0800
[ISSUE #5449] [RIP-46] implement remoting stats metrics (#5487)
* implement remoting metrics
* fix bazel
* add metrics view & fix unit test
* move remoting command process timer to NettyDecoder
---
.../broker/longpolling/PullRequestHoldService.java | 2 +-
.../broker/metrics/BrokerMetricsManager.java | 20 +++-
.../processor/AbstractSendMessageProcessor.java | 13 +--
.../broker/processor/AdminBrokerProcessor.java | 19 ++--
.../processor/DefaultPullMessageResultHandler.java | 63 +++++++------
.../broker/processor/NotificationProcessor.java | 36 ++------
.../broker/processor/PeekMessageProcessor.java | 23 +++--
.../broker/processor/PopMessageProcessor.java | 72 +++++++--------
.../broker/processor/PullMessageProcessor.java | 60 +++++-------
.../broker/processor/QueryMessageProcessor.java | 42 ++++++---
.../broker/processor/SendMessageProcessorTest.java | 47 ++++------
pom.xml | 5 +
remoting/BUILD.bazel | 26 ++++--
remoting/pom.xml | 4 +
.../main/java/org/apache/rocketmq/common/Pair.java | 0
.../rocketmq/common}/metrics/NopLongCounter.java | 2 +-
.../rocketmq/common}/metrics/NopLongHistogram.java | 2 +-
.../common}/metrics/NopLongUpDownCounter.java | 2 +-
.../common}/metrics/NopObservableLongGauge.java | 2 +-
.../rocketmq/common/protocol/RequestCode.java | 0
.../rocketmq/common/protocol/ResponseCode.java | 0
.../apache/rocketmq/remoting/RemotingServer.java | 2 +-
.../org/apache/rocketmq/remoting/common/Pair.java | 43 ---------
.../remoting/metrics/RemotingMetricsConstant.java | 69 ++++++++++++++
.../remoting/metrics/RemotingMetricsManager.java | 102 +++++++++++++++++++++
.../rocketmq/remoting/netty/NettyDecoder.java | 10 +-
.../remoting/netty/NettyRemotingAbstract.java | 94 ++++++++++++++-----
.../remoting/netty/NettyRemotingClient.java | 2 +-
.../remoting/netty/NettyRemotingServer.java | 2 +-
.../remoting/protocol/RemotingCommand.java | 41 ++++++---
30 files changed, 513 insertions(+), 292 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index f3cfc11e3..59b884324 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -21,7 +21,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
@@ -53,6 +52,7 @@ public class PullRequestHoldService extends ServiceThread {
}
}
+ pullRequest.getRequestCommand().setSuspended(true);
mpr.addPullRequest(pullRequest);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index d18a1eb25..8840b45e7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -45,13 +45,18 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.store.MessageStore;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
@@ -59,15 +64,15 @@ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_M
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
-import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR;
-import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;
-import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_TYPE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.OPEN_TELEMETRY_METER_NAME;
public class BrokerMetricsManager {
@@ -241,6 +246,7 @@ public class BrokerMetricsManager {
initStatsMetrics();
initRequestMetrics();
+ initOtherMetrics();
}
private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
@@ -261,6 +267,10 @@ public class BrokerMetricsManager {
.setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets))
.build();
providerBuilder.registerView(messageSizeSelector, messageSizeView);
+
+ for (Pair<InstrumentSelector, View> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
+ providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+ }
}
private void initStatsMetrics() {
@@ -311,6 +321,10 @@ public class BrokerMetricsManager {
.build();
}
+ private void initOtherMetrics() {
+ RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
+ }
+
public void shutdown() {
if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
periodicMetricReader.forceFlush();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 01968bac0..1a87da322 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
-
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
@@ -58,6 +57,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.PutMessageResult;
@@ -524,16 +524,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
final RemotingCommand response) {
- if (!request.isOnewayRPC()) {
- try {
- ctx.writeAndFlush(response);
- } catch (Throwable e) {
- LOGGER.error(
- "SendMessageProcessor finished processing the request, but failed to send response, client "
- + "address={}, request={}, response={}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
- request.toString(), response.toString(), e);
- }
- }
+ NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
}
public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e0f876e17..db5cec6f8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -42,15 +42,11 @@ import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
-import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
-import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
-import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
-import org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
-import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
@@ -92,11 +88,13 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QuerySubscriptionResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
@@ -109,6 +107,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequest
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.ExchangeHAInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.ExchangeHAInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
@@ -161,12 +160,14 @@ import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.GroupForbidden;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -178,10 +179,10 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
-import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
@@ -535,7 +536,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
- ctx.writeAndFlush(response);
+ NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
} else {
String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been updated failed.";
LOGGER.warn(errorMsg);
@@ -569,7 +570,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
- ctx.writeAndFlush(response);
+ NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
} else {
String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been deleted failed.";
LOGGER.warn(errorMsg);
@@ -604,7 +605,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
- ctx.writeAndFlush(response);
+ NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
} else {
String errorMsg = "The globalWhiteAddresses[" + requestHeader.getGlobalWhiteAddrs() + "] has been updated failed.";
LOGGER.warn(errorMsg);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 76e1d8b63..fe8e7aa0e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -18,12 +18,12 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageFilter;
@@ -56,6 +57,9 @@ import org.apache.rocketmq.store.config.BrokerRole;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
@@ -68,14 +72,14 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
@Override
public RemotingCommand handle(final GetMessageResult getMessageResult,
- final RemotingCommand request,
- final PullMessageRequestHeader requestHeader,
- final Channel channel,
- final SubscriptionData subscriptionData,
- final SubscriptionGroupConfig subscriptionGroupConfig,
- final boolean brokerAllowSuspend,
- final MessageFilter messageFilter,
- RemotingCommand response) {
+ final RemotingCommand request,
+ final PullMessageRequestHeader requestHeader,
+ final Channel channel,
+ final SubscriptionData subscriptionData,
+ final SubscriptionGroupConfig subscriptionGroupConfig,
+ final boolean brokerAllowSuspend,
+ final MessageFilter messageFilter,
+ RemotingCommand response) {
PullMessageProcessor processor = brokerController.getPullMessageProcessor();
processor.updateBroadcastPulledOffset(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
@@ -86,10 +90,10 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
switch (response.getCode()) {
case ResponseCode.SUCCESS:
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- getMessageResult.getMessageCount());
+ getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- getMessageResult.getBufferTotalSize());
+ getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
@@ -114,23 +118,27 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
- requestHeader.getTopic(), requestHeader.getQueueId(),
- (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
+ requestHeader.getTopic(), requestHeader.getQueueId(),
+ (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
response.setBody(r);
return response;
} else {
try {
FileRegion fileRegion =
- new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
- channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
+ new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
+ channel.writeAndFlush(fileRegion)
+ .addListener((ChannelFutureListener) future -> {
getMessageResult.release();
+ Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+ .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(response.getCode()))
+ .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+ .build();
+ RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
if (!future.isSuccess()) {
log.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
}
- }
- });
+ });
} catch (Throwable e) {
log.error("Error occurred when transferring messages from page cache", e);
getMessageResult.release();
@@ -151,7 +159,7 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
- this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
+ this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
return null;
}
@@ -159,7 +167,7 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
break;
case ResponseCode.PULL_OFFSET_MOVED:
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
- || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+ || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
@@ -171,15 +179,15 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
log.warn(
- "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
- responseHeader.getSuggestWhichBrokerId());
+ "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
+ responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
- requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
- responseHeader.getSuggestWhichBrokerId());
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+ responseHeader.getSuggestWhichBrokerId());
}
break;
@@ -202,7 +210,8 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
return true;
}
- protected byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic,
+ protected byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group,
+ final String topic,
final int queueId) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 2b39cba5c..559fd61a0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -18,16 +18,12 @@ package org.apache.rocketmq.broker.processor;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
-
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.longpolling.NotificationRequest;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -44,6 +40,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -148,31 +145,18 @@ public class NotificationProcessor implements NettyRequestProcessor {
if (!request.getChannel().isActive()) {
return;
}
- Runnable run = new Runnable() {
- @Override
- public void run() {
- final RemotingCommand response = NotificationProcessor.this.responseNotification(request.getChannel(), hasMsg);
-
- if (response != null) {
- response.setOpaque(request.getRemotingCommand().getOpaque());
- response.markResponseType();
- try {
- request.getChannel().writeAndFlush(response).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- POP_LOGGER.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
- POP_LOGGER.error(request.toString());
- POP_LOGGER.error(response.toString());
- }
- }
- });
- } catch (Throwable e) {
- POP_LOGGER.error("ProcessRequestWrapper process request over, but response failed", e);
+ Runnable run = () -> {
+ final RemotingCommand response = NotificationProcessor.this.responseNotification(request.getChannel(), hasMsg);
+ if (response != null) {
+ response.setOpaque(request.getRemotingCommand().getOpaque());
+ response.markResponseType();
+ NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> {
+ if (!future.isSuccess()) {
+ POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
POP_LOGGER.error(request.toString());
POP_LOGGER.error(response.toString());
}
- }
+ });
}
};
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 2bce7a581..688d94c63 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -17,16 +17,14 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
-
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
-
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
@@ -46,6 +44,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
@@ -55,6 +54,9 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
public class PeekMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -204,15 +206,20 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
try {
FileRegion fileRegion =
new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
- channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
+ RemotingCommand finalResponse = response;
+ channel.writeAndFlush(fileRegion)
+ .addListener((ChannelFutureListener) future -> {
tmpGetMessageResult.release();
+ Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+ .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(finalResponse.getCode()))
+ .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+ .build();
+ RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
if (!future.isSuccess()) {
LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
}
- }
- });
+ });
} catch (Throwable e) {
LOG.error("Error occurred when transferring messages from page cache", e);
getMessageResult.release();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index ac1dd4615..354e44157 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
@@ -31,6 +30,7 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
@@ -53,6 +53,7 @@ import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
@@ -65,12 +66,13 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
@@ -78,6 +80,9 @@ import org.apache.rocketmq.store.pop.PopCheckPoint;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
public class PopMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger POP_LOGGER =
@@ -221,35 +226,22 @@ public class PopMessageProcessor implements NettyRequestProcessor {
if (!request.getChannel().isActive()) {
return false;
}
- Runnable run = new Runnable() {
- @Override
- public void run() {
- try {
- final RemotingCommand response = PopMessageProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand());
-
- if (response != null) {
- response.setOpaque(request.getRemotingCommand().getOpaque());
- response.markResponseType();
- try {
- request.getChannel().writeAndFlush(response).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- POP_LOGGER.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
- POP_LOGGER.error(request.toString());
- POP_LOGGER.error(response.toString());
- }
- }
- });
- } catch (Throwable e) {
- POP_LOGGER.error("ProcessRequestWrapper process request over, but response failed", e);
+ Runnable run = () -> {
+ try {
+ final RemotingCommand response = processRequest(request.getChannel(), request.getRemotingCommand());
+ if (response != null) {
+ response.setOpaque(request.getRemotingCommand().getOpaque());
+ response.markResponseType();
+ NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> {
+ if (!future.isSuccess()) {
+ POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
POP_LOGGER.error(request.toString());
POP_LOGGER.error(response.toString());
}
- }
- } catch (RemotingCommandException e1) {
- POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1);
+ });
}
+ } catch (RemotingCommandException e1) {
+ POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1);
}
};
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
@@ -462,16 +454,21 @@ public class PopMessageProcessor implements NettyRequestProcessor {
FileRegion fileRegion =
new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()),
getMessageResult);
- channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
+ RemotingCommand finalResponse = response;
+ channel.writeAndFlush(fileRegion)
+ .addListener((ChannelFutureListener) future -> {
tmpGetMessageResult.release();
+ Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+ .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(finalResponse.getCode()))
+ .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+ .build();
+ RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
if (!future.isSuccess()) {
POP_LOGGER.error("Fail to transfer messages from page cache to {}",
channel.remoteAddress(), future.cause());
}
- }
- });
+ });
} catch (Throwable e) {
POP_LOGGER.error("Error occurred when transferring messages from page cache", e);
getMessageResult.release();
@@ -520,20 +517,20 @@ public class PopMessageProcessor implements NettyRequestProcessor {
return restNum;
}
getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup()
- , topic, queueId, offset,
- requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
+ , topic, queueId, offset,
+ requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
if (getMessageTmpResult == null) {
return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
}
// maybe store offset is not correct.
if (GetMessageStatus.OFFSET_TOO_SMALL.equals(getMessageTmpResult.getStatus())
- || GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageTmpResult.getStatus())
- || GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageTmpResult.getStatus())) {
+ || GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageTmpResult.getStatus())
+ || GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageTmpResult.getStatus())) {
// commit offset, because the offset is not correct
// If offset in store is greater than cq offset, it will cause duplicate messages,
// because offset in PopBuffer is not committed.
POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}",
- lockKey, offset, getMessageTmpResult.getNextBeginOffset());
+ lockKey, offset, getMessageTmpResult.getNextBeginOffset());
offset = getMessageTmpResult.getNextBeginOffset();
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
queueId, offset);
@@ -685,6 +682,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
}
}
if (queue.add(request)) {
+ remotingCommand.setSuspended(true);
totalPollingNum.incrementAndGet();
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("polling {}, result POLLING_SUC", remotingCommand);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 14771535d..0454a2550 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -16,12 +16,9 @@
*/
package org.apache.rocketmq.broker.processor;
-import java.util.List;
-
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
import java.util.Objects;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
@@ -64,6 +61,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -309,7 +307,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the broker[%s] pulling message is forbidden",
- this.brokerController.getBrokerConfig().getBrokerIP1()));
+ this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
@@ -708,38 +706,30 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
}
- public void executeRequestWhenWakeup(final Channel channel,
- final RemotingCommand request) throws RemotingCommandException {
- Runnable run = new Runnable() {
- @Override
- public void run() {
- try {
- final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
-
- if (response != null) {
- response.setOpaque(request.getOpaque());
- response.markResponseType();
- try {
- channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- LOGGER.error("processRequestWrapper response to {} failed",
- future.channel().remoteAddress(), future.cause());
- LOGGER.error(request.toString());
- LOGGER.error(response.toString());
- }
- }
- });
- } catch (Throwable e) {
- LOGGER.error("processRequestWrapper process request over, but response failed", e);
- LOGGER.error(request.toString());
- LOGGER.error(response.toString());
- }
+ public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) {
+ Runnable run = () -> {
+ try {
+ final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
+
+ if (response != null) {
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ try {
+ NettyRemotingAbstract.writeResponse(channel, request, response, future -> {
+ if (!future.isSuccess()) {
+ LOGGER.error("processRequestWrapper response to {} failed", channel.remoteAddress(), future.cause());
+ LOGGER.error(request.toString());
+ LOGGER.error(response.toString());
+ }
+ });
+ } catch (Throwable e) {
+ LOGGER.error("processRequestWrapper process request over, but response failed", e);
+ LOGGER.error(request.toString());
+ LOGGER.error(response.toString());
}
- } catch (RemotingCommandException e1) {
- LOGGER.error("excuteRequestWhenWakeup run", e1);
}
+ } catch (RemotingCommandException e1) {
+ LOGGER.error("excuteRequestWhenWakeup run", e1);
}
};
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index 5897a35f8..9eebf1e21 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -16,28 +16,34 @@
*/
package org.apache.rocketmq.broker.processor;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
+import io.opentelemetry.api.common.Attributes;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
+
public class QueryMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -100,15 +106,20 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
FileRegion fileRegion =
new QueryMessageTransfer(response.encodeHeader(queryMessageResult
.getBufferTotalSize()), queryMessageResult);
- ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
+ ctx.channel()
+ .writeAndFlush(fileRegion)
+ .addListener((ChannelFutureListener) future -> {
queryMessageResult.release();
+ Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+ .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(response.getCode()))
+ .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+ .build();
+ RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
if (!future.isSuccess()) {
LOGGER.error("transfer query message by page cache failed, ", future.cause());
}
- }
- });
+ });
} catch (Throwable e) {
LOGGER.error("", e);
queryMessageResult.release();
@@ -140,15 +151,20 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
FileRegion fileRegion =
new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
selectMappedBufferResult);
- ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
+ ctx.channel()
+ .writeAndFlush(fileRegion)
+ .addListener((ChannelFutureListener) future -> {
selectMappedBufferResult.release();
+ Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+ .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(response.getCode()))
+ .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+ .build();
+ RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
if (!future.isSuccess()) {
LOGGER.error("Transfer one message from page cache failed, ", future.cause());
}
- }
- });
+ });
} catch (Throwable e) {
LOGGER.error("", e);
selectMappedBufferResult.release();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 082267089..d86efa47c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -18,7 +18,12 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.net.InetSocketAddress;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
@@ -34,6 +39,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -46,7 +52,6 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
@@ -56,22 +61,13 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -79,6 +75,8 @@ public class SendMessageProcessorTest {
private SendMessageProcessor sendMessageProcessor;
@Mock
private ChannelHandlerContext handlerContext;
+ @Mock
+ private Channel channel;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
new NettyClientConfig(), new MessageStoreConfig());
@@ -101,9 +99,8 @@ public class SendMessageProcessorTest {
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getPutMessageFutureExecutor()).thenReturn(Executors.newSingleThreadExecutor());
when(messageStore.now()).thenReturn(System.currentTimeMillis());
- Channel mockChannel = mock(Channel.class);
- when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
- when(handlerContext.channel()).thenReturn(mockChannel);
+ when(channel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
+ when(handlerContext.channel()).thenReturn(channel);
when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
sendMessageProcessor = new SendMessageProcessor(brokerController);
}
@@ -220,13 +217,10 @@ public class SendMessageProcessorTest {
.thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE);
final RemotingCommand[] response = new RemotingCommand[1];
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- response[0] = invocation.getArgument(0);
- return null;
- }
- }).when(handlerContext).writeAndFlush(any(Object.class));
+ doAnswer(invocation -> {
+ response[0] = invocation.getArgument(0);
+ return null;
+ }).when(channel).writeAndFlush(any(Object.class));
await().atMost(Duration.ofSeconds(10)).until(() -> {
RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
if (responseToReturn != null) {
@@ -366,13 +360,10 @@ public class SendMessageProcessorTest {
private void assertPutResult(int responseCode) throws RemotingCommandException {
final RemotingCommand request = createSendMsgCommand(RequestCode.SEND_MESSAGE);
final RemotingCommand[] response = new RemotingCommand[1];
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- response[0] = invocation.getArgument(0);
- return null;
- }
- }).when(handlerContext).writeAndFlush(any(Object.class));
+ doAnswer(invocation -> {
+ response[0] = invocation.getArgument(0);
+ return null;
+ }).when(channel).writeAndFlush(any(Object.class));
await().atMost(Duration.ofSeconds(10)).until(() -> {
RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
if (responseToReturn != null) {
@@ -388,4 +379,4 @@ public class SendMessageProcessorTest {
return true;
});
}
-}
\ No newline at end of file
+}
diff --git a/pom.xml b/pom.xml
index 879821c5a..cc6baa443 100644
--- a/pom.xml
+++ b/pom.xml
@@ -937,6 +937,11 @@
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/remoting/BUILD.bazel b/remoting/BUILD.bazel
index 580b95080..327c51f20 100644
--- a/remoting/BUILD.bazel
+++ b/remoting/BUILD.bazel
@@ -23,12 +23,19 @@ java_library(
deps = [
"//logging",
"@maven//:com_alibaba_fastjson",
+ "@maven//:com_google_guava_guava",
+ "@maven//:com_google_code_findbugs_jsr305",
+ "@maven//:com_squareup_okio_okio_jvm",
"@maven//:io_netty_netty_all",
- "@maven//:org_apache_commons_commons_lang3",
+ "@maven//:io_opentelemetry_opentelemetry_api",
+ "@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
"@maven//:io_opentelemetry_opentelemetry_sdk",
- "@maven//:com_squareup_okio_okio_jvm",
+ "@maven//:io_opentelemetry_opentelemetry_sdk_common",
+ "@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
+ "@maven//:javax_annotation_javax_annotation_api",
+ "@maven//:org_apache_commons_commons_lang3",
],
)
@@ -39,14 +46,21 @@ java_library(
deps = [
":remoting",
"//:test_deps",
- "@maven//:io_netty_netty_all",
- "@maven//:com_google_code_gson_gson",
"@maven//:com_alibaba_fastjson",
- "@maven//:org_apache_commons_commons_lang3",
+ "@maven//:com_google_code_gson_gson",
+ "@maven//:com_google_guava_guava",
+ "@maven//:com_google_code_findbugs_jsr305",
+ "@maven//:com_squareup_okio_okio_jvm",
+ "@maven//:io_netty_netty_all",
+ "@maven//:io_opentelemetry_opentelemetry_api",
+ "@maven//:io_opentelemetry_opentelemetry_context",
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
"@maven//:io_opentelemetry_opentelemetry_sdk",
- "@maven//:com_squareup_okio_okio_jvm",
+ "@maven//:io_opentelemetry_opentelemetry_sdk_common",
+ "@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
+ "@maven//:javax_annotation_javax_annotation_api",
+ "@maven//:org_apache_commons_commons_lang3",
],
resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"])
)
diff --git a/remoting/pom.xml b/remoting/pom.xml
index b471f67c2..397a72748 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -78,5 +78,9 @@
<groupId>com.squareup.okio</groupId>
<artifactId>okio-jvm</artifactId>
</dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/common/src/main/java/org/apache/rocketmq/common/Pair.java b/remoting/src/main/java/org/apache/rocketmq/common/Pair.java
similarity index 100%
rename from common/src/main/java/org/apache/rocketmq/common/Pair.java
rename to remoting/src/main/java/org/apache/rocketmq/common/Pair.java
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongCounter.java
similarity index 96%
rename from broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
rename to remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongCounter.java
index 5f9c558e0..a281216ab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
+++ b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongCounter.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.metrics;
+package org.apache.rocketmq.common.metrics;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongHistogram.java
similarity index 96%
rename from broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
rename to remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongHistogram.java
index 582b20daa..e967c63f2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
+++ b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongHistogram.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.metrics;
+package org.apache.rocketmq.common.metrics;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongUpDownCounter.java
similarity index 96%
rename from broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
rename to remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongUpDownCounter.java
index 0752d57a1..3e8be1976 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
+++ b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongUpDownCounter.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.metrics;
+package org.apache.rocketmq.common.metrics;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongUpDownCounter;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopObservableLongGauge.java
similarity index 95%
rename from broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
rename to remoting/src/main/java/org/apache/rocketmq/common/metrics/NopObservableLongGauge.java
index 442f3697e..091fa72de 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
+++ b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopObservableLongGauge.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.metrics;
+package org.apache.rocketmq.common.metrics;
import io.opentelemetry.api.metrics.ObservableLongGauge;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
similarity index 100%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
rename to remoting/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
similarity index 100%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
rename to remoting/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index 36e2035dc..8cfa1e1a0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.remoting;
import io.netty.channel.Channel;
import java.util.concurrent.ExecutorService;
-import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
deleted file mode 100644
index 01e761fcf..000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.remoting.common;
-
-public class Pair<T1, T2> {
- private T1 object1;
- private T2 object2;
-
- public Pair(T1 object1, T2 object2) {
- this.object1 = object1;
- this.object2 = object2;
- }
-
- public T1 getObject1() {
- return object1;
- }
-
- public void setObject1(T1 object1) {
- this.object1 = object1;
- }
-
- public T2 getObject2() {
- return object2;
- }
-
- public void setObject2(T2 object2) {
- this.object2 = object2;
- }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
new file mode 100644
index 000000000..242172ccf
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.metrics;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+
+/**
+ * Names of the remoting RPC metric, its attribute keys and the fixed attribute values,
+ * plus lookup tables translating numeric request/response codes into readable labels.
+ */
+public class RemotingMetricsConstant {
+    public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency";
+
+    public static final String LABEL_PROTOCOL_TYPE = "protocol_type";
+    public static final String LABEL_REQUEST_CODE = "request_code";
+    public static final String LABEL_RESPONSE_CODE = "response_code";
+    public static final String LABEL_IS_LONG_POLLING = "is_long_polling";
+    public static final String LABEL_RESULT = "result";
+
+    public static final String PROTOCOL_TYPE_REMOTING = "remoting";
+
+    public static final String RESULT_ONEWAY = "oneway";
+    public static final String RESULT_SUCCESS = "success";
+    public static final String RESULT_CANCELED = "cancelled";
+    public static final String RESULT_PROCESS_REQUEST_FAILED = "process_request_failed";
+    public static final String RESULT_WRITE_CHANNEL_FAILED = "write_channel_failed";
+
+    /** Request code value to lower-cased constant name, read from the public int fields of {@link RequestCode}. */
+    public static final Map<Integer, String> REQUEST_CODE_MAP = codeNames(RequestCode.class);
+
+    /** Response code value to lower-cased constant name, read from the public int fields of {@link ResponseCode}. */
+    public static final Map<Integer, String> RESPONSE_CODE_MAP = codeNames(ResponseCode.class);
+
+    /**
+     * Builds a read-only map from the value of every public static {@code int} field of
+     * {@code holder} to that field's name in lower case.
+     *
+     * @param holder class whose public constants are enumerated via reflection
+     * @return unmodifiable code-to-name map; when two fields share a value the last one visited wins
+     */
+    private static Map<Integer, String> codeNames(Class<?> holder) {
+        Map<Integer, String> names = new HashMap<>();
+        for (Field field : holder.getFields()) {
+            // Only static ints can be read with a null receiver; anything else is not a code constant.
+            if (field.getType() != int.class || !Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            try {
+                // Locale.ROOT keeps label values stable regardless of the JVM default locale.
+                names.put(field.getInt(null), field.getName().toLowerCase(Locale.ROOT));
+            } catch (IllegalAccessException ignored) {
+                // Best effort: an unreadable field only loses its readable label; callers
+                // fall back to the numeric code, and the remaining fields are still mapped.
+            }
+        }
+        return Collections.unmodifiableMap(names);
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
new file mode 100644
index 000000000..4ca1f033f
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.metrics;
+
+import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Future;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.HISTOGRAM_RPC_LATENCY;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.REQUEST_CODE_MAP;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESPONSE_CODE_MAP;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_CANCELED;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_SUCCESS;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;
+
+/**
+ * Static entry point for remoting-layer metrics: holds the RPC latency histogram,
+ * creates attribute builders and translates codes / write results into label values.
+ */
+public class RemotingMetricsManager {
+    /** RPC latency histogram; starts as a {@link NopLongHistogram} and is replaced by {@link #initMetrics}. */
+    public static LongHistogram rpcLatency = new NopLongHistogram();
+    /** Supplier of base attribute builders; unset by default and assigned by {@link #initMetrics}. */
+    public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+
+    /**
+     * Creates the attribute builder used for one measurement.
+     *
+     * @return the supplier's builder tagged with the remoting protocol type, or an empty
+     *         builder when no supplier has been set
+     */
+    public static AttributesBuilder newAttributesBuilder() {
+        if (attributesBuilderSupplier != null) {
+            AttributesBuilder base = attributesBuilderSupplier.get();
+            return base.put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING);
+        }
+        return Attributes.builder();
+    }
+
+    /**
+     * Stores the attribute builder supplier and creates the latency histogram on {@code meter}.
+     *
+     * @param meter meter on which the histogram is built
+     * @param attributesBuilderSupplier supplier later used by {@link #newAttributesBuilder()}
+     */
+    public static void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
+        RemotingMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
+        rpcLatency = meter
+            .histogramBuilder(HISTOGRAM_RPC_LATENCY)
+            .setDescription("Rpc latency")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .build();
+    }
+
+    /**
+     * Describes how the latency histogram should be aggregated.
+     *
+     * @return a single selector/view pair using explicit bucket boundaries of
+     *         1 ms, 10 ms, 100 ms, 1 s, 2 s and 3 s (expressed in milliseconds)
+     */
+    public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+        List<Double> bucketBoundaries = Arrays.asList(
+            (double) Duration.ofMillis(1).toMillis(),
+            (double) Duration.ofMillis(10).toMillis(),
+            (double) Duration.ofMillis(100).toMillis(),
+            (double) Duration.ofSeconds(1).toMillis(),
+            (double) Duration.ofSeconds(2).toMillis(),
+            (double) Duration.ofSeconds(3).toMillis()
+        );
+
+        InstrumentSelector.builder();
+        InstrumentSelector latencySelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_RPC_LATENCY)
+            .build();
+        Aggregation bucketAggregation = Aggregation.explicitBucketHistogram(bucketBoundaries);
+        View latencyView = View.builder()
+            .setAggregation(bucketAggregation)
+            .build();
+
+        List<Pair<InstrumentSelector, View>> views = Lists.newArrayList();
+        views.add(new Pair<>(latencySelector, latencyView));
+        return views;
+    }
+
+    /**
+     * Maps the outcome of a channel write to a result label.
+     *
+     * @param future completed write future
+     * @return the cancelled label if the future was cancelled, the write-failed label if it
+     *         did not succeed, otherwise the success label
+     */
+    public static String getWriteAndFlushResult(Future<?> future) {
+        if (future.isCancelled()) {
+            return RESULT_CANCELED;
+        }
+        if (future.isSuccess()) {
+            return RESULT_SUCCESS;
+        }
+        return RESULT_WRITE_CHANNEL_FAILED;
+    }
+
+    /**
+     * @param code numeric request code
+     * @return the mapped name for {@code code}, or its decimal string when no name is mapped
+     */
+    public static String getRequestCodeDesc(int code) {
+        String fallback = String.valueOf(code);
+        return REQUEST_CODE_MAP.getOrDefault(code, fallback);
+    }
+
+    /**
+     * @param code numeric response code
+     * @return the mapped name for {@code code}, or its decimal string when no name is mapped
+     */
+    public static String getResponseCodeDesc(int code) {
+        String fallback = String.valueOf(code);
+        return RESPONSE_CODE_MAP.getOrDefault(code, fallback);
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
index 0fcb00e04..d4834f4b2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
@@ -16,13 +16,14 @@
*/
package org.apache.rocketmq.remoting.netty;
+import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
@@ -38,12 +39,15 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
+ Stopwatch timer = Stopwatch.createStarted();
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
- return RemotingCommand.decode(frame);
+ RemotingCommand cmd = RemotingCommand.decode(frame);
+ cmd.setProcessTimer(timer);
+ return cmd;
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index b287387a7..ebc8956d9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -21,6 +21,8 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.opentelemetry.api.common.AttributesBuilder;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,22 +37,32 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.common.ServiceThread;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_PROCESS_REQUEST_FAILED;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;
+
public abstract class NettyRemotingAbstract {
/**
@@ -178,6 +190,49 @@ public abstract class NettyRemotingAbstract {
}
}
+    /**
+     * Writes {@code response} for {@code request} to {@code channel} without a completion callback.
+     *
+     * @param channel channel the response is written to
+     * @param request request being answered
+     * @param response response to send; nothing happens when it is {@code null}
+     */
+    public static void writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response) {
+        writeResponse(channel, request, response, null);
+    }
+
+    /**
+     * Writes {@code response} for {@code request} to {@code channel} and records the RPC latency.
+     * <p>
+     * A {@code null} response is ignored. For a oneway request nothing is written; only the
+     * latency is recorded with the oneway result label. Otherwise the response takes the
+     * request's opaque, is marked as a response and is flushed; the write outcome is logged,
+     * recorded as the result label, and then handed to {@code callback}.
+     *
+     * @param channel channel the response is written to
+     * @param request request being answered
+     * @param response response to send, may be {@code null}
+     * @param callback invoked with the write future once the write completes; may be {@code null}
+     */
+    public static void writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response, Consumer<Future<?>> callback) {
+        if (response == null) {
+            return;
+        }
+        AttributesBuilder attributesBuilder = RemotingMetricsManager.newAttributesBuilder()
+            .put(LABEL_IS_LONG_POLLING, request.isSuspended())
+            .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+            .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(response.getCode()));
+        if (request.isOnewayRPC()) {
+            attributesBuilder.put(LABEL_RESULT, RESULT_ONEWAY);
+            recordRpcLatency(request, attributesBuilder);
+            return;
+        }
+        response.setOpaque(request.getOpaque());
+        response.markResponseType();
+        try {
+            channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    log.debug("Response[request code: {}, response code: {}, opaque: {}] is written to channel{}",
+                        request.getCode(), response.getCode(), response.getOpaque(), channel);
+                } else {
+                    log.error("Failed to write response[request code: {}, response code: {}, opaque: {}] to channel{}",
+                        request.getCode(), response.getCode(), response.getOpaque(), channel, future.cause());
+                }
+                attributesBuilder.put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future));
+                recordRpcLatency(request, attributesBuilder);
+                if (callback != null) {
+                    callback.accept(future);
+                }
+            });
+        } catch (Throwable e) {
+            log.error("process request over, but response failed", e);
+            log.error(request.toString());
+            log.error(response.toString());
+            attributesBuilder.put(LABEL_RESULT, RESULT_WRITE_CHANNEL_FAILED);
+            recordRpcLatency(request, attributesBuilder);
+        }
+    }
+
+    /**
+     * Records the time elapsed on the request's process timer into the RPC latency histogram.
+     * A request without a timer is skipped, so a missing timer can never abort the response
+     * path or prevent the caller's callback from running.
+     */
+    private static void recordRpcLatency(RemotingCommand request, AttributesBuilder attributesBuilder) {
+        if (request.getProcessTimer() == null) {
+            return;
+        }
+        RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+    }
+
/**
* Process incoming request command issued by remote peer.
*
@@ -194,7 +249,7 @@ public abstract class NettyRemotingAbstract {
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
- ctx.writeAndFlush(response);
+ writeResponse(ctx.channel(), cmd, response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
return;
}
@@ -205,7 +260,7 @@ public abstract class NettyRemotingAbstract {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
- ctx.writeAndFlush(response);
+ writeResponse(ctx.channel(), cmd, response);
return;
}
@@ -221,12 +276,15 @@ public abstract class NettyRemotingAbstract {
+ " request code: " + cmd.getCode());
}
- if (!cmd.isOnewayRPC()) {
- final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
- "[OVERLOAD]system busy, start flow control for a while");
- response.setOpaque(opaque);
- ctx.writeAndFlush(response);
- }
+ final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+ "[OVERLOAD]system busy, start flow control for a while");
+ response.setOpaque(opaque);
+ writeResponse(ctx.channel(), cmd, response);
+ } catch (Throwable e) {
+ AttributesBuilder attributesBuilder = RemotingMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(cmd.getCode()))
+ .put(LABEL_RESULT, RESULT_PROCESS_REQUEST_FAILED);
+ RemotingMetricsManager.rpcLatency.record(cmd.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
}
}
@@ -259,19 +317,7 @@ public abstract class NettyRemotingAbstract {
throw exception;
}
- if (!cmd.isOnewayRPC()) {
- if (response != null) {
- response.setOpaque(opaque);
- response.markResponseType();
- try {
- ctx.writeAndFlush(response);
- } catch (Throwable e) {
- log.error("process request over, but response failed", e);
- log.error(cmd.toString());
- log.error(response.toString());
- }
- }
- }
+ writeResponse(ctx.channel(), cmd, response);
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
@@ -280,7 +326,7 @@ public abstract class NettyRemotingAbstract {
response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
- ctx.writeAndFlush(response);
+ writeResponse(ctx.channel(), cmd, response);
}
}
};
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 2d82fbe6c..def94ad8e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -68,12 +68,12 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 39be96c52..0fd8943b5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -55,12 +55,12 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingServer;
-import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 68ccdf69d..a3815aeca 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -17,13 +17,9 @@
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.annotation.JSONField;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
@@ -36,9 +32,12 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
@@ -90,6 +89,8 @@ public class RemotingCommand {
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
+ private boolean suspended;
+ private Stopwatch processTimer;
protected RemotingCommand() {
}
@@ -587,6 +588,16 @@ public class RemotingCommand {
this.body = body;
}
+ /**
+ * Returns the suspended flag of this command. The getter is annotated so that the flag
+ * is left out of JSON serialization.
+ */
+ @JSONField(serialize = false)
+ public boolean isSuspended() {
+ return suspended;
+ }
+
+ /**
+ * Sets the suspended flag of this command.
+ * NOTE(review): the setter carries serialize = false like the getter; if the intent is to
+ * also ignore the key when parsing JSON, deserialize = false is presumably needed - confirm.
+ */
+ @JSONField(serialize = false)
+ public void setSuspended(boolean suspended) {
+ this.suspended = suspended;
+ }
+
public HashMap<String, String> getExtFields() {
return extFields;
}
@@ -616,4 +627,12 @@ public class RemotingCommand {
public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}
-}
\ No newline at end of file
+
+    /**
+     * Returns the timer attached to this command, or {@code null} when none was set.
+     * The timer is local bookkeeping, so it is kept out of JSON serialization.
+     */
+    @JSONField(serialize = false)
+    public Stopwatch getProcessTimer() {
+        return processTimer;
+    }
+
+    /**
+     * Attaches the timer used to measure how long this command takes to process.
+     * Excluded from JSON deserialization: a {@link Stopwatch} cannot be rebuilt from
+     * header JSON, and a stray key must not break decoding.
+     */
+    @JSONField(deserialize = false)
+    public void setProcessTimer(Stopwatch processTimer) {
+        this.processTimer = processTimer;
+    }
+}