You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/26 11:20:31 UTC

[rocketmq] branch 4.9.x updated: [ISSUE #3905] Support bname in protocol for 4.9.x client (#5161)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new 2371a2923 [ISSUE #3905] Support bname in protocol for 4.9.x client (#5161)
2371a2923 is described below

commit 2371a2923ffffe4059f0b3e570e4712e75396dd0
Author: Zhouxiang Zhan <zh...@alibaba-inc.com>
AuthorDate: Mon Sep 26 19:20:22 2022 +0800

    [ISSUE #3905] Support bname in protocol for 4.9.x client (#5161)
    
    As this is a backported patch. For the purpose of compatibility, let's merge it as it is.
---
 .../broker/processor/ReplyMessageProcessor.java    |  1 +
 .../consumer/store/RemoteBrokerOffsetStore.java    |  2 ++
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |  9 +++---
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 31 +++++++++++--------
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |  4 +--
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |  2 +-
 .../client/impl/consumer/PullAPIWrapper.java       |  1 +
 .../impl/producer/DefaultMQProducerImpl.java       |  5 +++
 .../header/CheckTransactionStateRequestHeader.java | 13 ++++++--
 .../header/ConsumerSendMsgBackRequestHeader.java   |  4 +--
 .../header/EndTransactionRequestHeader.java        | 16 ++++++++--
 .../GetEarliestMsgStoretimeRequestHeader.java      |  4 +--
 .../protocol/header/GetMaxOffsetRequestHeader.java |  4 +--
 .../protocol/header/GetMinOffsetRequestHeader.java |  4 +--
 .../protocol/header/PullMessageRequestHeader.java  | 15 ++++++---
 .../header/QueryConsumerOffsetRequestHeader.java   |  4 +--
 .../protocol/header/ReplyMessageRequestHeader.java |  4 +--
 .../protocol/header/SearchOffsetRequestHeader.java |  4 +--
 .../protocol/header/SendMessageRequestHeader.java  |  4 +--
 .../header/SendMessageRequestHeaderV2.java         | 20 ++++++++----
 .../header/UpdateConsumerOffsetRequestHeader.java  |  4 +--
 .../RpcRequestHeader.java}                         | 36 +++++-----------------
 .../tools/admin/DefaultMQAdminExtImpl.java         |  8 ++---
 23 files changed, 113 insertions(+), 86 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index 133165b9f..42b2edb6c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -173,6 +173,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
         replyMessageRequestHeader.setProperties(requestHeader.getProperties());
         replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes());
         replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode());
+        replyMessageRequestHeader.setBname(requestHeader.getBname());
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader);
         request.setBody(msg.getBody());
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 409ceab95..f2cb13a8c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -211,6 +211,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
             requestHeader.setConsumerGroup(this.groupName);
             requestHeader.setQueueId(mq.getQueueId());
             requestHeader.setCommitOffset(offset);
+            requestHeader.setBname(mq.getBrokerName());
 
             if (isOneway) {
                 this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
@@ -238,6 +239,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
             requestHeader.setTopic(mq.getTopic());
             requestHeader.setConsumerGroup(this.groupName);
             requestHeader.setQueueId(mq.getQueueId());
+            requestHeader.setBname(mq.getBrokerName());
 
             return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                 findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index ba4eafae9..49ffef9c0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -191,7 +191,7 @@ public class MQAdminImpl {
 
         if (brokerAddr != null) {
             try {
-                return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
+                return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp,
                     timeoutMillis);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
@@ -210,7 +210,7 @@ public class MQAdminImpl {
 
         if (brokerAddr != null) {
             try {
-                return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
+                return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq, timeoutMillis);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
             }
@@ -228,7 +228,7 @@ public class MQAdminImpl {
 
         if (brokerAddr != null) {
             try {
-                return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
+                return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq, timeoutMillis);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
             }
@@ -246,8 +246,7 @@ public class MQAdminImpl {
 
         if (brokerAddr != null) {
             try {
-                return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
-                    timeoutMillis);
+                return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq, timeoutMillis);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
             }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index bec1a5684..b2b8b9bd5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -836,13 +836,14 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
     }
 
-    public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
+    public long searchOffset(final String addr, final MessageQueue mq, final long timestamp,
         final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
-        requestHeader.setTopic(topic);
-        requestHeader.setQueueId(queueId);
+        requestHeader.setTopic(mq.getTopic());
+        requestHeader.setQueueId(mq.getQueueId());
         requestHeader.setTimestamp(timestamp);
+        requestHeader.setBname(mq.getBrokerName());
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -861,11 +862,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
     }
 
-    public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
+    public long getMaxOffset(final String addr, final MessageQueue mq, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
-        requestHeader.setTopic(topic);
-        requestHeader.setQueueId(queueId);
+        requestHeader.setTopic(mq.getTopic());
+        requestHeader.setQueueId(mq.getQueueId());
+        requestHeader.setBname(mq.getBrokerName());
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -912,11 +914,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
     }
 
-    public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
+    public long getMinOffset(final String addr, final MessageQueue mq, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
-        requestHeader.setTopic(topic);
-        requestHeader.setQueueId(queueId);
+        requestHeader.setTopic(mq.getTopic());
+        requestHeader.setQueueId(mq.getQueueId());
+        requestHeader.setBname(mq.getBrokerName());
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -936,12 +939,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
     }
 
-    public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId,
-        final long timeoutMillis)
+    public long getEarliestMsgStoretime(final String addr, final MessageQueue mq, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
-        requestHeader.setTopic(topic);
-        requestHeader.setQueueId(queueId);
+        requestHeader.setTopic(mq.getTopic());
+        requestHeader.setQueueId(mq.getQueueId());
+        requestHeader.setBname(mq.getBrokerName());
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -1100,6 +1103,7 @@ public class MQClientAPIImpl {
 
     public void consumerSendMessageBack(
         final String addr,
+        final String brokerName,
         final MessageExt msg,
         final String consumerGroup,
         final int delayLevel,
@@ -1115,6 +1119,7 @@ public class MQClientAPIImpl {
         requestHeader.setDelayLevel(delayLevel);
         requestHeader.setOriginMsgId(msg.getMsgId());
         requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
+        requestHeader.setBname(brokerName);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
             request, timeoutMillis);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 7319fdad7..dfcc23e3a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -583,8 +583,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
                 consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
             }
 
-            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
-                this.defaultMQPullConsumer.getMaxReconsumeTimes());
+            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
+                consumerGroup, delayLevel, 3000, this.defaultMQPullConsumer.getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 05ced26c4..df68284d4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -515,7 +515,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         try {
             String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                 : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
-            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
+            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
                 this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index cc42a9e83..33021bf6b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -191,6 +191,7 @@ public class PullAPIWrapper {
             requestHeader.setSubscription(subExpression);
             requestHeader.setSubVersion(subVersion);
             requestHeader.setExpressionType(expressionType);
+            requestHeader.setBname(mq.getBrokerName());
 
             String brokerAddr = findBrokerResult.getBrokerAddr();
             if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 668f9b6b6..deb49e755 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -350,6 +350,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 thisHeader.setProducerGroup(producerGroup);
                 thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                 thisHeader.setFromTransactionCheck(true);
+                thisHeader.setBname(checkRequestHeader.getBname());
+                thisHeader.setQueueId(checkRequestHeader.getQueueId());
 
                 String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                 if (uniqueKey == null) {
@@ -774,6 +776,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 requestHeader.setReconsumeTimes(0);
                 requestHeader.setUnitMode(this.isUnitMode());
                 requestHeader.setBatch(msg instanceof MessageBatch);
+                requestHeader.setBname(mq.getBrokerName());
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                     if (reconsumeTimes != null) {
@@ -1323,6 +1326,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
         requestHeader.setTransactionId(transactionId);
         requestHeader.setCommitLogOffset(id.getOffset());
+        requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
+        requestHeader.setQueueId(sendResult.getMessageQueue().getQueueId());
         switch (localTransactionState) {
             case COMMIT_MESSAGE:
                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
index 6cba71c7e..8c4b87c4a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
+public class CheckTransactionStateRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private Long tranStateTableOffset;
     @CFNotNull
@@ -32,6 +32,7 @@ public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
     private String msgId;
     private String transactionId;
     private String offsetMsgId;
+    private int queueId;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -76,4 +77,12 @@ public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
     public void setOffsetMsgId(String offsetMsgId) {
         this.offsetMsgId = offsetMsgId;
     }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
index bd8fbb44c..f5dda1ba4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -17,12 +17,12 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
+public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private Long offset;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
index 87661c320..42a09e8e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
@@ -17,13 +17,13 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class EndTransactionRequestHeader implements CommandCustomHeader {
+public class EndTransactionRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String producerGroup;
     @CFNotNull
@@ -43,6 +43,8 @@ public class EndTransactionRequestHeader implements CommandCustomHeader {
 
     private String transactionId;
 
+    private int queueId;
+
     @Override
     public void checkFields() throws RemotingCommandException {
         if (MessageSysFlag.TRANSACTION_NOT_TYPE == this.commitOrRollback) {
@@ -126,6 +128,16 @@ public class EndTransactionRequestHeader implements CommandCustomHeader {
             ", fromTransactionCheck=" + fromTransactionCheck +
             ", msgId='" + msgId + '\'' +
             ", transactionId='" + transactionId + '\'' +
+            ", queueId=" + queueId +
+            ", bname='" + bname + '\'' +
             '}';
     }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
index c64381fb7..f75494e51 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader {
+public class GetEarliestMsgStoretimeRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String topic;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 871309de6..dfa05e864 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
+public class GetMaxOffsetRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String topic;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
index 6fb8ed40c..3a02634c8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetMinOffsetRequestHeader implements CommandCustomHeader {
+public class GetMinOffsetRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String topic;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 02fac8e59..440e5c607 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,17 +20,15 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import io.netty.buffer.ByteBuf;
 import java.util.HashMap;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
-import io.netty.buffer.ByteBuf;
-
-public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
+public class PullMessageRequestHeader extends RpcRequestHeader implements FastCodesHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
@@ -51,6 +49,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
     private String subscription;
     @CFNotNull
     private Long subVersion;
+    @CFNullable
     private String expressionType;
 
     @Override
@@ -70,6 +69,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
         writeIfNotNull(out, "subscription", subscription);
         writeIfNotNull(out, "subVersion", subVersion);
         writeIfNotNull(out, "expressionType", expressionType);
+        writeIfNotNull(out, "bname", bname);
     }
 
     @Override
@@ -128,6 +128,11 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
         if (str != null) {
             this.expressionType = str;
         }
+
+        str = fields.get("bname");
+        if (str != null) {
+            this.bname = str;
+        }
     }
 
     public String getConsumerGroup() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index 3b7f627c3..195f46400 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
+public class QueryConsumerOffsetRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
index 3bb09073f..aa747e9f4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
@@ -17,12 +17,12 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class ReplyMessageRequestHeader implements CommandCustomHeader {
+public class ReplyMessageRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String producerGroup;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index 5ea2e24bf..cd9f9e18b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class SearchOffsetRequestHeader implements CommandCustomHeader {
+public class SearchOffsetRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String topic;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6bb..8fa7737f5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,12 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class SendMessageRequestHeader implements CommandCustomHeader {
+public class SendMessageRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String producerGroup;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index ff9457e28..45c49ef1a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -17,20 +17,18 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import io.netty.buffer.ByteBuf;
 import java.util.HashMap;
-
-import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-import io.netty.buffer.ByteBuf;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
 /**
  * Use short variable name to speed up FastJson deserialization process.
  */
-public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCodesHeader {
+public class SendMessageRequestHeaderV2 extends RpcRequestHeader implements FastCodesHeader {
     @CFNotNull
     private String a; // producerGroup;
     @CFNotNull
@@ -59,6 +57,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
     @CFNullable
     private boolean m; //batch
 
+    private String n; // brokerName
+
     public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
         SendMessageRequestHeader v1 = new SendMessageRequestHeader();
         v1.setProducerGroup(v2.a);
@@ -74,6 +74,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         v1.setUnitMode(v2.k);
         v1.setMaxReconsumeTimes(v2.l);
         v1.setBatch(v2.m);
+        v1.setBname(v2.n);
         return v1;
     }
 
@@ -92,6 +93,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         v2.k = v1.isUnitMode();
         v2.l = v1.getMaxReconsumeTimes();
         v2.m = v1.isBatch();
+        v2.n = v1.getBname();
         return v2;
     }
 
@@ -114,6 +116,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         writeIfNotNull(out, "k", k);
         writeIfNotNull(out, "l", l);
         writeIfNotNull(out, "m", m);
+        writeIfNotNull(out, "n", n);
     }
 
     @Override
@@ -183,6 +186,11 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         if (str != null) {
             m = Boolean.parseBoolean(str);
         }
+
+        str = fields.get("n");
+        if (str != null) {
+            n = str;
+        }
     }
 
     public String getA() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
index 3f44db645..e6b0e9508 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
+public class UpdateConsumerOffsetRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
similarity index 52%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index 871309de6..9d1903b1c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -15,38 +15,18 @@
  * limitations under the License.
  */
 
-/**
- * $Id: GetMaxOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header;
+package org.apache.rocketmq.common.rpc;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
-    @CFNotNull
-    private String topic;
-    @CFNotNull
-    private Integer queueId;
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-    }
 
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
+public abstract class RpcRequestHeader implements CommandCustomHeader {
+    protected String bname;
 
-    public Integer getQueueId() {
-        return queueId;
+    public String getBname() {
+        return bname;
     }
 
-    public void setQueueId(Integer queueId) {
-        this.queueId = queueId;
+    public void setBname(String bname) {
+        this.bname = bname;
     }
-}
+}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 9e99925a8..ddb23cc30 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -596,12 +596,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         long timestamp, boolean force) throws RemotingException, InterruptedException, MQBrokerException {
         long resetOffset;
         if (timestamp == -1) {
-
-            resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis);
+            resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue, timeoutMillis);
         } else {
             resetOffset =
-                this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp,
-                    timeoutMillis);
+                this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue, timestamp, timeoutMillis);
         }
 
         RollbackStats rollbackStats = new RollbackStats();
@@ -619,6 +617,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
             requestHeader.setTopic(queue.getTopic());
             requestHeader.setQueueId(queue.getQueueId());
             requestHeader.setCommitOffset(resetOffset);
+            requestHeader.setBname(queue.getBrokerName());
             this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
         }
         return rollbackStats;
@@ -1145,6 +1144,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         requestHeader.setTopic(mq.getTopic());
         requestHeader.setQueueId(mq.getQueueId());
         requestHeader.setCommitOffset(offset);
+        requestHeader.setBname(mq.getBrokerName());
         this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
     }