You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/26 11:20:31 UTC
[rocketmq] branch 4.9.x updated: [ISSUE #3905] Support bname in protocol for 4.9.x client (#5161)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new 2371a2923 [ISSUE #3905] Support bname in protocol for 4.9.x client (#5161)
2371a2923 is described below
commit 2371a2923ffffe4059f0b3e570e4712e75396dd0
Author: Zhouxiang Zhan <zh...@alibaba-inc.com>
AuthorDate: Mon Sep 26 19:20:22 2022 +0800
[ISSUE #3905] Support bname in protocol for 4.9.x client (#5161)
As this is a backported patch, let's merge it as it is for the purpose of compatibility.
---
.../broker/processor/ReplyMessageProcessor.java | 1 +
.../consumer/store/RemoteBrokerOffsetStore.java | 2 ++
.../apache/rocketmq/client/impl/MQAdminImpl.java | 9 +++---
.../rocketmq/client/impl/MQClientAPIImpl.java | 31 +++++++++++--------
.../impl/consumer/DefaultMQPullConsumerImpl.java | 4 +--
.../impl/consumer/DefaultMQPushConsumerImpl.java | 2 +-
.../client/impl/consumer/PullAPIWrapper.java | 1 +
.../impl/producer/DefaultMQProducerImpl.java | 5 +++
.../header/CheckTransactionStateRequestHeader.java | 13 ++++++--
.../header/ConsumerSendMsgBackRequestHeader.java | 4 +--
.../header/EndTransactionRequestHeader.java | 16 ++++++++--
.../GetEarliestMsgStoretimeRequestHeader.java | 4 +--
.../protocol/header/GetMaxOffsetRequestHeader.java | 4 +--
.../protocol/header/GetMinOffsetRequestHeader.java | 4 +--
.../protocol/header/PullMessageRequestHeader.java | 15 ++++++---
.../header/QueryConsumerOffsetRequestHeader.java | 4 +--
.../protocol/header/ReplyMessageRequestHeader.java | 4 +--
.../protocol/header/SearchOffsetRequestHeader.java | 4 +--
.../protocol/header/SendMessageRequestHeader.java | 4 +--
.../header/SendMessageRequestHeaderV2.java | 20 ++++++++----
.../header/UpdateConsumerOffsetRequestHeader.java | 4 +--
.../RpcRequestHeader.java} | 36 +++++-----------------
.../tools/admin/DefaultMQAdminExtImpl.java | 8 ++---
23 files changed, 113 insertions(+), 86 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index 133165b9f..42b2edb6c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -173,6 +173,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
replyMessageRequestHeader.setProperties(requestHeader.getProperties());
replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes());
replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode());
+ replyMessageRequestHeader.setBname(requestHeader.getBname());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader);
request.setBody(msg.getBody());
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 409ceab95..f2cb13a8c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -211,6 +211,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
+ requestHeader.setBname(mq.getBrokerName());
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
@@ -238,6 +239,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setBname(mq.getBrokerName());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index ba4eafae9..49ffef9c0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -191,7 +191,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
- return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
+ return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp,
timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
@@ -210,7 +210,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
- return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
+ return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
@@ -228,7 +228,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
- return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
+ return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
@@ -246,8 +246,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
- return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
- timeoutMillis);
+ return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index bec1a5684..b2b8b9bd5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -836,13 +836,14 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
- public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
+ public long searchOffset(final String addr, final MessageQueue mq, final long timestamp,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
- requestHeader.setTopic(topic);
- requestHeader.setQueueId(queueId);
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setQueueId(mq.getQueueId());
requestHeader.setTimestamp(timestamp);
+ requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -861,11 +862,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
- public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
+ public long getMaxOffset(final String addr, final MessageQueue mq, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
- requestHeader.setTopic(topic);
- requestHeader.setQueueId(queueId);
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -912,11 +914,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
- public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
+ public long getMinOffset(final String addr, final MessageQueue mq, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
- requestHeader.setTopic(topic);
- requestHeader.setQueueId(queueId);
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -936,12 +939,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
- public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId,
- final long timeoutMillis)
+ public long getEarliestMsgStoretime(final String addr, final MessageQueue mq, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
- requestHeader.setTopic(topic);
- requestHeader.setQueueId(queueId);
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setBname(mq.getBrokerName());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -1100,6 +1103,7 @@ public class MQClientAPIImpl {
public void consumerSendMessageBack(
final String addr,
+ final String brokerName,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
@@ -1115,6 +1119,7 @@ public class MQClientAPIImpl {
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
+ requestHeader.setBname(brokerName);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 7319fdad7..dfcc23e3a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -583,8 +583,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
}
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
- this.defaultMQPullConsumer.getMaxReconsumeTimes());
+ this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
+ consumerGroup, delayLevel, 3000, this.defaultMQPullConsumer.getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 05ced26c4..df68284d4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -515,7 +515,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
+ this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index cc42a9e83..33021bf6b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -191,6 +191,7 @@ public class PullAPIWrapper {
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
+ requestHeader.setBname(mq.getBrokerName());
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 668f9b6b6..deb49e755 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -350,6 +350,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
+ thisHeader.setBname(checkRequestHeader.getBname());
+ thisHeader.setQueueId(checkRequestHeader.getQueueId());
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
@@ -774,6 +776,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
+ requestHeader.setBname(mq.getBrokerName());
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
@@ -1323,6 +1326,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
+ requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
+ requestHeader.setQueueId(sendResult.getMessageQueue().getQueueId());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
index 6cba71c7e..8c4b87c4a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
+public class CheckTransactionStateRequestHeader extends RpcRequestHeader {
@CFNotNull
private Long tranStateTableOffset;
@CFNotNull
@@ -32,6 +32,7 @@ public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
private String msgId;
private String transactionId;
private String offsetMsgId;
+ private int queueId;
@Override
public void checkFields() throws RemotingCommandException {
@@ -76,4 +77,12 @@ public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
public void setOffsetMsgId(String offsetMsgId) {
this.offsetMsgId = offsetMsgId;
}
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
index bd8fbb44c..f5dda1ba4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -17,12 +17,12 @@
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
+public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader {
@CFNotNull
private Long offset;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
index 87661c320..42a09e8e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
@@ -17,13 +17,13 @@
package org.apache.rocketmq.common.protocol.header;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class EndTransactionRequestHeader implements CommandCustomHeader {
+public class EndTransactionRequestHeader extends RpcRequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
@@ -43,6 +43,8 @@ public class EndTransactionRequestHeader implements CommandCustomHeader {
private String transactionId;
+ private int queueId;
+
@Override
public void checkFields() throws RemotingCommandException {
if (MessageSysFlag.TRANSACTION_NOT_TYPE == this.commitOrRollback) {
@@ -126,6 +128,16 @@ public class EndTransactionRequestHeader implements CommandCustomHeader {
", fromTransactionCheck=" + fromTransactionCheck +
", msgId='" + msgId + '\'' +
", transactionId='" + transactionId + '\'' +
+ ", queueId=" + queueId +
+ ", bname='" + bname + '\'' +
'}';
}
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
index c64381fb7..f75494e51 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader {
+public class GetEarliestMsgStoretimeRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 871309de6..dfa05e864 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
+public class GetMaxOffsetRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
index 6fb8ed40c..3a02634c8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetMinOffsetRequestHeader implements CommandCustomHeader {
+public class GetMinOffsetRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 02fac8e59..440e5c607 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,17 +20,15 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import io.netty.buffer.ByteBuf;
import java.util.HashMap;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
-import io.netty.buffer.ByteBuf;
-
-public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
+public class PullMessageRequestHeader extends RpcRequestHeader implements FastCodesHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -51,6 +49,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
private String subscription;
@CFNotNull
private Long subVersion;
+ @CFNullable
private String expressionType;
@Override
@@ -70,6 +69,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
writeIfNotNull(out, "subscription", subscription);
writeIfNotNull(out, "subVersion", subVersion);
writeIfNotNull(out, "expressionType", expressionType);
+ writeIfNotNull(out, "bname", bname);
}
@Override
@@ -128,6 +128,11 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
if (str != null) {
this.expressionType = str;
}
+
+ str = fields.get("bname");
+ if (str != null) {
+ this.bname = str;
+ }
}
public String getConsumerGroup() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index 3b7f627c3..195f46400 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
+public class QueryConsumerOffsetRequestHeader extends RpcRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
index 3bb09073f..aa747e9f4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
@@ -17,12 +17,12 @@
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class ReplyMessageRequestHeader implements CommandCustomHeader {
+public class ReplyMessageRequestHeader extends RpcRequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index 5ea2e24bf..cd9f9e18b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class SearchOffsetRequestHeader implements CommandCustomHeader {
+public class SearchOffsetRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6bb..8fa7737f5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,12 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class SendMessageRequestHeader implements CommandCustomHeader {
+public class SendMessageRequestHeader extends RpcRequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index ff9457e28..45c49ef1a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -17,20 +17,18 @@
package org.apache.rocketmq.common.protocol.header;
+import io.netty.buffer.ByteBuf;
import java.util.HashMap;
-
-import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-import io.netty.buffer.ByteBuf;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
/**
* Use short variable name to speed up FastJson deserialization process.
*/
-public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCodesHeader {
+public class SendMessageRequestHeaderV2 extends RpcRequestHeader implements FastCodesHeader {
@CFNotNull
private String a; // producerGroup;
@CFNotNull
@@ -59,6 +57,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
@CFNullable
private boolean m; //batch
+ private String n; // brokerName
+
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a);
@@ -74,6 +74,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
+ v1.setBname(v2.n);
return v1;
}
@@ -92,6 +93,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
+ v2.n = v1.getBname();
return v2;
}
@@ -114,6 +116,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
writeIfNotNull(out, "k", k);
writeIfNotNull(out, "l", l);
writeIfNotNull(out, "m", m);
+ writeIfNotNull(out, "n", n);
}
@Override
@@ -183,6 +186,11 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
if (str != null) {
m = Boolean.parseBoolean(str);
}
+
+ str = fields.get("n");
+ if (str != null) {
+ n = str;
+ }
}
public String getA() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
index 3f44db645..e6b0e9508 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
+public class UpdateConsumerOffsetRequestHeader extends RpcRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
similarity index 52%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index 871309de6..9d1903b1c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -15,38 +15,18 @@
* limitations under the License.
*/
-/**
- * $Id: GetMaxOffsetRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header;
+package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
- @CFNotNull
- private String topic;
- @CFNotNull
- private Integer queueId;
-
- @Override
- public void checkFields() throws RemotingCommandException {
- }
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
+public abstract class RpcRequestHeader implements CommandCustomHeader {
+ protected String bname;
- public Integer getQueueId() {
- return queueId;
+ public String getBname() {
+ return bname;
}
- public void setQueueId(Integer queueId) {
- this.queueId = queueId;
+ public void setBname(String bname) {
+ this.bname = bname;
}
-}
+}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 9e99925a8..ddb23cc30 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -596,12 +596,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
long timestamp, boolean force) throws RemotingException, InterruptedException, MQBrokerException {
long resetOffset;
if (timestamp == -1) {
-
- resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timeoutMillis);
+ resetOffset = this.mqClientInstance.getMQClientAPIImpl().getMaxOffset(brokerAddr, queue, timeoutMillis);
} else {
resetOffset =
- this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue.getTopic(), queue.getQueueId(), timestamp,
- timeoutMillis);
+ this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, queue, timestamp, timeoutMillis);
}
RollbackStats rollbackStats = new RollbackStats();
@@ -619,6 +617,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
requestHeader.setTopic(queue.getTopic());
requestHeader.setQueueId(queue.getQueueId());
requestHeader.setCommitOffset(resetOffset);
+ requestHeader.setBname(queue.getBrokerName());
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
}
return rollbackStats;
@@ -1145,6 +1144,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
+ requestHeader.setBname(mq.getBrokerName());
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr, requestHeader, timeoutMillis);
}