You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/21 02:37:05 UTC

[rocketmq] branch develop updated: [ISSUE #5712] Fix the invalid of heartbeat detection after controller switch (#5711)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 646d04f42 [ISSUE #5712] Fix the invalid of heartbeat detection after controller switch (#5711)
646d04f42 is described below

commit 646d04f42524636ad09364af2a67fa8ab8b6b8fa
Author: rongtong <ji...@163.com>
AuthorDate: Wed Dec 21 10:36:57 2022 +0800

    [ISSUE #5712] Fix the invalid of heartbeat detection after controller switch (#5711)
    
    * Fix the invalid of heartbeat detection after controller switch
    
    * Pass the checkstyle
    
    * Format ReplicasInfoManagerTest code style
---
 .../apache/rocketmq/broker/BrokerController.java   |   4 +-
 .../broker/controller/ReplicasManager.java         |   2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 318 +++++++++++----------
 .../broker/controller/ReplicasManagerTest.java     |   4 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 +
 .../controller/BrokerHeartbeatManager.java         |  18 +-
 .../apache/rocketmq/controller/BrokerLiveInfo.java |  37 +--
 .../rocketmq/controller/ControllerManager.java     |  67 +++--
 .../impl/DefaultBrokerHeartbeatManager.java        |  69 ++---
 .../processor/ControllerRequestProcessor.java      |  14 +-
 .../impl/controller/ControllerManagerTest.java     |   8 +-
 .../impl/DefaultBrokerHeartbeatManagerTest.java    |   3 +-
 .../impl/manager/ReplicasInfoManagerTest.java      |  48 ++--
 .../RegisterBrokerToControllerRequestHeader.java   |  36 ++-
 .../namesrv/BrokerHeartbeatRequestHeader.java      |  30 ++
 15 files changed, 356 insertions(+), 312 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 0a581b0c6..7804dfc78 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1747,7 +1747,9 @@ public class BrokerController {
                         this.brokerConfig.getSendHeartbeatTimeoutMillis(),
                         this.brokerConfig.isInBrokerContainer(), this.replicasManager.getLastEpoch(),
                         this.messageStore.getMaxPhyOffset(),
-                        this.replicasManager.getConfirmOffset()
+                        this.replicasManager.getConfirmOffset(),
+                        this.brokerConfig.getControllerHeartBeatTimeoutMills(),
+                        this.brokerConfig.getBrokerElectionPriority()
                     );
                 }
             }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index a6589d2ea..a0218f8cc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -291,7 +291,7 @@ public class ReplicasManager {
         // Register this broker to controller, get brokerId and masterAddress.
         try {
             final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
-                this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress,
+                this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
                 this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
             final String newMasterAddress = registerResponse.getMasterAddress();
             if (StringUtils.isNoneEmpty(newMasterAddress)) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 8d1690874..200b5e779 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -137,7 +137,7 @@ public class BrokerOuterAPI {
     private final TopAddressing topAddressing = new DefaultTopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
     private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
-            new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
+        new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
 
     private ClientMetadata clientMetadata;
     private RpcClient rpcClient;
@@ -186,7 +186,7 @@ public class BrokerOuterAPI {
     private List<String> lookupNameServerAddress(String domain) {
         List<String> addressList = new ArrayList<>();
         try {
-            java.security.Security.setProperty("networkaddress.cache.ttl" , "10");
+            java.security.Security.setProperty("networkaddress.cache.ttl", "10");
             int index = domain.indexOf(":");
             String portStr = domain.substring(index);
             String domainStr = domain.substring(0, index);
@@ -213,13 +213,13 @@ public class BrokerOuterAPI {
     }
 
     public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String brokerName)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         return syncBrokerMemberGroup(clusterName, brokerName, false);
     }
 
     public BrokerMemberGroup syncBrokerMemberGroup(String clusterName, String brokerName,
-                                                   boolean isCompatibleWithOldNameSrv)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        boolean isCompatibleWithOldNameSrv)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         if (isCompatibleWithOldNameSrv) {
             return getBrokerMemberGroupCompatible(clusterName, brokerName);
         } else {
@@ -228,7 +228,7 @@ public class BrokerOuterAPI {
     }
 
     public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String brokerName)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         BrokerMemberGroup brokerMemberGroup = new BrokerMemberGroup(clusterName, brokerName);
 
         GetBrokerMemberGroupRequestHeader requestHeader = new GetBrokerMemberGroupRequestHeader();
@@ -246,7 +246,7 @@ public class BrokerOuterAPI {
                 byte[] body = response.getBody();
                 if (body != null) {
                     GetBrokerMemberGroupResponseBody brokerMemberGroupResponseBody =
-                            GetBrokerMemberGroupResponseBody.decode(body, GetBrokerMemberGroupResponseBody.class);
+                        GetBrokerMemberGroupResponseBody.decode(body, GetBrokerMemberGroupResponseBody.class);
 
                     return brokerMemberGroupResponseBody.getBrokerMemberGroup();
                 }
@@ -259,7 +259,7 @@ public class BrokerOuterAPI {
     }
 
     public BrokerMemberGroup getBrokerMemberGroupCompatible(String clusterName, String brokerName)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         BrokerMemberGroup brokerMemberGroup = new BrokerMemberGroup(clusterName, brokerName);
 
         GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
@@ -278,8 +278,8 @@ public class BrokerOuterAPI {
                     TopicRouteData topicRouteData = TopicRouteData.decode(body, TopicRouteData.class);
                     for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
                         if (brokerData != null
-                                && brokerData.getBrokerName().equals(brokerName)
-                                && brokerData.getCluster().equals(clusterName)) {
+                            && brokerData.getBrokerName().equals(brokerName)
+                            && brokerData.getCluster().equals(clusterName)) {
                             brokerMemberGroup.getBrokerAddrs().putAll(brokerData.getBrokerAddrs());
                             break;
                         }
@@ -295,13 +295,13 @@ public class BrokerOuterAPI {
     }
 
     public void sendHeartbeatViaDataVersion(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final Long brokerId,
-            final int timeoutMillis,
-            final DataVersion dataVersion,
-            final boolean isInBrokerContainer) {
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final Long brokerId,
+        final int timeoutMillis,
+        final DataVersion dataVersion,
+        final boolean isInBrokerContainer) {
         List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
             final QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
@@ -330,11 +330,11 @@ public class BrokerOuterAPI {
     }
 
     public void sendHeartbeat(final String clusterName,
-                              final String brokerAddr,
-                              final String brokerName,
-                              final Long brokerId,
-                              final int timeoutMills,
-                              final boolean isInBrokerContainer) {
+        final String brokerAddr,
+        final String brokerName,
+        final Long brokerId,
+        final int timeoutMills,
+        final boolean isInBrokerContainer) {
         List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
 
         final BrokerHeartbeatRequestHeader requestHeader = new BrokerHeartbeatRequestHeader();
@@ -361,8 +361,8 @@ public class BrokerOuterAPI {
     }
 
     public BrokerSyncInfo retrieveBrokerHaInfo(String masterBrokerAddr)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
-            MQBrokerException, RemotingCommandException {
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException, RemotingCommandException {
         ExchangeHAInfoRequestHeader requestHeader = new ExchangeHAInfoRequestHeader();
         requestHeader.setMasterHaAddress(null);
 
@@ -383,7 +383,7 @@ public class BrokerOuterAPI {
     }
 
     public void sendBrokerHaInfo(String brokerAddr, String masterHaAddr, long brokerInitMaxOffset, String masterAddr)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         ExchangeHAInfoRequestHeader requestHeader = new ExchangeHAInfoRequestHeader();
         requestHeader.setMasterHaAddress(masterHaAddr);
         requestHeader.setMasterFlushOffset(brokerInitMaxOffset);
@@ -406,30 +406,30 @@ public class BrokerOuterAPI {
     }
 
     public List<RegisterBrokerResult> registerBrokerAll(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId,
-            final String haServerAddr,
-            final TopicConfigSerializeWrapper topicConfigWrapper,
-            final List<String> filterServerList,
-            final boolean oneway,
-            final int timeoutMills,
-            final boolean enableActingMaster,
-            final boolean compressed,
-            final BrokerIdentity brokerIdentity) {
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final String haServerAddr,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final List<String> filterServerList,
+        final boolean oneway,
+        final int timeoutMills,
+        final boolean enableActingMaster,
+        final boolean compressed,
+        final BrokerIdentity brokerIdentity) {
         return registerBrokerAll(clusterName,
-                brokerAddr,
-                brokerName,
-                brokerId,
-                haServerAddr,
-                topicConfigWrapper,
-                filterServerList,
-                oneway, timeoutMills,
-                enableActingMaster,
-                compressed,
-                null,
-                brokerIdentity);
+            brokerAddr,
+            brokerName,
+            brokerId,
+            haServerAddr,
+            topicConfigWrapper,
+            filterServerList,
+            oneway, timeoutMills,
+            enableActingMaster,
+            compressed,
+            null,
+            brokerIdentity);
     }
 
     /**
@@ -445,23 +445,23 @@ public class BrokerOuterAPI {
      * @param filterServerList
      * @param oneway
      * @param timeoutMills
-     * @param compressed         default false
+     * @param compressed default false
      * @return
      */
     public List<RegisterBrokerResult> registerBrokerAll(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId,
-            final String haServerAddr,
-            final TopicConfigSerializeWrapper topicConfigWrapper,
-            final List<String> filterServerList,
-            final boolean oneway,
-            final int timeoutMills,
-            final boolean enableActingMaster,
-            final boolean compressed,
-            final Long heartbeatTimeoutMillis,
-            final BrokerIdentity brokerIdentity) {
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final String haServerAddr,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final List<String> filterServerList,
+        final boolean oneway,
+        final int timeoutMills,
+        final boolean enableActingMaster,
+        final boolean compressed,
+        final Long heartbeatTimeoutMillis,
+        final BrokerIdentity brokerIdentity) {
 
         final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
         List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
@@ -518,13 +518,13 @@ public class BrokerOuterAPI {
     }
 
     private RegisterBrokerResult registerBroker(
-            final String namesrvAddr,
-            final boolean oneway,
-            final int timeoutMills,
-            final RegisterBrokerRequestHeader requestHeader,
-            final byte[] body
+        final String namesrvAddr,
+        final boolean oneway,
+        final int timeoutMills,
+        final RegisterBrokerRequestHeader requestHeader,
+        final byte[] body
     ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException {
+        InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
         request.setBody(body);
 
@@ -542,7 +542,7 @@ public class BrokerOuterAPI {
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 RegisterBrokerResponseHeader responseHeader =
-                        (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
+                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                 RegisterBrokerResult result = new RegisterBrokerResult();
                 result.setMasterAddr(responseHeader.getMasterAddr());
                 result.setHaServerAddr(responseHeader.getHaServerAddr());
@@ -559,10 +559,10 @@ public class BrokerOuterAPI {
     }
 
     public void unregisterBrokerAll(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId
     ) {
         List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
         if (nameServerAddressList != null) {
@@ -578,11 +578,11 @@ public class BrokerOuterAPI {
     }
 
     public void unregisterBroker(
-            final String namesrvAddr,
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId
+        final String namesrvAddr,
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId
     ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
         UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
         requestHeader.setBrokerAddr(brokerAddr);
@@ -608,13 +608,13 @@ public class BrokerOuterAPI {
     }
 
     public List<Boolean> needRegister(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId,
-            final TopicConfigSerializeWrapper topicConfigWrapper,
-            final int timeoutMills,
-            final boolean isInBrokerContainer) {
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final int timeoutMills,
+        final boolean isInBrokerContainer) {
         final List<Boolean> changedList = new CopyOnWriteArrayList<>();
         List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
@@ -637,7 +637,7 @@ public class BrokerOuterAPI {
                             switch (response.getCode()) {
                                 case ResponseCode.SUCCESS: {
                                     QueryDataVersionResponseHeader queryDataVersionResponseHeader =
-                                            (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+                                        (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                     changed = queryDataVersionResponseHeader.getChanged();
                                     byte[] body = response.getBody();
                                     if (body != null) {
@@ -674,8 +674,8 @@ public class BrokerOuterAPI {
     }
 
     public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
-            final String addr) throws RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        final String addr) throws RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
@@ -692,8 +692,8 @@ public class BrokerOuterAPI {
     }
 
     public TimerCheckpoint getTimerCheckPoint(
-            final String addr) throws RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        final String addr) throws RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
@@ -710,8 +710,8 @@ public class BrokerOuterAPI {
     }
 
     public TimerMetrics.TimerMetricsSerializeWrapper getTimerMetrics(
-            final String addr) throws RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        final String addr) throws RemotingConnectException, RemotingSendRequestException,
+        RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_METRICS, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
@@ -728,8 +728,8 @@ public class BrokerOuterAPI {
     }
 
     public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
-            final String addr) throws InterruptedException, RemotingTimeoutException,
-            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        final String addr) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
         assert response != null;
@@ -745,8 +745,8 @@ public class BrokerOuterAPI {
     }
 
     public String getAllDelayOffset(
-            final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
-            RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+        final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
         assert response != null;
@@ -762,8 +762,8 @@ public class BrokerOuterAPI {
     }
 
     public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
-            final String addr) throws InterruptedException, RemotingTimeoutException,
-            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        final String addr) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
         assert response != null;
@@ -787,8 +787,8 @@ public class BrokerOuterAPI {
     }
 
     public long getMaxOffset(final String addr, final String topic, final int queueId, final boolean committed,
-                             final boolean isOnlyThisBroker)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        final boolean isOnlyThisBroker)
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
@@ -811,7 +811,7 @@ public class BrokerOuterAPI {
     }
 
     public long getMinOffset(final String addr, final String topic, final int queueId, final boolean isOnlyThisBroker)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
@@ -833,10 +833,10 @@ public class BrokerOuterAPI {
     }
 
     public void lockBatchMQAsync(
-            final String addr,
-            final LockBatchRequestBody requestBody,
-            final long timeoutMillis,
-            final LockCallback callback) throws RemotingException, InterruptedException {
+        final String addr,
+        final LockBatchRequestBody requestBody,
+        final long timeoutMillis,
+        final LockCallback callback) throws RemotingException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
 
         request.setBody(requestBody.encode());
@@ -850,7 +850,7 @@ public class BrokerOuterAPI {
                 if (response != null) {
                     if (response.getCode() == ResponseCode.SUCCESS) {
                         LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(),
-                                LockBatchResponseBody.class);
+                            LockBatchResponseBody.class);
                         Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
                         callback.onSuccess(messageQueues);
                     } else {
@@ -864,10 +864,10 @@ public class BrokerOuterAPI {
     }
 
     public void unlockBatchMQAsync(
-            final String addr,
-            final UnlockBatchRequestBody requestBody,
-            final long timeoutMillis,
-            final UnlockCallback callback) throws RemotingException, InterruptedException {
+        final String addr,
+        final UnlockBatchRequestBody requestBody,
+        final long timeoutMillis,
+        final UnlockCallback callback) throws RemotingException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
 
         request.setBody(requestBody.encode());
@@ -897,8 +897,8 @@ public class BrokerOuterAPI {
     }
 
     public SendResult sendMessageToSpecificBroker(String brokerAddr, final String brokerName,
-                                                  final MessageExt msg, String group,
-                                                  long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
+        final MessageExt msg, String group,
+        long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
 
         RemotingCommand request = buildSendMessageRequest(msg, group);
         RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
@@ -906,8 +906,8 @@ public class BrokerOuterAPI {
     }
 
     public CompletableFuture<SendResult> sendMessageToSpecificBrokerAsync(String brokerAddr, final String brokerName,
-                                                                          final MessageExt msg, String group,
-                                                                          long timeoutMillis) {
+        final MessageExt msg, String group,
+        long timeoutMillis) {
         RemotingCommand request = buildSendMessageRequest(msg, group);
 
         CompletableFuture<SendResult> cf = new CompletableFuture<>();
@@ -963,9 +963,9 @@ public class BrokerOuterAPI {
     }
 
     private SendResult processSendResponse(
-            final String brokerName,
-            final Message msg,
-            final RemotingCommand response
+        final String brokerName,
+        final Message msg,
+        final RemotingCommand response
     ) throws MQBrokerException, RemotingCommandException {
         switch (response.getCode()) {
             case ResponseCode.FLUSH_DISK_TIMEOUT:
@@ -992,7 +992,7 @@ public class BrokerOuterAPI {
                 }
 
                 SendMessageResponseHeader responseHeader =
-                        (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+                    (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
 
                 //If namespace not null , reset Topic without namespace.
                 String topic = msg.getTopic();
@@ -1008,8 +1008,8 @@ public class BrokerOuterAPI {
                     uniqMsgId = sb.toString();
                 }
                 SendResult sendResult = new SendResult(sendStatus,
-                        uniqMsgId,
-                        responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+                    uniqMsgId,
+                    responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
                 sendResult.setTransactionId(responseHeader.getTransactionId());
                 String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
                 String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -1036,12 +1036,12 @@ public class BrokerOuterAPI {
     }
 
     public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
     }
 
     public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
-                                                          boolean allowTopicNotExist) throws MQBrokerException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        boolean allowTopicNotExist) throws MQBrokerException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
         requestHeader.setTopic(topic);
 
@@ -1086,7 +1086,7 @@ public class BrokerOuterAPI {
     }
 
     public void forwardRequest(String brokerAddr, RemotingCommand request, long timeoutMillis,
-                               InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
+        InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
         this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
     }
 
@@ -1104,8 +1104,8 @@ public class BrokerOuterAPI {
     }
 
     public MessageRequestModeSerializeWrapper getAllMessageRequestMode(
-            final String addr) throws RemotingSendRequestException, RemotingConnectException,
-            MQBrokerException, RemotingTimeoutException, InterruptedException {
+        final String addr) throws RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException, RemotingTimeoutException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_MESSAGE_REQUEST_MODE, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
         assert response != null;
@@ -1134,10 +1134,10 @@ public class BrokerOuterAPI {
      * Alter syncStateSet
      */
     public SyncStateSet alterSyncStateSet(
-            final String controllerAddress,
-            final String brokerName,
-            final String masterAddress, final int masterEpoch,
-            final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws Exception {
+        final String controllerAddress,
+        final String brokerName,
+        final String masterAddress, final int masterEpoch,
+        final Set<String> newSyncStateSet, final int syncStateSetEpoch) throws Exception {
 
         final AlterSyncStateSetRequestHeader requestHeader = new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch);
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, requestHeader);
@@ -1160,10 +1160,11 @@ public class BrokerOuterAPI {
      * Register broker to controller
      */
     public RegisterBrokerToControllerResponseHeader registerBrokerToController(
-            final String controllerAddress, final String clusterName,
-            final String brokerName, final String address, final int epoch, final long maxOffset, final int electionPriority) throws Exception {
+        final String controllerAddress, final String clusterName,
+        final String brokerName, final String address, final long controllerHeartbeatTimeoutMills, final int epoch,
+        final long maxOffset, final int electionPriority) throws Exception {
 
-        final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, epoch, maxOffset, electionPriority);
+        final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, controllerHeartbeatTimeoutMills, epoch, maxOffset, electionPriority);
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
         final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
         assert response != null;
@@ -1182,7 +1183,7 @@ public class BrokerOuterAPI {
      * Get broker replica info
      */
     public Pair<GetReplicaInfoResponseHeader, SyncStateSet> getReplicaInfo(final String controllerAddress,
-                                                                           final String brokerName, final String brokerAddress) throws Exception {
+        final String brokerName, final String brokerAddress) throws Exception {
         final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader(brokerName, brokerAddress);
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
         final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
@@ -1208,15 +1209,17 @@ public class BrokerOuterAPI {
      * Send heartbeat to controller
      */
     public void sendHeartbeatToController(final String controllerAddress,
-                                          final String clusterName,
-                                          final String brokerAddr,
-                                          final String brokerName,
-                                          final Long brokerId,
-                                          final int timeoutMills,
-                                          final boolean isInBrokerContainer,
-                                          final int epoch,
-                                          final long maxOffset,
-                                          final long confirmOffset) {
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final Long brokerId,
+        final int sendHeartBeatTimeoutMills,
+        final boolean isInBrokerContainer,
+        final int epoch,
+        final long maxOffset,
+        final long confirmOffset,
+        final long controllerHeartBeatTimeoutMills,
+        final int electionPriority) {
         if (StringUtils.isEmpty(controllerAddress)) {
             return;
         }
@@ -1228,13 +1231,15 @@ public class BrokerOuterAPI {
         requestHeader.setEpoch(epoch);
         requestHeader.setMaxOffset(maxOffset);
         requestHeader.setConfirmOffset(confirmOffset);
+        requestHeader.setHeartbeatTimeoutMills(controllerHeartBeatTimeoutMills);
+        requestHeader.setElectionPriority(electionPriority);
         brokerOuterExecutor.execute(new AbstractBrokerRunnable(new BrokerIdentity(clusterName, brokerName, brokerId, isInBrokerContainer)) {
             @Override
             public void run0() {
                 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, requestHeader);
 
                 try {
-                    BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, timeoutMills);
+                    BrokerOuterAPI.this.remotingClient.invokeOneway(controllerAddress, request, sendHeartBeatTimeoutMills);
                 } catch (Exception e) {
                     LOGGER.error("Error happen when send heartbeat to controller {}", controllerAddress, e);
                 }
@@ -1243,9 +1248,9 @@ public class BrokerOuterAPI {
     }
 
     public PullResult pullMessageFromSpecificBroker(String brokerName, String brokerAddr,
-                                                    String consumerGroup, String topic, int queueId, long offset,
-                                                    int maxNums,
-                                                    long timeoutMillis) throws MQBrokerException, RemotingException, InterruptedException {
+        String consumerGroup, String topic, int queueId, long offset,
+        int maxNums,
+        long timeoutMillis) throws MQBrokerException, RemotingException, InterruptedException {
 
         PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
@@ -1270,8 +1275,8 @@ public class BrokerOuterAPI {
     }
 
     private PullResultExt processPullResponse(
-            final RemotingCommand response,
-            final String addr) throws MQBrokerException, RemotingCommandException {
+        final RemotingCommand response,
+        final String addr) throws MQBrokerException, RemotingCommandException {
         PullStatus pullStatus = PullStatus.NO_NEW_MSG;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS:
@@ -1292,10 +1297,10 @@ public class BrokerOuterAPI {
         }
 
         PullMessageResponseHeader responseHeader =
-                (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+            (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
 
         return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
-                responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody(), responseHeader.getOffsetDelta());
+            responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody(), responseHeader.getOffsetDelta());
 
     }
 
@@ -1304,10 +1309,10 @@ public class BrokerOuterAPI {
         if (PullStatus.FOUND == pullResult.getPullStatus()) {
             ByteBuffer byteBuffer = ByteBuffer.wrap(pullResult.getMessageBinary());
             List<MessageExt> msgList = MessageDecoder.decodesBatch(
-                    byteBuffer,
-                    true,
-                    true,
-                    true
+                byteBuffer,
+                true,
+                true,
+                true
             );
 
             // Currently batch messages are not supported
@@ -1317,9 +1322,9 @@ public class BrokerOuterAPI {
                     msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
                 }
                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
-                        Long.toString(pullResult.getMinOffset()));
+                    Long.toString(pullResult.getMinOffset()));
                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
-                        Long.toString(pullResult.getMaxOffset()));
+                    Long.toString(pullResult.getMaxOffset()));
                 msg.setBrokerName(brokerName);
                 msg.setQueueId(queueId);
                 if (pullResult.getOffsetDelta() != null) {
@@ -1335,5 +1340,4 @@ public class BrokerOuterAPI {
         return pullResult;
     }
 
-
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 84e578db5..01eacf43b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -123,7 +123,7 @@ public class ReplicasManagerTest {
         when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
         when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
         when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
-        when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
+        when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
         when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
         replicasManager = new ReplicasManager(brokerController);
         autoSwitchHAService.init(defaultMessageStore);
@@ -145,7 +145,7 @@ public class ReplicasManagerTest {
             .doesNotThrowAnyException();
 
         // equal to localAddress
-        Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH , SLAVE_BROKER_ID))
+        Assertions.assertThatCode(() -> replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, OLD_MASTER_EPOCH, SLAVE_BROKER_ID))
             .doesNotThrowAnyException();
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 1dee9101b..8aa0d69b1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -332,6 +332,8 @@ public class BrokerConfig extends BrokerIdentity {
 
     private long syncControllerMetadataPeriod = 10 * 1000;
 
+    private long controllerHeartBeatTimeoutMills = 10 * 1000;
+
     private boolean validateSystemTopicWhenUpdateTopic = true;
 
     /**
@@ -1443,6 +1445,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.brokerElectionPriority = brokerElectionPriority;
     }
 
+    public long getControllerHeartBeatTimeoutMills() {
+        return controllerHeartBeatTimeoutMills;
+    }
+
+    public void setControllerHeartBeatTimeoutMills(long controllerHeartBeatTimeoutMills) {
+        this.controllerHeartBeatTimeoutMills = controllerHeartBeatTimeoutMills;
+    }
+
     public boolean isRecoverConcurrently() {
         return recoverConcurrently;
     }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index fd41aa21a..7d9b78e8c 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -23,12 +23,9 @@ public interface BrokerHeartbeatManager {
     /**
      * Broker new heartbeat.
      */
-    void onBrokerHeartbeat(final String clusterName, final String brokerAddr, final Integer epoch, final Long maxOffset, final Long confirmOffset);
-
-    /**
-     * Change the metadata(brokerId ..) for a broker.
-     */
-    void changeBrokerMetadata(final String clusterName, final String brokerAddr, final Long brokerId);
+    void onBrokerHeartbeat(final String clusterName, final String brokerName, final String brokerAddr,
+        final Long brokerId, final Long timeoutMillis, final Channel channel, final Integer epoch,
+        final Long maxOffset, final Long confirmOffset, final Integer electionPriority);
 
     /**
      * Start heartbeat manager.
@@ -45,12 +42,6 @@ public interface BrokerHeartbeatManager {
      */
     void addBrokerLifecycleListener(final BrokerLifecycleListener listener);
 
-    /**
-     * Register new broker to heartManager.
-     */
-    void registerBroker(final String clusterName, final String brokerName, final String brokerAddr, final long brokerId,
-                        final Long timeoutMillis, final Channel channel, final Integer epoch, final Long maxOffset, final Integer electionPriority);
-
     /**
      * Broker channel close
      */
@@ -70,6 +61,7 @@ public interface BrokerHeartbeatManager {
         /**
          * Trigger when broker inactive.
          */
-        void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress, final long brokerId);
+        void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress,
+            final long brokerId);
     }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
index faaf298d2..eb33b98a6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerLiveInfo.java
@@ -16,15 +16,13 @@
  */
 package org.apache.rocketmq.controller;
 
-
 import io.netty.channel.Channel;
 
-
 public class BrokerLiveInfo {
     private final String brokerName;
 
     private final String brokerAddr;
-    private final long heartbeatTimeoutMillis;
+    private long heartbeatTimeoutMillis;
     private final Channel channel;
     private long brokerId;
     private long lastUpdateTimestamp;
@@ -33,8 +31,8 @@ public class BrokerLiveInfo {
     private long confirmOffset;
     private Integer electionPriority;
 
-    public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
-                          Channel channel, int epoch, long maxOffset, Integer electionPriority) {
+    public BrokerLiveInfo(String brokerName, String brokerAddr, long brokerId, long lastUpdateTimestamp,
+        long heartbeatTimeoutMillis, Channel channel, int epoch, long maxOffset, Integer electionPriority) {
         this.brokerName = brokerName;
         this.brokerAddr = brokerAddr;
         this.brokerId = brokerId;
@@ -46,8 +44,8 @@ public class BrokerLiveInfo {
         this.maxOffset = maxOffset;
     }
 
-    public BrokerLiveInfo(String brokerName, String brokerAddr,long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
-                          Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
+    public BrokerLiveInfo(String brokerName, String brokerAddr, long brokerId, long lastUpdateTimestamp,
+        long heartbeatTimeoutMillis, Channel channel, int epoch, long maxOffset, Integer electionPriority, long confirmOffset) {
         this.brokerName = brokerName;
         this.brokerAddr = brokerAddr;
         this.brokerId = brokerId;
@@ -63,16 +61,16 @@ public class BrokerLiveInfo {
     @Override
     public String toString() {
         return "BrokerLiveInfo{" +
-                "brokerName='" + brokerName + '\'' +
-                ", brokerAddr='" + brokerAddr + '\'' +
-                ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
-                ", channel=" + channel +
-                ", brokerId=" + brokerId +
-                ", lastUpdateTimestamp=" + lastUpdateTimestamp +
-                ", epoch=" + epoch +
-                ", maxOffset=" + maxOffset +
-                ", confirmOffset=" + confirmOffset +
-                '}';
+            "brokerName='" + brokerName + '\'' +
+            ", brokerAddr='" + brokerAddr + '\'' +
+            ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+            ", channel=" + channel +
+            ", brokerId=" + brokerId +
+            ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+            ", epoch=" + epoch +
+            ", maxOffset=" + maxOffset +
+            ", confirmOffset=" + confirmOffset +
+            '}';
     }
 
     public String getBrokerName() {
@@ -83,6 +81,10 @@ public class BrokerLiveInfo {
         return heartbeatTimeoutMillis;
     }
 
+    public void setHeartbeatTimeoutMillis(long heartbeatTimeoutMillis) {
+        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+    }
+
     public Channel getChannel() {
         return channel;
     }
@@ -138,4 +140,5 @@ public class BrokerLiveInfo {
     public long getConfirmOffset() {
         return confirmOffset;
     }
+
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 23b8c6d56..a2c570817 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -48,6 +48,8 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
 
 public class ControllerManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
@@ -64,7 +66,7 @@ public class ControllerManager {
     private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
 
     public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
-                             NettyClientConfig nettyClientConfig) {
+        NettyClientConfig nettyClientConfig) {
         this.controllerConfig = controllerConfig;
         this.nettyServerConfig = nettyServerConfig;
         this.nettyClientConfig = nettyClientConfig;
@@ -77,12 +79,12 @@ public class ControllerManager {
     public boolean initialize() {
         this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
         this.controllerRequestExecutor = new ThreadPoolExecutor(
-                this.controllerConfig.getControllerThreadPoolNums(),
-                this.controllerConfig.getControllerThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.controllerRequestThreadPoolQueue,
-                new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
+            this.controllerConfig.getControllerThreadPoolNums(),
+            this.controllerConfig.getControllerThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            this.controllerRequestThreadPoolQueue,
+            new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
             @Override
             protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
                 return new FutureTaskExt<>(runnable, value);
@@ -96,8 +98,8 @@ public class ControllerManager {
             throw new IllegalArgumentException("Attribute value controllerDLegerSelfId of ControllerConfig is null or empty");
         }
         this.controller = new DLedgerController(this.controllerConfig, this.heartbeatManager::isBrokerActive,
-                this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService,
-                new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo));
+            this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService,
+            new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo));
 
         // Register broker inactive listener
         this.heartbeatManager.addBrokerLifecycleListener(this::onBrokerInactive);
@@ -106,34 +108,39 @@ public class ControllerManager {
     }
 
     /**
-     * When the heartbeatManager detects the "Broker is not active",
-     * we call this method to elect a master and do something else.
+     * When the heartbeatManager detects the "Broker is not active", we call this method to elect a master and do
+     * something else.
+     *
      * @param clusterName The cluster name of this inactive broker
      * @param brokerName The inactive broker name
      * @param brokerAddress The inactive broker address(ip)
      * @param brokerId The inactive broker id
      */
     private void onBrokerInactive(String clusterName, String brokerName, String brokerAddress, long brokerId) {
-        if (brokerId == MixAll.MASTER_ID) {
-            if (controller.isLeaderState()) {
-                final CompletableFuture<RemotingCommand> future = controller.electMaster(new ElectMasterRequestHeader(brokerName));
-                try {
-                    final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
-                    final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
-                    if (responseHeader != null) {
-                        log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
-                        if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
-                            heartbeatManager.changeBrokerMetadata(clusterName, responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
-                        }
-                        if (controllerConfig.isNotifyBrokerRoleChanged()) {
-                            notifyBrokerRoleChanged(responseHeader, clusterName);
-                        }
+        if (controller.isLeaderState()) {
+            try {
+                final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress));
+                final RemotingCommand replicaInfoResponse = replicaInfoFuture.get(5, TimeUnit.SECONDS);
+                final GetReplicaInfoResponseHeader replicaInfoResponseHeader = (GetReplicaInfoResponseHeader) replicaInfoResponse.readCustomHeader();
+                // Not master broker offline
+                if (!replicaInfoResponseHeader.getMasterAddress().equals(brokerAddress)) {
+                    log.warn("The {} broker with IP address {} shutdown", brokerName, brokerAddress);
+                    return;
+                }
+                final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(new ElectMasterRequestHeader(brokerName));
+                final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS);
+                final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader();
+                if (responseHeader != null) {
+                    log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
+                    if (controllerConfig.isNotifyBrokerRoleChanged()) {
+                        notifyBrokerRoleChanged(responseHeader, clusterName);
                     }
-                } catch (Exception ignored) {
                 }
-            } else {
-                log.info("Broker{}' master shutdown", brokerName);
+            } catch (Exception e) {
+                log.error("", e);
             }
+        } else {
+            log.info("The {} broker with IP address {} shutdown", brokerName, brokerAddress);
         }
     }
 
@@ -161,11 +168,11 @@ public class ControllerManager {
     }
 
     public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId,
-                                          final ElectMasterResponseHeader responseHeader) {
+        final ElectMasterResponseHeader responseHeader) {
         if (StringUtils.isNoneEmpty(brokerAddr)) {
             log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
             final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
-                    responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
+                responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
             final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader);
             try {
                 this.remotingClient.invokeOneway(brokerAddr, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index eabae152b..2a5610c56 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -99,53 +99,40 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
         this.brokerLifecycleListeners.add(listener);
     }
 
-    @Override
-    public void registerBroker(String clusterName, String brokerName, String brokerAddr,
-        long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, final Integer electionPriority) {
-        final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
-        final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo,
-            new BrokerLiveInfo(brokerName,
-                brokerAddr,
-                brokerId,
-                System.currentTimeMillis(),
-                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
-                channel,
-                epoch == null ? -1 : epoch,
-                maxOffset == null ? -1 : maxOffset,
-                electionPriority == null ? Integer.MAX_VALUE : electionPriority));
-        if (prevBrokerLiveInfo == null) {
-            log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId);
-        }
-    }
-
-    @Override
-    public void changeBrokerMetadata(String clusterName, String brokerAddr, Long brokerId) {
+    @Override public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId,
+        Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) {
         BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
         BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
-        if (prev != null) {
-            prev.setBrokerId(brokerId);
-            log.info("Change broker {}'s brokerId to {}", brokerAddr, brokerId);
-        }
-    }
-
-    @Override
-    public void onBrokerHeartbeat(String clusterName, String brokerAddr, Integer epoch, Long maxOffset,
-        Long confirmOffset) {
-        BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
-        BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
-        if (null == prev) {
-            return;
-        }
         int realEpoch = Optional.ofNullable(epoch).orElse(-1);
+        long realBrokerId = Optional.ofNullable(brokerId).orElse(-1L);
         long realMaxOffset = Optional.ofNullable(maxOffset).orElse(-1L);
         long realConfirmOffset = Optional.ofNullable(confirmOffset).orElse(-1L);
-
-        prev.setLastUpdateTimestamp(System.currentTimeMillis());
-        if (realEpoch > prev.getEpoch() || realEpoch == prev.getEpoch() && realMaxOffset > prev.getMaxOffset()) {
-            prev.setEpoch(realEpoch);
-            prev.setMaxOffset(realMaxOffset);
-            prev.setConfirmOffset(realConfirmOffset);
+        long realTimeoutMillis = Optional.ofNullable(timeoutMillis).orElse(DEFAULT_BROKER_CHANNEL_EXPIRED_TIME);
+        int realElectionPriority = Optional.ofNullable(electionPriority).orElse(Integer.MAX_VALUE);
+        if (null == prev) {
+            this.brokerLiveTable.put(addrInfo,
+                new BrokerLiveInfo(brokerName,
+                    brokerAddr,
+                    realBrokerId,
+                    System.currentTimeMillis(),
+                    realTimeoutMillis,
+                    channel,
+                    realEpoch,
+                    realMaxOffset,
+                    realElectionPriority));
+            log.info("new broker registered, {}, brokerId:{}", addrInfo, realBrokerId);
+        } else {
+            prev.setLastUpdateTimestamp(System.currentTimeMillis());
+            prev.setHeartbeatTimeoutMillis(realTimeoutMillis);
+            prev.setElectionPriority(realElectionPriority);
+            prev.setBrokerId(realBrokerId);
+            if (realEpoch > prev.getEpoch() || realEpoch == prev.getEpoch() && realMaxOffset > prev.getMaxOffset()) {
+                prev.setEpoch(realEpoch);
+                prev.setMaxOffset(realMaxOffset);
+                prev.setConfirmOffset(realConfirmOffset);
+            }
         }
+
     }
 
     @Override
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 4cbc1140e..cdc4abee0 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.controller.BrokerHeartbeatManager;
@@ -95,9 +94,6 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                     final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
 
                     if (null != responseHeader) {
-                        if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
-                            heartbeatManager.changeBrokerMetadata(electMasterRequest.getClusterName(), responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
-                        }
                         if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
                             this.controllerManager.notifyBrokerRoleChanged(responseHeader, electMasterRequest.getClusterName());
                         }
@@ -113,9 +109,9 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                     final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
                     final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
                     if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
-                        this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                                                             responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
-                                                             controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getElectionPriority());
+                        this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
+                            responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
+                            controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
                     }
                     return response;
                 }
@@ -134,8 +130,8 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
             }
             case BROKER_HEARTBEAT: {
                 final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-                this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerAddr(),
-                        requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset());
+                this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
+                    requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
                 return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
             }
             case CONTROLLER_GET_SYNC_STATE_DATA: {
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 41bcaa0bb..6cef5978c 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -128,9 +128,7 @@ public class ControllerManagerTest {
         final String brokerName, final String address, final RemotingClient client,
         final long heartbeatTimeoutMillis) throws Exception {
 
-        final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address);
-        // Timeout = 3000
-        requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
+        final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, heartbeatTimeoutMillis, 1, 1000L, 0);
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
         final RemotingCommand response = client.invokeSync(controllerAddress, request, 3000);
         assert response != null;
@@ -173,8 +171,8 @@ public class ControllerManagerTest {
             } catch (Exception e) {
                 e.printStackTrace();
             }
-        }, 0, 2000L, TimeUnit.MILLISECONDS);
-        Boolean flag = await().atMost(Duration.ofSeconds(5)).until(() -> {
+        }, 0, 1000L, TimeUnit.MILLISECONDS);
+        Boolean flag = await().atMost(Duration.ofSeconds(10)).until(() -> {
             final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader("broker1");
             final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
             final RemotingCommand response = this.remotingClient1.invokeSync(leaderAddr, request, 3000);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index dd0c60a65..306acf5b6 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -43,7 +43,8 @@ public class DefaultBrokerHeartbeatManagerTest {
         this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerAddress, brokerId) -> {
             latch.countDown();
         });
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null, 1, 1L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:7000", 1L,3000L, null,
+            1, 1L,-1L, 0);
         assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
         this.heartbeatManager.shutdown();
     }
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 2158c3f06..d3b03dfe9 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -150,39 +150,39 @@ public class ReplicasInfoManagerTest {
     }
 
     public void mockHeartbeatDataMasterStillAlive() {
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, 10000000000L, null,
-            1, 3L, 0);
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
-            1, 2L, 0);
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
-            1, 3L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, 10000000000L, null,
+            1, 1L, -1L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+            1, 2L, -1L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+            1, 3L, -1L, 0);
     }
 
     public void mockHeartbeatDataHigherEpoch() {
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
-            1, 3L, 0);
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
-            1, 2L, 0);
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
-            0, 3L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+            1, 3L, -1L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+            1, 2L, -1L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+            0, 3L, -1L, 0);
     }
 
     public void mockHeartbeatDataHigherOffset() {
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
-            1, 3L, 0);
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
-            1, 2L, 0);
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
-            1, 3L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+            1, 3L, -1L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+            1, 2L, -1L, 0);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+            1, 3L, -1L, 0);
     }
 
     public void mockHeartbeatDataHigherPriority() {
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
-            1, 3L, 3);
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
-            1, 3L, 2);
-        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
-            1, 3L, 1);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9000", 1L, -10000L, null,
+            1, 3L, -1L, 3);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9001", 1L, 10000000000L, null,
+            1, 3L, -1L, 2);
+        this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:9002", 1L, 10000000000L, null,
+            1, 3L, -1L, 1);
     }
 
     @Test
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
index a8e745e4f..bdcf59c55 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/RegisterBrokerToControllerRequestHeader.java
@@ -29,6 +29,8 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
     @CFNullable
     private Long maxOffset;
     @CFNullable
+    private Long confirmOffset;
+    @CFNullable
     private Long heartbeatTimeoutMillis;
     @CFNullable
     private Integer electionPriority;
@@ -40,14 +42,17 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
         this(clusterName, brokerName, brokerAddress, 0);
     }
 
-    public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int electionPriority) {
-        this(clusterName, brokerName, brokerAddress, 0, 0, electionPriority);
+    public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress,
+        int electionPriority) {
+        this(clusterName, brokerName, brokerAddress, null, 0, 0, electionPriority);
     }
 
-    public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress, int epoch, long maxOffset, int electionPriority) {
+    public RegisterBrokerToControllerRequestHeader(String clusterName, String brokerName, String brokerAddress,
+        Long heartbeatTimeoutMillis, int epoch, long maxOffset, int electionPriority) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
         this.brokerAddress = brokerAddress;
+        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
         this.epoch = epoch;
         this.maxOffset = maxOffset;
         this.electionPriority = electionPriority;
@@ -96,14 +101,15 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
     @Override
     public String toString() {
         return "RegisterBrokerToControllerRequestHeader{" +
-                "clusterName='" + clusterName + '\'' +
-                ", brokerName='" + brokerName + '\'' +
-                ", brokerAddress='" + brokerAddress + '\'' +
-                ", epoch=" + epoch +
-                ", maxOffset=" + maxOffset +
-                ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
-                ", electionPriority=" + electionPriority +
-                '}';
+            "clusterName='" + clusterName + '\'' +
+            ", brokerName='" + brokerName + '\'' +
+            ", brokerAddress='" + brokerAddress + '\'' +
+            ", epoch=" + epoch +
+            ", maxOffset=" + maxOffset +
+            ", confirmOffset=" + confirmOffset +
+            ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+            ", electionPriority=" + electionPriority +
+            '}';
     }
 
     public Integer getEpoch() {
@@ -122,6 +128,14 @@ public class RegisterBrokerToControllerRequestHeader implements CommandCustomHea
         this.maxOffset = maxOffset;
     }
 
+    public Long getConfirmOffset() {
+        return confirmOffset;
+    }
+
+    public void setConfirmOffset(Long confirmOffset) {
+        this.confirmOffset = confirmOffset;
+    }
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
index 6b15b9be5..eb7332fdf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/namesrv/BrokerHeartbeatRequestHeader.java
@@ -30,11 +30,17 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String brokerName;
     @CFNullable
+    private Long brokerId;
+    @CFNullable
     private Integer epoch;
     @CFNullable
     private Long maxOffset;
     @CFNullable
     private Long confirmOffset;
+    @CFNullable
+    private Long heartbeatTimeoutMills;
+    @CFNullable
+    private Integer electionPriority;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -88,4 +94,28 @@ public class BrokerHeartbeatRequestHeader implements CommandCustomHeader {
     public void setConfirmOffset(Long confirmOffset) {
         this.confirmOffset = confirmOffset;
     }
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(Long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    public Long getHeartbeatTimeoutMills() {
+        return heartbeatTimeoutMills;
+    }
+
+    public void setHeartbeatTimeoutMills(Long heartbeatTimeoutMills) {
+        this.heartbeatTimeoutMills = heartbeatTimeoutMills;
+    }
+
+    public Integer getElectionPriority() {
+        return electionPriority;
+    }
+
+    public void setElectionPriority(Integer electionPriority) {
+        this.electionPriority = electionPriority;
+    }
 }