You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/21 02:37:05 UTC
[rocketmq] branch develop updated: [ISSUE #5712] Fix heartbeat detection becoming invalid after a controller switch (#5711)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 646d04f42 [ISSUE #5712] Fix heartbeat detection becoming invalid after a controller switch (#5711)
646d04f42 is described below
commit 646d04f42524636ad09364af2a67fa8ab8b6b8fa
Author: rongtong <ji...@163.com>
AuthorDate: Wed Dec 21 10:36:57 2022 +0800
[ISSUE #5712] Fix heartbeat detection becoming invalid after a controller switch (#5711)
* Fix heartbeat detection becoming invalid after a controller switch
* Pass the checkstyle
* Format ReplicasInfoManagerTest code style
---
.../apache/rocketmq/broker/BrokerController.java | 4 +-
.../broker/controller/ReplicasManager.java | 2 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 318 +++++++++++----------
.../broker/controller/ReplicasManagerTest.java | 4 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 10 +
.../controller/BrokerHeartbeatManager.java | 18 +-
.../apache/rocketmq/controller/BrokerLiveInfo.java | 37 +--
.../rocketmq/controller/ControllerManager.java | 67 +++--
.../impl/DefaultBrokerHeartbeatManager.java | 69 ++---
.../processor/ControllerRequestProcessor.java | 14 +-
.../impl/controller/ControllerManagerTest.java | 8 +-
.../impl/DefaultBrokerHeartbeatManagerTest.java | 3 +-
.../impl/manager/ReplicasInfoManagerTest.java | 48 ++--
.../RegisterBrokerToControllerRequestHeader.java | 36 ++-
.../namesrv/BrokerHeartbeatRequestHeader.java | 30 ++
15 files changed, 356 insertions(+), 312 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 0a581b0c6..7804dfc78 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1747,7 +1747,9 @@ public class BrokerController {
this.brokerConfig.getSendHeartbeatTimeoutMillis(),
this.brokerConfig.isInBrokerContainer(), this.replicasManager.getLastEpoch(),
this.messageStore.getMaxPhyOffset(),
- this.replicasManager.getConfirmOffset()
+ this.replicasManager.getConfirmOffset(),
+ this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+ this.brokerConfig.getBrokerElectionPriority()
);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index a6589d2ea..a0218f8cc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -291,7 +291,7 @@ public class ReplicasManager {
// Register this broker to controller, get brokerId and masterAddress.
try {
final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
- this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress,
+ this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
final String newMasterAddress = registerResponse.getMasterAddress();
if (StringUtils.isNoneEmpty(newMasterAddress)) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 8d1690874..200b5e779 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -137,7 +137,7 @@ public class BrokerOuterAPI {
private final TopAddressing topAddressing = new DefaultTopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
- new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
+ new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
private ClientMetadata clientMetadata;
private RpcClient rpcClient;
@@ -186,7 +186,7 @@ public class BrokerOuterAPI {
private List<String> lookupNameServerAddress(String domain) {
List<String> addressList = new ArrayList<>();
try {
- java.security.Security.setProperty("networkaddress.cache.ttl" , "10");
+ java.security.Security.setProperty("networkaddress.cache.ttl", "10");
int index = domain.indexOf(":");
String portStr = domain.substring(index);
String domainStr = domain.substring(0, index);
@@ -213,13 +213,13 @@ public class BrokerOuterAPI {
}
public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String brokerName)
- throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return syncBrokerMemberGroup(clusterName, brokerName, false);
}
public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String brokerName,
- boolean isCompatibleWithOldNameSrv)
- throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ boolean isCompatibleWithOldNameSrv)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
if (isCompatibleWithOldNameSrv) {
return getBrokerMemberGroupCompatible(clusterName, brokerName);
} else {
@@ -228,7 +228,7 @@ public class BrokerOuterAPI {
}
public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName)
- throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
BrokerMemberGroup brokerMemberGroup = new BrokerMemberGroup(clusterName, brokerName);
GetBrokerMemberGroupRequestHeader requestHeader = new GetBrokerMemberGroupRequestHeader();
@@ -246,7 +246,7 @@ public class BrokerOuterAPI {
byte[] body = response.getBody();
if (body != null) {
GetBrokerMemberGroupResponseBody brokerMemberGroupResponseBody =
- GetBrokerMemberGroupResponseBody.decode(body, GetBrokerMemberGroupResponseBody.class);
+ GetBrokerMemberGroupResponseBody.decode(body, GetBrokerMemberGroupResponseBody.class);
return brokerMemberGroupResponseBody.getBrokerMemberGroup();
}
@@ -259,7 +259,7 @@ public class BrokerOuterAPI {
}
public BrokerMemberGroup getBrokerMemberGroupCompatible(String clusterName, String brokerName)
- throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
BrokerMemberGroup brokerMemberGroup = new BrokerMemberGroup(clusterName, brokerName);
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
@@ -278,8 +278,8 @@ public class BrokerOuterAPI {
TopicRouteData topicRouteData = TopicRouteData.decode(body, TopicRouteData.class);
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
if (brokerData != null
- && brokerData.getBrokerName().equals(brokerName)
- && brokerData.getCluster().equals(clusterName)) {
+ && brokerData.getBrokerName().equals(brokerName)
+ && brokerData.getCluster().equals(clusterName)) {
brokerMemberGroup.getBrokerAddrs().putAll(brokerData.getBrokerAddrs());
break;
}
@@ -295,13 +295,13 @@ public class BrokerOuterAPI {
}
public void sendHeartbeatViaDataVersion(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final Long brokerId,
- final int timeoutMillis,
- final DataVersion dataVersion,
- final boolean isInBrokerContainer) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final Long brokerId,
+ final int timeoutMillis,
+ final DataVersion dataVersion,
+ final boolean isInBrokerContainer) {
List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
@@ -330,11 +330,11 @@ public class BrokerOuterAPI {
}
public void sendHeartbeat(final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final Long brokerId,
- final int timeoutMills,
- final boolean isInBrokerContainer) {
+ final String brokerAddr,
+ final String brokerName,
+ final Long brokerId,
+ final int timeoutMills,
+ final boolean isInBrokerContainer) {
List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
final BrokerHeartbeatRequestHeader requestHeader = new BrokerHeartbeatRequestHeader();
@@ -361,8 +361,8 @@ public class BrokerOuterAPI {
}
public BrokerSyncInfo retrieveBrokerHaInfo(String masterBrokerAddr)
- throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
- MQBrokerException, RemotingCommandException {
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+ MQBrokerException, RemotingCommandException {
ExchangeHAInfoRequestHeader requestHeader = new ExchangeHAInfoRequestHeader();
requestHeader.setMasterHaAddress(null);
@@ -383,7 +383,7 @@ public class BrokerOuterAPI {
}
public void sendBrokerHaInfo(String brokerAddr, String masterHaAddr, long brokerInitMaxOffset, String masterAddr)
- throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
ExchangeHAInfoRequestHeader requestHeader = new ExchangeHAInfoRequestHeader();
requestHeader.setMasterHaAddress(masterHaAddr);
requestHeader.setMasterFlushOffset(brokerInitMaxOffset);
@@ -406,30 +406,30 @@ public class BrokerOuterAPI {
}
public List<RegisterBrokerResult> registerBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final boolean oneway,
- final int timeoutMills,
- final boolean enableActingMaster,
- final boolean compressed,
- final BrokerIdentity brokerIdentity) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final boolean oneway,
+ final int timeoutMills,
+ final boolean enableActingMaster,
+ final boolean compressed,
+ final BrokerIdentity brokerIdentity) {
return registerBrokerAll(clusterName,
- brokerAddr,
- brokerName,
- brokerId,
- haServerAddr,
- topicConfigWrapper,
- filterServerList,
- oneway, timeoutMills,
- enableActingMaster,
- compressed,
- null,
- brokerIdentity);
+ brokerAddr,
+ brokerName,
+ brokerId,
+ haServerAddr,
+ topicConfigWrapper,
+ filterServerList,
+ oneway, timeoutMills,
+ enableActingMaster,
+ compressed,
+ null,
+ brokerIdentity);
}
/**
@@ -445,23 +445,23 @@ public class BrokerOuterAPI {
* @param filterServerList
* @param oneway
* @param timeoutMills
- * @param compressed default false
+ * @param compressed default false
* @return
*/
public List<RegisterBrokerResult> registerBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final boolean oneway,
- final int timeoutMills,
- final boolean enableActingMaster,
- final boolean compressed,
- final Long heartbeatTimeoutMillis,
- final BrokerIdentity brokerIdentity) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final boolean oneway,
+ final int timeoutMills,
+ final boolean enableActingMaster,
+ final boolean compressed,
+ final Long heartbeatTimeoutMillis,
+ final BrokerIdentity brokerIdentity) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
@@ -518,13 +518,13 @@ public class BrokerOuterAPI {
}
private RegisterBrokerResult registerBroker(
- final String namesrvAddr,
- final boolean oneway,
- final int timeoutMills,
- final RegisterBrokerRequestHeader requestHeader,
- final byte[] body
+ final String namesrvAddr,
+ final boolean oneway,
+ final int timeoutMills,
+ final RegisterBrokerRequestHeader requestHeader,
+ final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException {
+ InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
@@ -542,7 +542,7 @@ public class BrokerOuterAPI {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
- (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
+ (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
@@ -559,10 +559,10 @@ public class BrokerOuterAPI {
}
public void unregisterBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId
) {
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null) {
@@ -578,11 +578,11 @@ public class BrokerOuterAPI {
}
public void unregisterBroker(
- final String namesrvAddr,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId
+ final String namesrvAddr,
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
@@ -608,13 +608,13 @@ public class BrokerOuterAPI {
}
public List<Boolean> needRegister(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final int timeoutMills,
- final boolean isInBrokerContainer) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final int timeoutMills,
+ final boolean isInBrokerContainer) {
final List<Boolean> changedList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
@@ -637,7 +637,7 @@ public class BrokerOuterAPI {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryDataVersionResponseHeader queryDataVersionResponseHeader =
- (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+ (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
changed = queryDataVersionResponseHeader.getChanged();
byte[] body = response.getBody();
if (body != null) {
@@ -674,8 +674,8 @@ public class BrokerOuterAPI {
}
public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
- final String addr) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
+ final String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
@@ -692,8 +692,8 @@ public class BrokerOuterAPI {
}
public TimerCheckpoint getTimerCheckPoint(
- final String addr) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
+ final String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
@@ -710,8 +710,8 @@ public class BrokerOuterAPI {
}
public TimerMetrics.TimerMetricsSerializeWrapper getTimerMetrics(
- final String addr) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
+ final String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_METRICS, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
@@ -728,8 +728,8 @@ public class BrokerOuterAPI {
}
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
- final String addr) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ final String addr) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
@@ -745,8 +745,8 @@ public class BrokerOuterAPI {
}
public String getAllDelayOffset(
- final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+ final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
@@ -762,8 +762,8 @@ public class BrokerOuterAPI {
}
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
- final String addr) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ final String addr) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
@@ -787,8 +787,8 @@ public class BrokerOuterAPI {
}
public long getMaxOffset(final String addr, final String topic, final int queueId, final boolean committed,
- final boolean isOnlyThisBroker)
- throws RemotingException, MQBrokerException, InterruptedException {
+ final boolean isOnlyThisBroker)
+ throws RemotingException, MQBrokerException, InterruptedException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
@@ -811,7 +811,7 @@ public class BrokerOuterAPI {
}
public long getMinOffset(final String addr, final String topic, final int queueId, final boolean isOnlyThisBroker)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
@@ -833,10 +833,10 @@ public class BrokerOuterAPI {
}
public void lockBatchMQAsync(
- final String addr,
- final LockBatchRequestBody requestBody,
- final long timeoutMillis,
- final LockCallback callback) throws RemotingException, InterruptedException {
+ final String addr,
+ final LockBatchRequestBody requestBody,
+ final long timeoutMillis,
+ final LockCallback callback) throws RemotingException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
@@ -850,7 +850,7 @@ public class BrokerOuterAPI {
if (response != null) {
if (response.getCode() == ResponseCode.SUCCESS) {
LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(),
- LockBatchResponseBody.class);
+ LockBatchResponseBody.class);
Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
callback.onSuccess(messageQueues);
} else {
@@ -864,10 +864,10 @@ public class BrokerOuterAPI {
}
public void unlockBatchMQAsync(
- final String addr,
- final UnlockBatchRequestBody requestBody,
- final long timeoutMillis,
- final UnlockCallback callback) throws RemotingException, InterruptedException {
+ final String addr,
+ final UnlockBatchRequestBody requestBody,
+ final long timeoutMillis,
+ final UnlockCallback callback) throws RemotingException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
@@ -897,8 +897,8 @@ public class BrokerOuterAPI {
}
public SendResult sendMessageToSpecificBroker(String brokerAddr, final String brokerName,
- final MessageExt msg, String group,
- long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
+ final MessageExt msg, String group,
+ long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = buildSendMessageRequest(msg, group);
RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
@@ -906,8 +906,8 @@ public class BrokerOuterAPI {
}
public CompletableFuture<SendResult> sendMessageToSpecificBrokerAsync(String brokerAddr, final String brokerName,
- final MessageExt msg, String group,
- long timeoutMillis) {
+ final MessageExt msg, String group,
+ long timeoutMillis) {
RemotingCommand request = buildSendMessageRequest(msg, group);
CompletableFuture<SendResult> cf = new CompletableFuture<>();
@@ -963,9 +963,9 @@ public class BrokerOuterAPI {
}
private SendResult processSendResponse(
- final String brokerName,
- final Message msg,
- final RemotingCommand response
+ final String brokerName,
+ final Message msg,
+ final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
@@ -992,7 +992,7 @@ public class BrokerOuterAPI {
}
SendMessageResponseHeader responseHeader =
- (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
@@ -1008,8 +1008,8 @@ public class BrokerOuterAPI {
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
- uniqMsgId,
- responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+ uniqMsgId,
+ responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -1036,12 +1036,12 @@ public class BrokerOuterAPI {
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
- boolean allowTopicNotExist) throws MQBrokerException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ boolean allowTopicNotExist) throws MQBrokerException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
@@ -1086,7 +1086,7 @@ public class BrokerOuterAPI {
}
public void forwardRequest(String brokerAddr, RemotingCommand request, long timeoutMillis,
- InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
+ InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
@@ -1104,8 +1104,8 @@ public class BrokerOuterAPI {
}
public MessageRequestModeSerializeWrapper getAllMessageRequestMode(
- final String addr) throws RemotingSendRequestException, RemotingConnectException,
- MQBrokerException, RemotingTimeoutException, InterruptedException {
+ final String addr) throws RemotingSendRequestException, RemotingConnectException,
+ MQBrokerException, RemotingTimeoutException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_MESSAGE_REQUEST_MODE, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
assert response != null;
@@ -1134,10 +1134,10 @@ public class BrokerOuterAPI {
* Alter syncStateSet
*/
public SyncStateSet alterSyncStateSet(
- final String controllerAddress,
- final String brokerName,
- final String masterAddress, final int masterEpoch,
- final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws Exception {
+ final String controllerAddress,
+ final String brokerName,
+ final String masterAddress, final int masterEpoch,
+ final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws Exception {
final AlterSyncStateSetRequestHeader requestHeader = new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, requestHeader);
@@ -1160,10 +1160,11 @@ public class BrokerOuterAPI {
* Register broker to controller
*/
public RegisterBrokerToControllerResponseHeader registerBrokerToController(
- final String controllerAddress, final String clusterName,
- final String brokerName, final String address, final int epoch, final long maxOffset, final int electionPriority) throws Exception {
+ final String controllerAddress, final String clusterName,
+ final String brokerName, final String address, final long controllerHeartbeatTimeoutMills, final int epoch,
+ final long maxOffset, final int electionPriority) throws Exception {
- final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, epoch, maxOffset, electionPriority);
+ final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, controllerHeartbeatTimeoutMills, epoch, maxOffset, electionPriority);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
@@ -1182,7 +1183,7 @@ public class BrokerOuterAPI {
* Get broker replica info
*/
public Pair<GetReplicaInfoResponseHeader, SyncStateSet> getReplicaInfo(final String controllerAddress,
- final String brokerName, final String brokerAddress) throws Exception {
+ final String brokerName, final String brokerAddress) throws Exception {
final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(brokerName, brokerAddress);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1208,15 +1209,17 @@ public class BrokerOuterAPI {
* Send heartbeat to controller
*/
public void sendHeartbeatToController(final String controllerAddress,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final Long brokerId,
- final int timeoutMills,
- final boolean isInBrokerContainer,
- final int epoch,
- final long maxOffset,
- final long confirmOffset) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final Long brokerId,
+ final int sendHeartBeatTimeoutMills,
+ final boolean isInBrokerContainer,
+ final int epoch,
+ final long maxOffset,
+ final long confirmOffset,
+ final long controllerHeartBeatTimeoutMills,
+ final int electionPriority) {
if (StringUtils.isEmpty(controllerAddress)) {
return;
}
@@ -1228,13 +1231,15 @@ public class BrokerOuterAPI {
requestHeader.setEpoch(epoch);
requestHeader.setMaxOffset(maxOffset);
requestHeader.setConfirmOffset(confirmOffset);
+ requestHeader.setHeartbeatTimeoutMills(controllerHeartBeatTimeoutMills);
+ requestHeader.setElectionPriority(electionPriority);
brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
@Override
public void run0() {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);
try {
- BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, timeoutMills);
+ BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, sendHeartBeatTimeoutMills);
} catch (Exception e) {
LOGGER.error("Error happen when send heartbeat to controller {}", controllerAddress, e);
}
@@ -1243,9 +1248,9 @@ public class BrokerOuterAPI {
}
public PullResult pullMessageFromSpecificBroker(String brokerName, String brokerAddr,
- String consumerGroup, String topic, int queueId, long offset,
- int maxNums,
- long timeoutMillis) throws MQBrokerException, RemotingException, InterruptedException {
+ String consumerGroup, String topic, int queueId, long offset,
+ int maxNums,
+ long timeoutMillis) throws MQBrokerException, RemotingException, InterruptedException {
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
@@ -1270,8 +1275,8 @@ public class BrokerOuterAPI {
}
private PullResultExt processPullResponse(
- final RemotingCommand response,
- final String addr) throws MQBrokerException, RemotingCommandException {
+ final RemotingCommand response,
+ final String addr) throws MQBrokerException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
switch (response.getCode()) {
case ResponseCode.SUCCESS:
@@ -1292,10 +1297,10 @@ public class BrokerOuterAPI {
}
PullMessageResponseHeader responseHeader =
- (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+ (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
- responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody(), responseHeader.getOffsetDelta());
+ responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody(), responseHeader.getOffsetDelta());
}
@@ -1304,10 +1309,10 @@ public class BrokerOuterAPI {
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResult.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodesBatch(
- byteBuffer,
- true,
- true,
- true
+ byteBuffer,
+ true,
+ true,
+ true
);
// Currently batch messages are not supported
@@ -1317,9 +1322,9 @@ public class BrokerOuterAPI {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
- Long.toString(pullResult.getMinOffset()));
+ Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
- Long.toString(pullResult.getMaxOffset()));
+ Long.toString(pullResult.getMaxOffset()));
msg.setBrokerName(brokerName);
msg.setQueueId(queueId);
if (pullResult.getOffsetDelta() != null) {
@@ -1335,5 +1340,4 @@ public class BrokerOuterAPI {
return pullResult;
}
-
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 84e578db5..01eacf43b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -123,7 +123,7 @@ public class ReplicasManagerTest {
when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
- when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
+ when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
replicasManager = new ReplicasManager(brokerController);
autoSwitchHAService.init(defaultMessageStore);
@@ -145,7 +145,7 @@ public class ReplicasManagerTest {
.doesNotThrowAnyException();
// equal to localAddress
- Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH , SLAVE_BROKER_ID))
+ Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, SLAVE_BROKER_ID))
.doesNotThrowAnyException();
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 1dee9101b..8aa0d69b1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -332,6 +332,8 @@ public class BrokerConfig extends BrokerIdentity {
private long syncControllerMetadataPeriod = 10 * 1000;
+ private long controllerHeartBeatTimeoutMills = 10 * 1000;
+
private boolean validateSystemTopicWhenUpdateTopic = true;
/**
@@ -1443,6 +1445,14 @@ public class BrokerConfig extends BrokerIdentity {
this.brokerElectionPriority = brokerElectionPriority;
}
+ public long getControllerHeartBeatTimeoutMills() {
+ return controllerHeartBeatTimeoutMills;
+ }
+
+ public void setControllerHeartBeatTimeoutMills(long controllerHeartBeatTimeoutMills) {
+ this.controllerHeartBeatTimeoutMills = controllerHeartBeatTimeoutMills;
+ }
+
public boolean isRecoverConcurrently() {
return recoverConcurrently;
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index fd41aa21a..7d9b78e8c 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -23,12 +23,9 @@ public interface BrokerHeartbeatManager {
/**
* Broker new heartbeat.
*/
- void onBrokerHeartbeat(final String clusterName, final String brokerAddr, final Integer epoch, final Long maxOffset, final Long confirmOffset);
-
- /**
- * Change the metadata(brokerId ..) for a broker.
- */
- void changeBrokerMetadata(final String clusterName, final String brokerAddr, final Long brokerId);
+ void onBrokerHeartbeat(final String clusterName, final String brokerName, final String brokerAddr,
+ final Long brokerId, final Long timeoutMillis, final Channel channel, final Integer epoch,
+ final Long maxOffset, final Long confirmOffset, final Integer electionPriority);
/**
* Start heartbeat manager.
@@ -45,12 +42,6 @@ public interface BrokerHeartbeatManager {
*/
void addBrokerLifecycleListener(final BrokerLifecycleListener listener);
- /**
- * Register new broker to heartManager.
- */
- void registerBroker(final String clusterName, final String brokerName, final String brokerAddr, final long brokerId,
- final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset, final Integer electionPriority);
-
/**
* Broker channel close
*/
@@ -70,6 +61,7 @@ public interface BrokerHeartbeatManager {
/**
* Trigger when broker inactive.
*/
- void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress, final long brokerId);
+ void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress,
+ final long brokerId);
}
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
index faaf298d2..eb33b98a6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
@@ -16,15 +16,13 @@
*/
package org.apache.rocketmq.controller;
-
import io.netty.channel.Channel;
-
public class BrokerLiveInfo {
private final String brokerName;
private final String brokerAddr;
- private final long heartbeatTimeoutMillis;
+ private long heartbeatTimeoutMillis;
private final Channel channel;
private long brokerId;
private long lastUpdateTimestamp;
@@ -33,8 +31,8 @@ public class BrokerLiveInfo {
private long confirmOffset;
private Integer electionPriority;
- public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
- Channel channel, int epoch, long maxOffset, Integer electionPriority) {
+ public BrokerLiveInfo(String brokerName, String brokerAddr, long brokerId, long lastUpdateTimestamp,
+ long heartbeatTimeoutMillis, Channel channel, int epoch, long maxOffset, Integer electionPriority) {
this.brokerName = brokerName;
this.brokerAddr = brokerAddr;
this.brokerId = brokerId;
@@ -46,8 +44,8 @@ public class BrokerLiveInfo {
this.maxOffset = maxOffset;
}
- public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
- Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
+ public BrokerLiveInfo(String brokerName, String brokerAddr, long brokerId, long lastUpdateTimestamp,
+ long heartbeatTimeoutMillis, Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
this.brokerName = brokerName;
this.brokerAddr = brokerAddr;
this.brokerId = brokerId;
@@ -63,16 +61,16 @@ public class BrokerLiveInfo {
@Override
public String toString() {
return "BrokerLiveInfo{" +
- "brokerName='" + brokerName + '\'' +
- ", brokerAddr='" + brokerAddr + '\'' +
- ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
- ", channel=" + channel +
- ", brokerId=" + brokerId +
- ", lastUpdateTimestamp=" + lastUpdateTimestamp +
- ", epoch=" + epoch +
- ", maxOffset=" + maxOffset +
- ", confirmOffset=" + confirmOffset +
- '}';
+ "brokerName='" + brokerName + '\'' +
+ ", brokerAddr='" + brokerAddr + '\'' +
+ ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+ ", channel=" + channel +
+ ", brokerId=" + brokerId +
+ ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+ ", epoch=" + epoch +
+ ", maxOffset=" + maxOffset +
+ ", confirmOffset=" + confirmOffset +
+ '}';
}
public String getBrokerName() {
@@ -83,6 +81,10 @@ public class BrokerLiveInfo {
return heartbeatTimeoutMillis;
}
+ public void setHeartbeatTimeoutMillis(long heartbeatTimeoutMillis) {
+ this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+ }
+
public Channel getChannel() {
return channel;
}
@@ -138,4 +140,5 @@ public class BrokerLiveInfo {
public long getConfirmOffset() {
return confirmOffset;
}
+
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 23b8c6d56..a2c570817 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -48,6 +48,8 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
public class ControllerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
@@ -64,7 +66,7 @@ public class ControllerManager {
private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
- NettyClientConfig nettyClientConfig) {
+ NettyClientConfig nettyClientConfig) {
this.controllerConfig = controllerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
@@ -77,12 +79,12 @@ public class ControllerManager {
public boolean initialize() {
this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
this.controllerRequestExecutor = new ThreadPoolExecutor(
- this.controllerConfig.getControllerThreadPoolNums(),
- this.controllerConfig.getControllerThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.controllerRequestThreadPoolQueue,
- new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
+ this.controllerConfig.getControllerThreadPoolNums(),
+ this.controllerConfig.getControllerThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.controllerRequestThreadPoolQueue,
+ new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
return new FutureTaskExt<>(runnable, value);
@@ -96,8 +98,8 @@ public class ControllerManager {
throw new IllegalArgumentException("Attribute value controllerDLegerSelfId of ControllerConfig is null or empty");
}
this.controller = new DLedgerController(this.controllerConfig, this.heartbeatManager::isBrokerActive,
- this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService,
- new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo));
+ this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService,
+ new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo));
// Register broker inactive listener
this.heartbeatManager.addBrokerLifecycleListener(this::onBrokerInactive);
@@ -106,34 +108,39 @@ public class ControllerManager {
}
/**
- * When the heartbeatManager detects the "Broker is not active",
- * we call this method to elect a master and do something else.
+ * When the heartbeatManager detects the "Broker is not active", we call this method to elect a master and do
+ * something else.
+ *
* @param clusterName The cluster name of this inactive broker
* @param brokerName The inactive broker name
* @param brokerAddress The inactive broker address(ip)
* @param brokerId The inactive broker id
*/
private void onBrokerInactive(String clusterName, String brokerName, String brokerAddress, long brokerId) {
- if (brokerId == MixAll.MASTER_ID) {
- if (controller.isLeaderState()) {
- final CompletableFuture<RemotingCommand> future = controller.electMaster(new ElectMasterRequestHeader(brokerName));
- try {
- final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
- final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
- if (responseHeader != null) {
- log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
- if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
- heartbeatManager.changeBrokerMetadata(clusterName, responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
- }
- if (controllerConfig.isNotifyBrokerRoleChanged()) {
- notifyBrokerRoleChanged(responseHeader, clusterName);
- }
+ if (controller.isLeaderState()) {
+ try {
+ final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress));
+ final RemotingCommand replicaInfoResponse = replicaInfoFuture.get(5, TimeUnit.SECONDS);
+ final GetReplicaInfoResponseHeader replicaInfoResponseHeader = (GetReplicaInfoResponseHeader) replicaInfoResponse.readCustomHeader();
+ // Not master broker offline
+ if (!replicaInfoResponseHeader.getMasterAddress().equals(brokerAddress)) {
+ log.warn("The {} broker with IP address {} shutdown", brokerName, brokerAddress);
+ return;
+ }
+ final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(new ElectMasterRequestHeader(brokerName));
+ final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS);
+ final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader();
+ if (responseHeader != null) {
+ log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
+ if (controllerConfig.isNotifyBrokerRoleChanged()) {
+ notifyBrokerRoleChanged(responseHeader, clusterName);
}
- } catch (Exception ignored) {
}
- } else {
- log.info("Broker{}' master shutdown", brokerName);
+ } catch (Exception e) {
+ log.error("", e);
}
+ } else {
+ log.info("The {} broker with IP address {} shutdown", brokerName, brokerAddress);
}
}
@@ -161,11 +168,11 @@ public class ControllerManager {
}
public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId,
- final ElectMasterResponseHeader responseHeader) {
+ final ElectMasterResponseHeader responseHeader) {
if (StringUtils.isNoneEmpty(brokerAddr)) {
log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
- responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
+ responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader);
try {
this.remotingClient.invokeOneway(brokerAddr, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index eabae152b..2a5610c56 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -99,53 +99,40 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
this.brokerLifecycleListeners.add(listener);
}
- @Override
- public void registerBroker(String clusterName, String brokerName, String brokerAddr,
- long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, final Integer electionPriority) {
- final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
- final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo,
- new BrokerLiveInfo(brokerName,
- brokerAddr,
- brokerId,
- System.currentTimeMillis(),
- timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
- channel,
- epoch == null ? -1 : epoch,
- maxOffset == null ? -1 : maxOffset,
- electionPriority == null ? Integer.MAX_VALUE : electionPriority));
- if (prevBrokerLiveInfo == null) {
- log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId);
- }
- }
-
- @Override
- public void changeBrokerMetadata(String clusterName, String brokerAddr, Long brokerId) {
+ @Override public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId,
+ Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) {
BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
- if (prev != null) {
- prev.setBrokerId(brokerId);
- log.info("Change broker {}'s brokerId to {}", brokerAddr, brokerId);
- }
- }
-
- @Override
- public void onBrokerHeartbeat(String clusterName, String brokerAddr, Integer epoch, Long maxOffset,
- Long confirmOffset) {
- BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
- BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
- if (null == prev) {
- return;
- }
int realEpoch = Optional.ofNullable(epoch).orElse(-1);
+ long realBrokerId = Optional.ofNullable(brokerId).orElse(-1L);
long realMaxOffset = Optional.ofNullable(maxOffset).orElse(-1L);
long realConfirmOffset = Optional.ofNullable(confirmOffset).orElse(-1L);
-
- prev.setLastUpdateTimestamp(System.currentTimeMillis());
- if (realEpoch > prev.getEpoch() || realEpoch == prev.getEpoch() && realMaxOffset > prev.getMaxOffset()) {
- prev.setEpoch(realEpoch);
- prev.setMaxOffset(realMaxOffset);
- prev.setConfirmOffset(realConfirmOffset);
+ long realTimeoutMillis = Optional.ofNullable(timeoutMillis).orElse(DEFAULT_BROKER_CHANNEL_EXPIRED_TIME);
+ int realElectionPriority = Optional.ofNullable(electionPriority).orElse(Integer.MAX_VALUE);
+ if (null == prev) {
+ this.brokerLiveTable.put(addrInfo,
+ new BrokerLiveInfo(brokerName,
+ brokerAddr,
+ realBrokerId,
+ System.currentTimeMillis(),
+ realTimeoutMillis,
+ channel,
+ realEpoch,
+ realMaxOffset,
+ realElectionPriority));
+ log.info("new broker registered, {}, brokerId:{}", addrInfo, realBrokerId);
+ } else {
+ prev.setLastUpdateTimestamp(System.currentTimeMillis());
+ prev.setHeartbeatTimeoutMillis(realTimeoutMillis);
+ prev.setElectionPriority(realElectionPriority);
+ prev.setBrokerId(realBrokerId);
+ if (realEpoch > prev.getEpoch() || realEpoch == prev.getEpoch() && realMaxOffset > prev.getMaxOffset()) {
+ prev.setEpoch(realEpoch);
+ prev.setMaxOffset(realMaxOffset);
+ prev.setConfirmOffset(realConfirmOffset);
+ }
}
+
}
@Override
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 4cbc1140e..cdc4abee0 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
@@ -95,9 +94,6 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
if (null != responseHeader) {
- if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
- heartbeatManager.changeBrokerMetadata(electMasterRequest.getClusterName(), responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
- }
if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
this.controllerManager.notifyBrokerRoleChanged(responseHeader, electMasterRequest.getClusterName());
}
@@ -113,9 +109,9 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
- this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
- responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
- controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getElectionPriority());
+ this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
+ responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
+ controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
}
return response;
}
@@ -134,8 +130,8 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
}
case BROKER_HEARTBEAT: {
final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
- this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerAddr(),
- requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset());
+ this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
+ requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
}
case CONTROLLER_GET_SYNC_STATE_DATA: {
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 41bcaa0bb..6cef5978c 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -128,9 +128,7 @@ public class ControllerManagerTest {
final String brokerName, final String address, final RemotingClient client,
final long heartbeatTimeoutMillis) throws Exception {
- final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address);
- // Timeout = 3000
- requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
+ final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, heartbeatTimeoutMillis, 1, 1000L, 0);
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
final RemotingCommand response = client.invokeSync(controllerAddress, request, 3000);
assert response != null;
@@ -173,8 +171,8 @@ public class ControllerManagerTest {
} catch (Exception e) {
e.printStackTrace();
}
- }, 0, 2000L, TimeUnit.MILLISECONDS);
- Boolean flag = await().atMost(Duration.ofSeconds(5)).until(() -> {
+ }, 0, 1000L, TimeUnit.MILLISECONDS);
+ Boolean flag = await().atMost(Duration.ofSeconds(10)).until(() -> {
final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader("broker1");
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
final RemotingCommand response = this.remotingClient1.invokeSync(leaderAddr, request, 3000);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index dd0c60a65..306acf5b6 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -43,7 +43,8 @@ public class DefaultBrokerHeartbeatManagerTest {
this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerAddress, brokerId) -> {
latch.countDown();
});
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null, 1, 1L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:7000", 1L,3000L, null,
+ 1, 1L,-1L, 0);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
this.heartbeatManager.shutdown();
}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 2158c3f06..d3b03dfe9 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -150,39 +150,39 @@ public class ReplicasInfoManagerTest {
}
public void mockHeartbeatDataMasterStillAlive() {
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, 10000000000L, null,
- 1, 3L, 0);
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L, 0);
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, 10000000000L, null,
+ 1, 1L, -1L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 2L, -1L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+ 1, 3L, -1L, 0);
}
public void mockHeartbeatDataHigherEpoch() {
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L, 0);
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L, 0);
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 0, 3L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+ 1, 3L, -1L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 2L, -1L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+ 0, 3L, -1L, 0);
}
public void mockHeartbeatDataHigherOffset() {
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L, 0);
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 2L, 0);
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+ 1, 3L, -1L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 2L, -1L, 0);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+ 1, 3L, -1L, 0);
}
public void mockHeartbeatDataHigherPriority() {
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
- 1, 3L, 3);
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
- 1, 3L, 2);
- this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
- 1, 3L, 1);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+ 1, 3L, -1L, 3);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+ 1, 3L, -1L, 2);
+ this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+ 1, 3L, -1L, 1);
}
@Test
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
index a8e745e4f..bdcf59c55 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
@@ -29,6 +29,8 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
@CFNullable
private Long maxOffset;
@CFNullable
+ private Long confirmOffset;
+ @CFNullable
private Long heartbeatTimeoutMillis;
@CFNullable
private Integer electionPriority;
@@ -40,14 +42,17 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
this(clusterName, brokerName, brokerAddress, 0);
}
- public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int electionPriority) {
- this(clusterName, brokerName, brokerAddress, 0, 0, electionPriority);
+ public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress,
+ int electionPriority) {
+ this(clusterName, brokerName, brokerAddress, null, 0, 0, electionPriority);
}
- public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int epoch, long maxOffset, int electionPriority) {
+ public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress,
+ Long heartbeatTimeoutMillis, int epoch, long maxOffset, int electionPriority) {
this.clusterName = clusterName;
this.brokerName = brokerName;
this.brokerAddress = brokerAddress;
+ this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
this.epoch = epoch;
this.maxOffset = maxOffset;
this.electionPriority = electionPriority;
@@ -96,14 +101,15 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
@Override
public String toString() {
return "RegisterBrokerToControllerRequestHeader{" +
- "clusterName='" + clusterName + '\'' +
- ", brokerName='" + brokerName + '\'' +
- ", brokerAddress='" + brokerAddress + '\'' +
- ", epoch=" + epoch +
- ", maxOffset=" + maxOffset +
- ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
- ", electionPriority=" + electionPriority +
- '}';
+ "clusterName='" + clusterName + '\'' +
+ ", brokerName='" + brokerName + '\'' +
+ ", brokerAddress='" + brokerAddress + '\'' +
+ ", epoch=" + epoch +
+ ", maxOffset=" + maxOffset +
+ ", confirmOffset=" + confirmOffset +
+ ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+ ", electionPriority=" + electionPriority +
+ '}';
}
public Integer getEpoch() {
@@ -122,6 +128,14 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
this.maxOffset = maxOffset;
}
+ public Long getConfirmOffset() {
+ return confirmOffset;
+ }
+
+ public void setConfirmOffset(Long confirmOffset) {
+ this.confirmOffset = confirmOffset;
+ }
+
@Override
public void checkFields() throws RemotingCommandException {
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
index 6b15b9be5..eb7332fdf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
@@ -30,11 +30,17 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
@CFNotNull
private String brokerName;
@CFNullable
+ private Long brokerId;
+ @CFNullable
private Integer epoch;
@CFNullable
private Long maxOffset;
@CFNullable
private Long confirmOffset;
+ @CFNullable
+ private Long heartbeatTimeoutMills;
+ @CFNullable
+ private Integer electionPriority;
@Override
public void checkFields() throws RemotingCommandException {
@@ -88,4 +94,28 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
public void setConfirmOffset(Long confirmOffset) {
this.confirmOffset = confirmOffset;
}
+
+ public Long getBrokerId() {
+ return brokerId;
+ }
+
+ public void setBrokerId(Long brokerId) {
+ this.brokerId = brokerId;
+ }
+
+ public Long getHeartbeatTimeoutMills() {
+ return heartbeatTimeoutMills;
+ }
+
+ public void setHeartbeatTimeoutMills(Long heartbeatTimeoutMills) {
+ this.heartbeatTimeoutMills = heartbeatTimeoutMills;
+ }
+
+ public Integer getElectionPriority() {
+ return electionPriority;
+ }
+
+ public void setElectionPriority(Integer electionPriority) {
+ this.electionPriority = electionPriority;
+ }
}