You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/09 11:44:47 UTC

[rocketmq] branch develop updated: [ISSUE #5449] [RIP-46] implement remoting stats metrics (#5487)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0ae1bd2af [ISSUE #5449] [RIP-46] implement remoting stats metrics (#5487)
0ae1bd2af is described below

commit 0ae1bd2af69f475aba03ab7a27df0bd7cda15359
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Nov 9 19:44:41 2022 +0800

    [ISSUE #5449] [RIP-46] implement remoting stats metrics (#5487)
    
    * implement remoting metrics
    
    * fix bazel
    
    * add metrics view & fix unit test
    
    * move remoting command process timer to NettyDecoder
---
 .../broker/longpolling/PullRequestHoldService.java |   2 +-
 .../broker/metrics/BrokerMetricsManager.java       |  20 +++-
 .../processor/AbstractSendMessageProcessor.java    |  13 +--
 .../broker/processor/AdminBrokerProcessor.java     |  19 ++--
 .../processor/DefaultPullMessageResultHandler.java |  63 +++++++------
 .../broker/processor/NotificationProcessor.java    |  36 ++------
 .../broker/processor/PeekMessageProcessor.java     |  23 +++--
 .../broker/processor/PopMessageProcessor.java      |  72 +++++++--------
 .../broker/processor/PullMessageProcessor.java     |  60 +++++-------
 .../broker/processor/QueryMessageProcessor.java    |  42 ++++++---
 .../broker/processor/SendMessageProcessorTest.java |  47 ++++------
 pom.xml                                            |   5 +
 remoting/BUILD.bazel                               |  26 ++++--
 remoting/pom.xml                                   |   4 +
 .../main/java/org/apache/rocketmq/common/Pair.java |   0
 .../rocketmq/common}/metrics/NopLongCounter.java   |   2 +-
 .../rocketmq/common}/metrics/NopLongHistogram.java |   2 +-
 .../common}/metrics/NopLongUpDownCounter.java      |   2 +-
 .../common}/metrics/NopObservableLongGauge.java    |   2 +-
 .../rocketmq/common/protocol/RequestCode.java      |   0
 .../rocketmq/common/protocol/ResponseCode.java     |   0
 .../apache/rocketmq/remoting/RemotingServer.java   |   2 +-
 .../org/apache/rocketmq/remoting/common/Pair.java  |  43 ---------
 .../remoting/metrics/RemotingMetricsConstant.java  |  69 ++++++++++++++
 .../remoting/metrics/RemotingMetricsManager.java   | 102 +++++++++++++++++++++
 .../rocketmq/remoting/netty/NettyDecoder.java      |  10 +-
 .../remoting/netty/NettyRemotingAbstract.java      |  94 ++++++++++++++-----
 .../remoting/netty/NettyRemotingClient.java        |   2 +-
 .../remoting/netty/NettyRemotingServer.java        |   2 +-
 .../remoting/protocol/RemotingCommand.java         |  41 ++++++---
 30 files changed, 513 insertions(+), 292 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index f3cfc11e3..59b884324 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -21,7 +21,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.SystemClock;
@@ -53,6 +52,7 @@ public class PullRequestHoldService extends ServiceThread {
             }
         }
 
+        pullRequest.getRequestCommand().setSuspended(true);
         mpr.addPullRequest(pullRequest);
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index d18a1eb25..8840b45e7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -45,13 +45,18 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
 import org.apache.rocketmq.store.MessageStore;
 
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
@@ -59,15 +64,15 @@ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_M
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
-import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR;
-import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;
-import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_TYPE;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.OPEN_TELEMETRY_METER_NAME;
 
 public class BrokerMetricsManager {
@@ -241,6 +246,7 @@ public class BrokerMetricsManager {
 
         initStatsMetrics();
         initRequestMetrics();
+        initOtherMetrics();
     }
 
     private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
@@ -261,6 +267,10 @@ public class BrokerMetricsManager {
             .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets))
             .build();
         providerBuilder.registerView(messageSizeSelector, messageSizeView);
+
+        for (Pair<InstrumentSelector, View> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
+            providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+        }
     }
 
     private void initStatsMetrics() {
@@ -311,6 +321,10 @@ public class BrokerMetricsManager {
             .build();
     }
 
+    private void initOtherMetrics() {
+        RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
+    }
+
     public void shutdown() {
         if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
             periodicMetricReader.forceFlush();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 01968bac0..1a87da322 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
@@ -58,6 +57,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.PutMessageResult;
@@ -524,16 +524,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
 
     protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
         final RemotingCommand response) {
-        if (!request.isOnewayRPC()) {
-            try {
-                ctx.writeAndFlush(response);
-            } catch (Throwable e) {
-                LOGGER.error(
-                    "SendMessageProcessor finished processing the request, but failed to send response, client "
-                        + "address={}, request={}, response={}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                    request.toString(), response.toString(), e);
-            }
-        }
+        NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
     }
 
     public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e0f876e17..db5cec6f8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -42,15 +42,11 @@ import org.apache.rocketmq.acl.plain.PlainAccessValidator;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
-import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
-import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
-import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
-import org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
-import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.AclConfig;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -92,11 +88,13 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
 import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
 import org.apache.rocketmq.common.protocol.body.QuerySubscriptionResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
@@ -109,6 +107,7 @@ import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequest
 import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ExchangeHAInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ExchangeHAInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
@@ -161,12 +160,14 @@ import org.apache.rocketmq.common.stats.StatsItem;
 import org.apache.rocketmq.common.stats.StatsSnapshot;
 import org.apache.rocketmq.common.subscription.GroupForbidden;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.filter.util.BitsArray;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -178,10 +179,10 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.ReferredIterator;
-import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 
@@ -535,7 +536,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 response.setOpaque(request.getOpaque());
                 response.markResponseType();
                 response.setRemark(null);
-                ctx.writeAndFlush(response);
+                NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
             } else {
                 String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been updated failed.";
                 LOGGER.warn(errorMsg);
@@ -569,7 +570,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 response.setOpaque(request.getOpaque());
                 response.markResponseType();
                 response.setRemark(null);
-                ctx.writeAndFlush(response);
+                NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
             } else {
                 String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been deleted failed.";
                 LOGGER.warn(errorMsg);
@@ -604,7 +605,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 response.setOpaque(request.getOpaque());
                 response.markResponseType();
                 response.setRemark(null);
-                ctx.writeAndFlush(response);
+                NettyRemotingAbstract.writeResponse(ctx.channel(), request, response);
             } else {
                 String errorMsg = "The globalWhiteAddresses[" + requestHeader.getGlobalWhiteAddrs() + "] has been updated failed.";
                 LOGGER.warn(errorMsg);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 76e1d8b63..fe8e7aa0e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -18,12 +18,12 @@
 package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.FileRegion;
 import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.longpolling.PullRequest;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.MessageFilter;
@@ -56,6 +57,9 @@ import org.apache.rocketmq.store.config.BrokerRole;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
 
 public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
 
@@ -68,14 +72,14 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
 
     @Override
     public RemotingCommand handle(final GetMessageResult getMessageResult,
-                                  final RemotingCommand request,
-                                  final PullMessageRequestHeader requestHeader,
-                                  final Channel channel,
-                                  final SubscriptionData subscriptionData,
-                                  final SubscriptionGroupConfig subscriptionGroupConfig,
-                                  final boolean brokerAllowSuspend,
-                                  final MessageFilter messageFilter,
-                                  RemotingCommand response) {
+        final RemotingCommand request,
+        final PullMessageRequestHeader requestHeader,
+        final Channel channel,
+        final SubscriptionData subscriptionData,
+        final SubscriptionGroupConfig subscriptionGroupConfig,
+        final boolean brokerAllowSuspend,
+        final MessageFilter messageFilter,
+        RemotingCommand response) {
 
         PullMessageProcessor processor = brokerController.getPullMessageProcessor();
         processor.updateBroadcastPulledOffset(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
@@ -86,10 +90,10 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
         switch (response.getCode()) {
             case ResponseCode.SUCCESS:
                 this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                        getMessageResult.getMessageCount());
+                    getMessageResult.getMessageCount());
 
                 this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                        getMessageResult.getBufferTotalSize());
+                    getMessageResult.getBufferTotalSize());
 
                 this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
 
@@ -114,23 +118,27 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
                     final long beginTimeMills = this.brokerController.getMessageStore().now();
                     final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                     this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
-                            requestHeader.getTopic(), requestHeader.getQueueId(),
-                            (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
+                        requestHeader.getTopic(), requestHeader.getQueueId(),
+                        (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                     response.setBody(r);
                     return response;
                 } else {
                     try {
                         FileRegion fileRegion =
-                                new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
-                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                            @Override
-                            public void operationComplete(ChannelFuture future) throws Exception {
+                            new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
+                        channel.writeAndFlush(fileRegion)
+                            .addListener((ChannelFutureListener) future -> {
                                 getMessageResult.release();
+                                Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+                                    .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+                                    .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(response.getCode()))
+                                    .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+                                    .build();
+                                RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                                 if (!future.isSuccess()) {
                                     log.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
                                 }
-                            }
-                        });
+                            });
                     } catch (Throwable e) {
                         log.error("Error occurred when transferring messages from page cache", e);
                         getMessageResult.release();
@@ -151,7 +159,7 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
                     long offset = requestHeader.getQueueOffset();
                     int queueId = requestHeader.getQueueId();
                     PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
-                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
+                        this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                     this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                     return null;
                 }
@@ -159,7 +167,7 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
                 break;
             case ResponseCode.PULL_OFFSET_MOVED:
                 if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
-                        || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+                    || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                     MessageQueue mq = new MessageQueue();
                     mq.setTopic(requestHeader.getTopic());
                     mq.setQueueId(requestHeader.getQueueId());
@@ -171,15 +179,15 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
                     event.setOffsetRequest(requestHeader.getQueueOffset());
                     event.setOffsetNew(getMessageResult.getNextBeginOffset());
                     log.warn(
-                            "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
-                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
-                            responseHeader.getSuggestWhichBrokerId());
+                        "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
+                        responseHeader.getSuggestWhichBrokerId());
                 } else {
                     responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                     response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                     log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
-                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
-                            responseHeader.getSuggestWhichBrokerId());
+                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+                        responseHeader.getSuggestWhichBrokerId());
                 }
 
                 break;
@@ -202,7 +210,8 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
         return true;
     }
 
-    protected byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic,
+    protected byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group,
+        final String topic,
         final int queueId) {
         final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 2b39cba5c..559fd61a0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -18,16 +18,12 @@ package org.apache.rocketmq.broker.processor;
 
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
-
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.longpolling.NotificationRequest;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -44,6 +40,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -148,31 +145,18 @@ public class NotificationProcessor implements NettyRequestProcessor {
         if (!request.getChannel().isActive()) {
             return;
         }
-        Runnable run = new Runnable() {
-            @Override
-            public void run() {
-                final RemotingCommand response = NotificationProcessor.this.responseNotification(request.getChannel(), hasMsg);
-
-                if (response != null) {
-                    response.setOpaque(request.getRemotingCommand().getOpaque());
-                    response.markResponseType();
-                    try {
-                        request.getChannel().writeAndFlush(response).addListener(new ChannelFutureListener() {
-                            @Override
-                            public void operationComplete(ChannelFuture future) throws Exception {
-                                if (!future.isSuccess()) {
-                                    POP_LOGGER.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
-                                    POP_LOGGER.error(request.toString());
-                                    POP_LOGGER.error(response.toString());
-                                }
-                            }
-                        });
-                    } catch (Throwable e) {
-                        POP_LOGGER.error("ProcessRequestWrapper process request over, but response failed", e);
+        Runnable run = () -> {
+            final RemotingCommand response = NotificationProcessor.this.responseNotification(request.getChannel(), hasMsg);
+            if (response != null) {
+                response.setOpaque(request.getRemotingCommand().getOpaque());
+                response.markResponseType();
+                NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future ->  {
+                    if (!future.isSuccess()) {
+                        POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
                         POP_LOGGER.error(request.toString());
                         POP_LOGGER.error(response.toString());
                     }
-                }
+                });
             }
         };
         this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
index 2bce7a581..688d94c63 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PeekMessageProcessor.java
@@ -17,16 +17,14 @@
 package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
-
 import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Random;
-
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
@@ -46,6 +44,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
@@ -55,6 +54,9 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
 
 public class PeekMessageProcessor implements NettyRequestProcessor {
     private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -204,15 +206,20 @@ public class PeekMessageProcessor implements NettyRequestProcessor {
                     try {
                         FileRegion fileRegion =
                             new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
-                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                            @Override
-                            public void operationComplete(ChannelFuture future) throws Exception {
+                        RemotingCommand finalResponse = response;
+                        channel.writeAndFlush(fileRegion)
+                            .addListener((ChannelFutureListener) future -> {
                                 tmpGetMessageResult.release();
+                                Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+                                    .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+                                    .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(finalResponse.getCode()))
+                                    .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+                                    .build();
+                                RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                                 if (!future.isSuccess()) {
                                     LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
                                 }
-                            }
-                        });
+                            });
                     } catch (Throwable e) {
                         LOG.error("Error occurred when transferring messages from page cache", e);
                         getMessageResult.release();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index ac1dd4615..354e44157 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.processor;
 import com.alibaba.fastjson.JSON;
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
@@ -31,6 +30,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.broker.BrokerController;
@@ -53,6 +53,7 @@ import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
@@ -65,12 +66,13 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
@@ -78,6 +80,9 @@ import org.apache.rocketmq.store.pop.PopCheckPoint;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
 
 public class PopMessageProcessor implements NettyRequestProcessor {
     private static final InternalLogger POP_LOGGER =
@@ -221,35 +226,22 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         if (!request.getChannel().isActive()) {
             return false;
         }
-        Runnable run = new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    final RemotingCommand response = PopMessageProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand());
-
-                    if (response != null) {
-                        response.setOpaque(request.getRemotingCommand().getOpaque());
-                        response.markResponseType();
-                        try {
-                            request.getChannel().writeAndFlush(response).addListener(new ChannelFutureListener() {
-                                @Override
-                                public void operationComplete(ChannelFuture future) throws Exception {
-                                    if (!future.isSuccess()) {
-                                        POP_LOGGER.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
-                                        POP_LOGGER.error(request.toString());
-                                        POP_LOGGER.error(response.toString());
-                                    }
-                                }
-                            });
-                        } catch (Throwable e) {
-                            POP_LOGGER.error("ProcessRequestWrapper process request over, but response failed", e);
+        Runnable run = () -> {
+            try {
+                final RemotingCommand response = processRequest(request.getChannel(), request.getRemotingCommand());
+                if (response != null) {
+                    response.setOpaque(request.getRemotingCommand().getOpaque());
+                    response.markResponseType();
+                    NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> {
+                        if (!future.isSuccess()) {
+                            POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
                             POP_LOGGER.error(request.toString());
                             POP_LOGGER.error(response.toString());
                         }
-                    }
-                } catch (RemotingCommandException e1) {
-                    POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1);
+                    });
                 }
+            } catch (RemotingCommandException e1) {
+                POP_LOGGER.error("ExecuteRequestWhenWakeup run", e1);
             }
         };
         this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
@@ -462,16 +454,21 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                         FileRegion fileRegion =
                             new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()),
                                 getMessageResult);
-                        channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                            @Override
-                            public void operationComplete(ChannelFuture future) throws Exception {
+                        RemotingCommand finalResponse = response;
+                        channel.writeAndFlush(fileRegion)
+                            .addListener((ChannelFutureListener) future -> {
                                 tmpGetMessageResult.release();
+                                Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+                                    .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+                                    .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(finalResponse.getCode()))
+                                    .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+                                    .build();
+                                RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                                 if (!future.isSuccess()) {
                                     POP_LOGGER.error("Fail to transfer messages from page cache to {}",
                                         channel.remoteAddress(), future.cause());
                                 }
-                            }
-                        });
+                            });
                     } catch (Throwable e) {
                         POP_LOGGER.error("Error occurred when transferring messages from page cache", e);
                         getMessageResult.release();
@@ -520,20 +517,20 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                 return restNum;
             }
             getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup()
-                    , topic, queueId, offset,
-                    requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
+                , topic, queueId, offset,
+                requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);
             if (getMessageTmpResult == null) {
                 return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
             }
             // maybe store offset is not correct.
             if (GetMessageStatus.OFFSET_TOO_SMALL.equals(getMessageTmpResult.getStatus())
-                    || GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageTmpResult.getStatus())
-                    || GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageTmpResult.getStatus())) {
+                || GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageTmpResult.getStatus())
+                || GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageTmpResult.getStatus())) {
                 // commit offset, because the offset is not correct
                 // If offset in store is greater than cq offset, it will cause duplicate messages,
                 // because offset in PopBuffer is not committed.
                 POP_LOGGER.warn("Pop initial offset, because store is no correct, {}, {}->{}",
-                        lockKey, offset, getMessageTmpResult.getNextBeginOffset());
+                    lockKey, offset, getMessageTmpResult.getNextBeginOffset());
                 offset = getMessageTmpResult.getNextBeginOffset();
                 this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
                     queueId, offset);
@@ -685,6 +682,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             }
         }
         if (queue.add(request)) {
+            remotingCommand.setSuspended(true);
             totalPollingNum.incrementAndGet();
             if (brokerController.getBrokerConfig().isEnablePopLog()) {
                 POP_LOGGER.info("polling {}, result POLLING_SUC", remotingCommand);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 14771535d..0454a2550 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -16,12 +16,9 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import java.util.List;
-
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
 import java.util.Objects;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
@@ -64,6 +61,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -309,7 +307,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
         if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark(String.format("the broker[%s] pulling message is forbidden",
-                    this.brokerController.getBrokerConfig().getBrokerIP1()));
+                this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
 
@@ -708,38 +706,30 @@ public class PullMessageProcessor implements NettyRequestProcessor {
         }
     }
 
-    public void executeRequestWhenWakeup(final Channel channel,
-        final RemotingCommand request) throws RemotingCommandException {
-        Runnable run = new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
-
-                    if (response != null) {
-                        response.setOpaque(request.getOpaque());
-                        response.markResponseType();
-                        try {
-                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
-                                @Override
-                                public void operationComplete(ChannelFuture future) throws Exception {
-                                    if (!future.isSuccess()) {
-                                        LOGGER.error("processRequestWrapper response to {} failed",
-                                            future.channel().remoteAddress(), future.cause());
-                                        LOGGER.error(request.toString());
-                                        LOGGER.error(response.toString());
-                                    }
-                                }
-                            });
-                        } catch (Throwable e) {
-                            LOGGER.error("processRequestWrapper process request over, but response failed", e);
-                            LOGGER.error(request.toString());
-                            LOGGER.error(response.toString());
-                        }
+    public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) {
+        Runnable run = () -> {
+            try {
+                final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
+
+                if (response != null) {
+                    response.setOpaque(request.getOpaque());
+                    response.markResponseType();
+                    try {
+                        NettyRemotingAbstract.writeResponse(channel, request, response, future -> {
+                            if (!future.isSuccess()) {
+                                LOGGER.error("processRequestWrapper response to {} failed", channel.remoteAddress(), future.cause());
+                                LOGGER.error(request.toString());
+                                LOGGER.error(response.toString());
+                            }
+                        });
+                    } catch (Throwable e) {
+                        LOGGER.error("processRequestWrapper process request over, but response failed", e);
+                        LOGGER.error(request.toString());
+                        LOGGER.error(response.toString());
                     }
-                } catch (RemotingCommandException e1) {
-                    LOGGER.error("excuteRequestWhenWakeup run", e1);
                 }
+            } catch (RemotingCommandException e1) {
+                LOGGER.error("excuteRequestWhenWakeup run", e1);
             }
         };
         this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index 5897a35f8..9eebf1e21 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -16,28 +16,34 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
+import io.opentelemetry.api.common.Attributes;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
 import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
+
 public class QueryMessageProcessor implements NettyRequestProcessor {
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -100,15 +106,20 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
                 FileRegion fileRegion =
                     new QueryMessageTransfer(response.encodeHeader(queryMessageResult
                         .getBufferTotalSize()), queryMessageResult);
-                ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
+                ctx.channel()
+                    .writeAndFlush(fileRegion)
+                    .addListener((ChannelFutureListener) future -> {
                         queryMessageResult.release();
+                        Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+                            .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+                            .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(response.getCode()))
+                            .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+                            .build();
+                        RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                         if (!future.isSuccess()) {
                             LOGGER.error("transfer query message by page cache failed, ", future.cause());
                         }
-                    }
-                });
+                    });
             } catch (Throwable e) {
                 LOGGER.error("", e);
                 queryMessageResult.release();
@@ -140,15 +151,20 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
                 FileRegion fileRegion =
                     new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
                         selectMappedBufferResult);
-                ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
+                ctx.channel()
+                    .writeAndFlush(fileRegion)
+                    .addListener((ChannelFutureListener) future -> {
                         selectMappedBufferResult.release();
+                        Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
+                            .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+                            .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(response.getCode()))
+                            .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
+                            .build();
+                        RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                         if (!future.isSuccess()) {
                             LOGGER.error("Transfer one message from page cache failed, ", future.cause());
                         }
-                    }
-                });
+                    });
             } catch (Throwable e) {
                 LOGGER.error("", e);
                 selectMappedBufferResult.release();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 082267089..d86efa47c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -18,7 +18,12 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
@@ -34,6 +39,7 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -46,7 +52,6 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.AppendMessageResult;
 import org.apache.rocketmq.store.AppendMessageStatus;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
@@ -56,22 +61,13 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -79,6 +75,8 @@ public class SendMessageProcessorTest {
     private SendMessageProcessor sendMessageProcessor;
     @Mock
     private ChannelHandlerContext handlerContext;
+    @Mock
+    private Channel channel;
     @Spy
     private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
         new NettyClientConfig(), new MessageStoreConfig());
@@ -101,9 +99,8 @@ public class SendMessageProcessorTest {
         when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
         when(brokerController.getPutMessageFutureExecutor()).thenReturn(Executors.newSingleThreadExecutor());
         when(messageStore.now()).thenReturn(System.currentTimeMillis());
-        Channel mockChannel = mock(Channel.class);
-        when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
-        when(handlerContext.channel()).thenReturn(mockChannel);
+        when(channel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
+        when(handlerContext.channel()).thenReturn(channel);
         when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
         sendMessageProcessor = new SendMessageProcessor(brokerController);
     }
@@ -220,13 +217,10 @@ public class SendMessageProcessorTest {
             .thenReturn(CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))));
         RemotingCommand request = createSendTransactionMsgCommand(RequestCode.SEND_MESSAGE);
         final RemotingCommand[] response = new RemotingCommand[1];
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                response[0] = invocation.getArgument(0);
-                return null;
-            }
-        }).when(handlerContext).writeAndFlush(any(Object.class));
+        doAnswer(invocation -> {
+            response[0] = invocation.getArgument(0);
+            return null;
+        }).when(channel).writeAndFlush(any(Object.class));
         await().atMost(Duration.ofSeconds(10)).until(() -> {
             RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
             if (responseToReturn != null) {
@@ -366,13 +360,10 @@ public class SendMessageProcessorTest {
     private void assertPutResult(int responseCode) throws RemotingCommandException {
         final RemotingCommand request = createSendMsgCommand(RequestCode.SEND_MESSAGE);
         final RemotingCommand[] response = new RemotingCommand[1];
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                response[0] = invocation.getArgument(0);
-                return null;
-            }
-        }).when(handlerContext).writeAndFlush(any(Object.class));
+        doAnswer(invocation -> {
+            response[0] = invocation.getArgument(0);
+            return null;
+        }).when(channel).writeAndFlush(any(Object.class));
         await().atMost(Duration.ofSeconds(10)).until(() -> {
             RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
             if (responseToReturn != null) {
@@ -388,4 +379,4 @@ public class SendMessageProcessorTest {
             return true;
         });
     }
-}
\ No newline at end of file
+}
diff --git a/pom.xml b/pom.xml
index 879821c5a..cc6baa443 100644
--- a/pom.xml
+++ b/pom.xml
@@ -937,6 +937,11 @@
                 <artifactId>opentelemetry-sdk</artifactId>
                 <version>${opentelemetry.version}</version>
             </dependency>
+            <dependency>
+                <groupId>javax.annotation</groupId>
+                <artifactId>javax.annotation-api</artifactId>
+                <version>1.3.2</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/remoting/BUILD.bazel b/remoting/BUILD.bazel
index 580b95080..327c51f20 100644
--- a/remoting/BUILD.bazel
+++ b/remoting/BUILD.bazel
@@ -23,12 +23,19 @@ java_library(
     deps = [
         "//logging",
         "@maven//:com_alibaba_fastjson",
+        "@maven//:com_google_guava_guava",
+        "@maven//:com_google_code_findbugs_jsr305",
+        "@maven//:com_squareup_okio_okio_jvm",
         "@maven//:io_netty_netty_all",
-        "@maven//:org_apache_commons_commons_lang3",
+        "@maven//:io_opentelemetry_opentelemetry_api",
+        "@maven//:io_opentelemetry_opentelemetry_context",
         "@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
         "@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
         "@maven//:io_opentelemetry_opentelemetry_sdk",
-        "@maven//:com_squareup_okio_okio_jvm",
+        "@maven//:io_opentelemetry_opentelemetry_sdk_common",
+        "@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
+        "@maven//:javax_annotation_javax_annotation_api",
+        "@maven//:org_apache_commons_commons_lang3",
     ],
 )
 
@@ -39,14 +46,21 @@ java_library(
     deps = [
         ":remoting",
         "//:test_deps",
-        "@maven//:io_netty_netty_all",
-        "@maven//:com_google_code_gson_gson",
         "@maven//:com_alibaba_fastjson",
-        "@maven//:org_apache_commons_commons_lang3",
+        "@maven//:com_google_code_gson_gson",
+        "@maven//:com_google_guava_guava",
+        "@maven//:com_google_code_findbugs_jsr305",
+        "@maven//:com_squareup_okio_okio_jvm",
+        "@maven//:io_netty_netty_all",
+        "@maven//:io_opentelemetry_opentelemetry_api",
+        "@maven//:io_opentelemetry_opentelemetry_context",
         "@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
         "@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
         "@maven//:io_opentelemetry_opentelemetry_sdk",
-        "@maven//:com_squareup_okio_okio_jvm",
+        "@maven//:io_opentelemetry_opentelemetry_sdk_common",
+        "@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
+        "@maven//:javax_annotation_javax_annotation_api",
+        "@maven//:org_apache_commons_commons_lang3",
     ],
     resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"])
 )
diff --git a/remoting/pom.xml b/remoting/pom.xml
index b471f67c2..397a72748 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -78,5 +78,9 @@
             <groupId>com.squareup.okio</groupId>
             <artifactId>okio-jvm</artifactId>
         </dependency>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/common/src/main/java/org/apache/rocketmq/common/Pair.java b/remoting/src/main/java/org/apache/rocketmq/common/Pair.java
similarity index 100%
rename from common/src/main/java/org/apache/rocketmq/common/Pair.java
rename to remoting/src/main/java/org/apache/rocketmq/common/Pair.java
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongCounter.java
similarity index 96%
rename from broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
rename to remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongCounter.java
index 5f9c558e0..a281216ab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongCounter.java
+++ b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongCounter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.metrics;
+package org.apache.rocketmq.common.metrics;
 
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.LongCounter;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongHistogram.java
similarity index 96%
rename from broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
rename to remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongHistogram.java
index 582b20daa..e967c63f2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongHistogram.java
+++ b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongHistogram.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.metrics;
+package org.apache.rocketmq.common.metrics;
 
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.LongHistogram;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongUpDownCounter.java
similarity index 96%
rename from broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
rename to remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongUpDownCounter.java
index 0752d57a1..3e8be1976 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopLongUpDownCounter.java
+++ b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopLongUpDownCounter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.metrics;
+package org.apache.rocketmq.common.metrics;
 
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.LongUpDownCounter;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopObservableLongGauge.java
similarity index 95%
rename from broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
rename to remoting/src/main/java/org/apache/rocketmq/common/metrics/NopObservableLongGauge.java
index 442f3697e..091fa72de 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/NopObservableLongGauge.java
+++ b/remoting/src/main/java/org/apache/rocketmq/common/metrics/NopObservableLongGauge.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.metrics;
+package org.apache.rocketmq.common.metrics;
 
 import io.opentelemetry.api.metrics.ObservableLongGauge;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
similarity index 100%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
rename to remoting/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
similarity index 100%
rename from common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
rename to remoting/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index 36e2035dc..8cfa1e1a0 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.remoting;
 
 import io.netty.channel.Channel;
 import java.util.concurrent.ExecutorService;
-import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
deleted file mode 100644
index 01e761fcf..000000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/Pair.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.remoting.common;
-
-public class Pair<T1, T2> {
-    private T1 object1;
-    private T2 object2;
-
-    public Pair(T1 object1, T2 object2) {
-        this.object1 = object1;
-        this.object2 = object2;
-    }
-
-    public T1 getObject1() {
-        return object1;
-    }
-
-    public void setObject1(T1 object1) {
-        this.object1 = object1;
-    }
-
-    public T2 getObject2() {
-        return object2;
-    }
-
-    public void setObject2(T2 object2) {
-        this.object2 = object2;
-    }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
new file mode 100644
index 000000000..242172ccf
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.metrics;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+
+/** Metric names, label keys and label values shared by the remoting metrics code. */
+public class RemotingMetricsConstant {
+    public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency";
+
+    public static final String LABEL_PROTOCOL_TYPE = "protocol_type";
+    public static final String LABEL_REQUEST_CODE = "request_code";
+    public static final String LABEL_RESPONSE_CODE = "response_code";
+    public static final String LABEL_IS_LONG_POLLING = "is_long_polling";
+    public static final String LABEL_RESULT = "result";
+
+    public static final String PROTOCOL_TYPE_REMOTING = "remoting";
+
+    public static final String RESULT_ONEWAY = "oneway";
+    public static final String RESULT_SUCCESS = "success";
+    public static final String RESULT_CANCELED = "cancelled";
+    public static final String RESULT_PROCESS_REQUEST_FAILED = "process_request_failed";
+    public static final String RESULT_WRITE_CHANNEL_FAILED = "write_channel_failed";
+
+    /** Value of every public {@code int} field of {@link RequestCode} mapped to its lower-cased field name. */
+    public static final Map<Integer, String> REQUEST_CODE_MAP = codeNames(RequestCode.class);
+
+    /** Value of every public {@code int} field of {@link ResponseCode} mapped to its lower-cased field name. */
+    public static final Map<Integer, String> RESPONSE_CODE_MAP = codeNames(ResponseCode.class);
+
+    /**
+     * Reflectively collects the public {@code int} fields of {@code holder} into a value-to-name map.
+     * Names are lower-cased with {@link java.util.Locale#ROOT}: the default-locale overload would turn
+     * {@code I} into a dotless i under a Turkish locale and change the emitted label values.
+     * @param holder class whose public {@code int} fields are read with a {@code null} receiver
+     * @return a mutable map from field value to lower-cased field name
+     */
+    private static Map<Integer, String> codeNames(Class<?> holder) {
+        Map<Integer, String> names = new HashMap<>();
+        for (Field field : holder.getFields()) {
+            if (field.getType() == int.class) {
+                try {
+                    names.put(field.getInt(null), field.getName().toLowerCase(java.util.Locale.ROOT));
+                } catch (IllegalAccessException ignored) {
+                    // Best effort: an unreadable field is simply left out of the map.
+                }
+            }
+        }
+        return names;
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
new file mode 100644
index 000000000..4ca1f033f
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.metrics;
+
+import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Future;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.HISTOGRAM_RPC_LATENCY;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.REQUEST_CODE_MAP;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESPONSE_CODE_MAP;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_CANCELED;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_SUCCESS;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;
+
+/** Static holder for the remoting RPC latency histogram and the helpers that build its label values. */
+public class RemotingMetricsManager {
+    public static LongHistogram rpcLatency = new NopLongHistogram();
+    public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+
+    /**
+     * Returns the supplier's builder plus the remoting protocol-type label, or an empty builder without a supplier.
+     */
+    public static AttributesBuilder newAttributesBuilder() {
+        return attributesBuilderSupplier == null
+            ? Attributes.builder()
+            : attributesBuilderSupplier.get().put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING);
+    }
+
+    /** Stores the attribute supplier and replaces {@link #rpcLatency} with a histogram built from {@code meter}. */
+    public static void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
+        RemotingMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
+        rpcLatency = meter.histogramBuilder(HISTOGRAM_RPC_LATENCY)
+            .setDescription("Rpc latency")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .build();
+    }
+
+    /** Returns the selector/view pair that gives the RPC latency histogram explicit millisecond buckets. */
+    public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+        List<Double> bucketBoundsMillis = Arrays.asList(
+            (double) Duration.ofMillis(1).toMillis(), (double) Duration.ofMillis(10).toMillis(),
+            (double) Duration.ofMillis(100).toMillis(), (double) Duration.ofSeconds(1).toMillis(),
+            (double) Duration.ofSeconds(2).toMillis(), (double) Duration.ofSeconds(3).toMillis());
+        View bucketView = View.builder()
+            .setAggregation(Aggregation.explicitBucketHistogram(bucketBoundsMillis))
+            .build();
+        InstrumentSelector latencySelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_RPC_LATENCY)
+            .build();
+        return Lists.newArrayList(new Pair<>(latencySelector, bucketView));
+    }
+
+    /** Maps a write future to a result label: cancelled first, then success, otherwise write-channel failure. */
+    public static String getWriteAndFlushResult(Future<?> future) {
+        if (future.isCancelled()) {
+            return RESULT_CANCELED;
+        }
+        return future.isSuccess() ? RESULT_SUCCESS : RESULT_WRITE_CHANNEL_FAILED;
+    }
+
+    /** Returns the name mapped to the request code, or the code rendered as a decimal string when unmapped. */
+    public static String getRequestCodeDesc(int code) {
+        return REQUEST_CODE_MAP.getOrDefault(code, String.valueOf(code));
+    }
+
+    /** Returns the name mapped to the response code, or the code rendered as a decimal string when unmapped. */
+    public static String getResponseCodeDesc(int code) {
+        return RESPONSE_CODE_MAP.getOrDefault(code, String.valueOf(code));
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
index 0fcb00e04..d4834f4b2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
@@ -16,13 +16,14 @@
  */
 package org.apache.rocketmq.remoting.netty;
 
+import com.google.common.base.Stopwatch;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class NettyDecoder extends LengthFieldBasedFrameDecoder {
@@ -38,12 +39,15 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
     @Override
     public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
         ByteBuf frame = null;
+        Stopwatch timer = Stopwatch.createStarted();
         try {
             frame = (ByteBuf) super.decode(ctx, in);
             if (null == frame) {
                 return null;
             }
-            return RemotingCommand.decode(frame);
+            RemotingCommand cmd = RemotingCommand.decode(frame);
+            cmd.setProcessTimer(timer);
+            return cmd;
         } catch (Exception e) {
             log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
             RemotingUtil.closeChannel(ctx.channel());
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index b287387a7..ebc8956d9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -21,6 +21,8 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.opentelemetry.api.common.AttributesBuilder;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,22 +37,32 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
 import org.apache.rocketmq.remoting.common.ServiceThread;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_PROCESS_REQUEST_FAILED;
+import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;
+
 public abstract class NettyRemotingAbstract {
 
     /**
@@ -178,6 +190,73 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    /**
+     * Writes {@code response} for {@code request} without a completion callback.
+     *
+     * @see #writeResponse(Channel, RemotingCommand, RemotingCommand, Consumer)
+     */
+    public static void writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response) {
+        writeResponse(channel, request, response, null);
+    }
+
+    /**
+     * Writes {@code response} to {@code channel} and records the RPC latency of {@code request}.
+     * A null response is ignored entirely; a one-way request only has its latency recorded.
+     * Otherwise the response takes the request's opaque, is marked as a response and is flushed.
+     *
+     * @param channel  channel the response is written to
+     * @param request  command being answered
+     * @param response command to send back, may be null
+     * @param callback optional hook called with the write future once the write completes; it is
+     *                 not called on the null-response, one-way or synchronous-failure paths
+     */
+    public static void writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response, Consumer<Future<?>> callback) {
+        if (response == null) {
+            return;
+        }
+        AttributesBuilder attributesBuilder = RemotingMetricsManager.newAttributesBuilder()
+            .put(LABEL_IS_LONG_POLLING, request.isSuspended())
+            .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(request.getCode()))
+            .put(LABEL_RESPONSE_CODE, RemotingMetricsManager.getResponseCodeDesc(response.getCode()));
+        if (request.isOnewayRPC()) {
+            recordRpcLatency(request, attributesBuilder, RESULT_ONEWAY);
+            return;
+        }
+        response.setOpaque(request.getOpaque());
+        response.markResponseType();
+        try {
+            channel.writeAndFlush(response).addListener((ChannelFutureListener) future -> {
+                if (future.isSuccess()) {
+                    log.debug("Response[request code: {}, response code: {}, opaque: {}] is written to channel{}",
+                        request.getCode(), response.getCode(), response.getOpaque(), channel);
+                } else {
+                    log.error("Failed to write response[request code: {}, response code: {}, opaque: {}] to channel{}",
+                        request.getCode(), response.getCode(), response.getOpaque(), channel, future.cause());
+                }
+                recordRpcLatency(request, attributesBuilder, RemotingMetricsManager.getWriteAndFlushResult(future));
+                if (callback != null) {
+                    callback.accept(future);
+                }
+            });
+        } catch (Throwable e) {
+            log.error("process request over, but response failed", e);
+            log.error(request.toString());
+            log.error(response.toString());
+            recordRpcLatency(request, attributesBuilder, RESULT_WRITE_CHANNEL_FAILED);
+        }
+    }
+
+    /**
+     * Records one RPC latency sample for {@code request}, labelled with {@code result}.
+     * The process timer is set by NettyDecoder on decoded commands; a command built elsewhere
+     * may not carry one (NOTE(review): confirm no other setter exists), so a missing timer is
+     * recorded as 0 ms instead of failing with a NullPointerException.
+     */
+    private static void recordRpcLatency(RemotingCommand request, AttributesBuilder attributesBuilder, String result) {
+        long elapsedMillis = request.getProcessTimer() == null ? 0L : request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS);
+        RemotingMetricsManager.rpcLatency.record(elapsedMillis, attributesBuilder.put(LABEL_RESULT, result).build());
+    }
+
     /**
      * Process incoming request command issued by remote peer.
      *
@@ -194,7 +249,7 @@ public abstract class NettyRemotingAbstract {
             final RemotingCommand response =
                 RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
             response.setOpaque(opaque);
-            ctx.writeAndFlush(response);
+            writeResponse(ctx.channel(), cmd, response);
             log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
             return;
         }
@@ -205,7 +260,7 @@ public abstract class NettyRemotingAbstract {
             final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                     "[REJECTREQUEST]system busy, start flow control for a while");
             response.setOpaque(opaque);
-            ctx.writeAndFlush(response);
+            writeResponse(ctx.channel(), cmd, response);
             return;
         }
 
@@ -221,12 +276,15 @@ public abstract class NettyRemotingAbstract {
                         + " request code: " + cmd.getCode());
             }
 
-            if (!cmd.isOnewayRPC()) {
-                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
-                        "[OVERLOAD]system busy, start flow control for a while");
-                response.setOpaque(opaque);
-                ctx.writeAndFlush(response);
-            }
+            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+                "[OVERLOAD]system busy, start flow control for a while");
+            response.setOpaque(opaque);
+            writeResponse(ctx.channel(), cmd, response);
+        } catch (Throwable e) {
+            AttributesBuilder attributesBuilder = RemotingMetricsManager.newAttributesBuilder()
+                .put(LABEL_REQUEST_CODE, RemotingMetricsManager.getRequestCodeDesc(cmd.getCode()))
+                .put(LABEL_RESULT, RESULT_PROCESS_REQUEST_FAILED);
+            RemotingMetricsManager.rpcLatency.record(cmd.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
         }
     }
 
@@ -259,19 +317,7 @@ public abstract class NettyRemotingAbstract {
                     throw exception;
                 }
 
-                if (!cmd.isOnewayRPC()) {
-                    if (response != null) {
-                        response.setOpaque(opaque);
-                        response.markResponseType();
-                        try {
-                            ctx.writeAndFlush(response);
-                        } catch (Throwable e) {
-                            log.error("process request over, but response failed", e);
-                            log.error(cmd.toString());
-                            log.error(response.toString());
-                        }
-                    }
-                }
+                writeResponse(ctx.channel(), cmd, response);
             } catch (Throwable e) {
                 log.error("process request exception", e);
                 log.error(cmd.toString());
@@ -280,7 +326,7 @@ public abstract class NettyRemotingAbstract {
                     response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                             RemotingHelper.exceptionSimpleDesc(e));
                     response.setOpaque(opaque);
-                    ctx.writeAndFlush(response);
+                    writeResponse(ctx.channel(), cmd, response);
                 }
             }
         };
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 2d82fbe6c..def94ad8e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -68,12 +68,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 39be96c52..0fd8943b5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -55,12 +55,12 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingServer;
-import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.common.TlsMode;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 68ccdf69d..a3815aeca 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -17,13 +17,9 @@
 package org.apache.rocketmq.remoting.protocol;
 
 import com.alibaba.fastjson.annotation.JSONField;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
@@ -36,9 +32,12 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class RemotingCommand {
     public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
@@ -90,6 +89,8 @@ public class RemotingCommand {
     private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
 
     private transient byte[] body;
+    private boolean suspended;
+    private transient Stopwatch processTimer; // transient, like body: fastjson skips transient fields by default, keeping the timer out of this command's JSON form
 
     protected RemotingCommand() {
     }
@@ -587,6 +588,16 @@ public class RemotingCommand {
         this.body = body;
     }
 
+    @JSONField(serialize = false) // fastjson must not emit this flag when serializing the command
+    public boolean isSuspended() {
+        return suspended;
+    }
+
+    @JSONField(serialize = false, deserialize = false) // on a setter only deserialize=false has effect: incoming JSON must not set this local flag
+    public void setSuspended(boolean suspended) {
+        this.suspended = suspended;
+    }
+
     public HashMap<String, String> getExtFields() {
         return extFields;
     }
@@ -616,4 +627,12 @@ public class RemotingCommand {
     public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
         this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
     }
-}
\ No newline at end of file
+
+    public Stopwatch getProcessTimer() { // returns the stored timer; null if none has been assigned (the field has no initializer)
+        return processTimer;
+    }
+
+    public void setProcessTimer(Stopwatch processTimer) { // stores the reference as-is; null is not rejected
+        this.processTimer = processTimer;
+    }
+}