You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/01 07:54:16 UTC

[rocketmq] branch dledger-controller-brokerId updated: [ISSUE#5045] Refactor the register and elect-master process in controller mode (#5046)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/dledger-controller-brokerId by this push:
     new c5474b33a [ISSUE#5045] Refactor the register and elect-master process in controller mode (#5046)
c5474b33a is described below

commit c5474b33a5232183bf60c6ae42c6bc7efe6f14ca
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Wed Feb 1 15:54:06 2023 +0800

    [ISSUE#5045] Refactor the register and elect-master process in controller mode (#5046)
    
    * refactor(controller): refactor the register logic
    
    1. refactor the register logic
    
    * refactor(controller): remove unused field in ElectMasterEvent
    
    1. remove unused field in ElectMasterEvent
    
    * feat(controller): add a tryElectMaster request and the logic to process it
    
    1. add a tryElectMaster request and the logic to process it
    
    * feat(controller): refactor ReplicasInfoManagerTest
    
    1. refactor ReplicasInfoManagerTest
    
    * refactor(controller): refactor DLedgerControllerTest
    
    1. refactor DLedgerControllerTest
    
    * refactor(controller): refactor ControllerManagerTest
    
    1. refactor ControllerManagerTest
    
    * refactor(controller): refactor ReplicasInfoManagerTest
    
    1. refactor ReplicasInfoManagerTest
    
    * refactor(controller): refactor register process and pass the junit test
    
    1. refactor ReplicasInfoManagerTest
    
    * style(broker): rename a constant
    
    1. rename a constant
    
    * feat(controller): update the DLedger dependency from v0.27 to v0.30
    
    1. update the DLedger dependency from v0.27 to v0.30
    
    * style(controller): add a blank line just to trigger the GitHub Actions run again
    
    1. add a blank line just to trigger the GitHub Actions run again
    
    * feat(controller): combine electMaster api and brokerTryElectMaster api
    
    1. combine electMaster api and brokerTryElectMaster api
    
    * feat(controller): add logic to verify the broker id returned from registration
    
    1. add logic to verify the broker id returned from registration
    
    * fix(controller): remove unused code and add a warning log in ControllerManager
    
    1. remove unused code and add a warning log in ControllerManager
    
    * fix: resolve conflicts
    
    1. resolve conflicts
    
    * fix(controller): remove unused class
    
    1. remove unused class
    
    * fix(controller): Resolve conflicts after merging
    
    1. Resolve conflicts after merging
    
    * refactor(controller): Refactor ReplicasInfoManager#elect
    
    1. Refactor ReplicasInfoManager#elect
    
    * style(controller): remove unused imports
    
    1. remove unused imports
    
    * style(controller): remove unused imports
    
    1. remove unused imports
    
    * fix(controller): resolve conflicts after merging develop branch
    
    1. resolve conflicts after merging develop branch
    
    * rerun
    
    * fix(controller): resolve conflicts in ReplicasInfoManagerTest#testRegisterNewBroker after merging develop branch
    
    1. resolve conflicts in ReplicasInfoManagerTest#testRegisterNewBroker after merging develop branch
    
    * style(controller): pass style check
    
    1. pass style check
---
 .../broker/controller/ReplicasManager.java         |  50 ++++-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  32 ++-
 .../broker/controller/ReplicasManagerTest.java     |   6 +
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   2 +-
 .../rocketmq/controller/ControllerManager.java     |  36 ++--
 .../rocketmq/controller/elect/ElectPolicy.java     |   4 +-
 .../controller/impl/DLedgerController.java         |   2 +-
 .../impl/DLedgerControllerStateMachine.java        |   6 +-
 .../controller/impl/event/ApplyBrokerIdEvent.java  |  11 +-
 .../controller/impl/event/ElectMasterEvent.java    |  15 +-
 .../{BrokerInfo.java => BrokerReplicaInfo.java}    |  14 +-
 .../impl/manager/ReplicasInfoManager.java          | 240 ++++++++++-----------
 .../controller/impl/manager/SyncStateInfo.java     |  19 +-
 .../processor/ControllerRequestProcessor.java      | 194 ++++++++++-------
 .../impl/controller/ControllerManagerTest.java     |  38 +++-
 .../controller/impl/DLedgerControllerTest.java     |  69 +++---
 .../impl/manager/ReplicasInfoManagerTest.java      | 186 ++++++++++++----
 .../rocketmq/remoting/protocol/ResponseCode.java   |   9 +-
 .../protocol/body/RoleChangeNotifyEntry.java       |  60 ++++++
 .../controller/ElectMasterRequestHeader.java       |  61 +++++-
 .../controller/ElectMasterResponseHeader.java      |  23 +-
 .../controller/ReElectMasterSubCommand.java        |   2 +-
 22 files changed, 734 insertions(+), 345 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index a0218f8cc..02e24a859 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.EpochEntry;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
@@ -103,6 +104,9 @@ public class ReplicasManager {
     enum State {
         INITIAL,
         FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
+
+        FIRST_TIME_WAIT_MASTER_IS_ELECTED,
+
         RUNNING,
         SHUTDOWN,
     }
@@ -141,6 +145,15 @@ public class ReplicasManager {
         if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
             if (registerBrokerToController()) {
                 LOGGER.info("First time register broker success");
+                this.state = State.FIRST_TIME_WAIT_MASTER_IS_ELECTED;
+            } else {
+                return false;
+            }
+        }
+
+        if (this.state == State.FIRST_TIME_WAIT_MASTER_IS_ELECTED) {
+            if (StringUtils.isNotEmpty(this.masterAddress) || brokerElect()) {
+                LOGGER.info("Master in this broker set is elected");
                 this.state = State.RUNNING;
             } else {
                 return false;
@@ -287,6 +300,30 @@ public class ReplicasManager {
         }
     }
 
+    private boolean brokerElect() {
+        // Broker try to elect itself as a master in broker set.
+        try {
+            ElectMasterResponseHeader tryElectResponse = this.brokerOuterAPI.brokerElect(this.controllerLeaderAddress, this.brokerConfig.getBrokerClusterName(),
+                this.brokerConfig.getBrokerName(), this.localAddress);
+            final String masterAddress = tryElectResponse.getMasterAddress();
+            if (StringUtils.isEmpty(masterAddress)) {
+                LOGGER.warn("Now no master in broker set");
+                return false;
+            }
+
+            if (StringUtils.equals(masterAddress, this.localAddress)) {
+                changeToMaster(tryElectResponse.getMasterEpoch(), tryElectResponse.getSyncStateSetEpoch());
+            } else {
+                changeToSlave(masterAddress, tryElectResponse.getMasterEpoch(), tryElectResponse.getBrokerId());
+            }
+            brokerController.setIsolated(false);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Failed to try elect", e);
+            return false;
+        }
+    }
+
     private boolean registerBrokerToController() {
         // Register this broker to controller, get brokerId and masterAddress.
         try {
@@ -303,8 +340,13 @@ public class ReplicasManager {
                 // Set isolated to false, make broker can register to namesrv regularly
                 brokerController.setIsolated(false);
             } else {
-                LOGGER.warn("No master in controller");
-                return false;
+                // if master address is empty, just apply the brokerId
+                if (registerResponse.getBrokerId() <= 0) {
+                    // wrong broker id
+                    LOGGER.error("Register to controller but receive a invalid broker id = {}", registerResponse.getBrokerId());
+                    return false;
+                }
+                this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
             }
             return true;
         } catch (final Exception e) {
@@ -340,8 +382,8 @@ public class ReplicasManager {
                                 }
                             }
                         } else {
-                            // In this case, the master in controller is null, try register to controller again, this will trigger the electMasterEvent in controller.
-                            registerBrokerToController();
+                            // In this case, the master in controller is null, try elect in controller, this will trigger the electMasterEvent in controller.
+                            brokerElect();
                         }
                     } else if (newMasterEpoch == this.masterEpoch) {
                         // Check if sync state set changed
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 10cd27342..83f798474 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -116,6 +116,8 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResp
 import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -129,6 +131,9 @@ import org.apache.rocketmq.store.timer.TimerMetrics;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
 import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_MASTER_STILL_EXIST;
 import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
 
 public class BrokerOuterAPI {
@@ -203,7 +208,7 @@ public class BrokerOuterAPI {
 
     public void updateNameServerAddressList(final String addrs) {
         String[] addrArray = addrs.split(";");
-        List<String> lst = new ArrayList<>(Arrays.asList(addrArray));
+        List<String> lst = new ArrayList<String>(Arrays.asList(addrArray));
         this.remotingClient.updateNameServerAddressList(lst);
     }
 
@@ -1156,6 +1161,31 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
+    /**
+     * Broker try to elect itself as a master in broker set
+     */
+    public ElectMasterResponseHeader brokerElect(String controllerAddress, String clusterName, String brokerName,
+                                                 String brokerAddress) throws Exception {
+
+        final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader);
+        RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        assert response != null;
+        switch (response.getCode()) {
+            case CONTROLLER_NOT_LEADER: {
+                throw new MQBrokerException(response.getCode(), "Controller leader was changed");
+            }
+            case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
+                throw new MQBrokerException(response.getCode(), response.getRemark());
+            case CONTROLLER_ELECT_MASTER_FAILED:
+            case CONTROLLER_MASTER_STILL_EXIST:
+            case SUCCESS:
+                return (ElectMasterResponseHeader) response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+
     /**
      * Register broker to controller
      */
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index 01eacf43b..1414778d4 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
@@ -70,6 +71,8 @@ public class ReplicasManagerTest {
 
     private RegisterBrokerToControllerResponseHeader registerBrokerToControllerResponseHeader;
 
+    private ElectMasterResponseHeader brokerTryElectResponseHeader;
+
     private Pair<GetReplicaInfoResponseHeader, SyncStateSet> result;
 
     private GetReplicaInfoResponseHeader getReplicaInfoResponseHeader;
@@ -108,6 +111,8 @@ public class ReplicasManagerTest {
         getMetaDataResponseHeader = new GetMetaDataResponseHeader(GROUP, LEADER_ID, OLD_MASTER_ADDRESS, IS_LEADER, PEERS);
         registerBrokerToControllerResponseHeader = new RegisterBrokerToControllerResponseHeader();
         registerBrokerToControllerResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
+        brokerTryElectResponseHeader = new ElectMasterResponseHeader();
+        brokerTryElectResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
         getReplicaInfoResponseHeader = new GetReplicaInfoResponseHeader();
         getReplicaInfoResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
         getReplicaInfoResponseHeader.setBrokerId(MASTER_BROKER_ID);
@@ -125,6 +130,7 @@ public class ReplicasManagerTest {
         when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
         when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), any(), anyLong(), anyInt(), anyLong(), anyInt())).thenReturn(registerBrokerToControllerResponseHeader);
         when(brokerOuterAPI.getReplicaInfo(any(), any(), any())).thenReturn(result);
+        when(brokerOuterAPI.brokerElect(any(), any(), any(), any())).thenReturn(brokerTryElectResponseHeader);
         replicasManager = new ReplicasManager(brokerController);
         autoSwitchHAService.init(defaultMessageStore);
         replicasManager.start();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f251e2b00..2748acbd9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -3083,7 +3083,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         assert controllerMetaData != null;
         assert controllerMetaData.getControllerLeaderAddress() != null;
         final String leaderAddress = controllerMetaData.getControllerLeaderAddress();
-        ElectMasterRequestHeader electRequestHeader = new ElectMasterRequestHeader(clusterName, brokerName, brokerAddr);
+        ElectMasterRequestHeader electRequestHeader = ElectMasterRequestHeader.ofAdminTrigger(clusterName, brokerName, brokerAddr);
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, electRequestHeader);
         final RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index a2c570817..662403192 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -24,12 +24,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
+
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
 import org.apache.rocketmq.controller.impl.DLedgerController;
 import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
@@ -45,6 +47,7 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
 import org.apache.rocketmq.remoting.protocol.header.NotifyBrokerRoleChangedRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
@@ -87,7 +90,7 @@ public class ControllerManager {
             new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
             @Override
             protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
-                return new FutureTaskExt<>(runnable, value);
+                return new FutureTaskExt<T>(runnable, value);
             }
         };
         this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
@@ -127,13 +130,14 @@ public class ControllerManager {
                     log.warn("The {} broker with IP address {} shutdown", brokerName, brokerAddress);
                     return;
                 }
-                final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(new ElectMasterRequestHeader(brokerName));
+
+                final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(brokerName));
                 final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS);
                 final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) electMasterResponse.readCustomHeader();
                 if (responseHeader != null) {
                     log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
                     if (controllerConfig.isNotifyBrokerRoleChanged()) {
-                        notifyBrokerRoleChanged(responseHeader, clusterName);
+                        notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader));
                     }
                 }
             } catch (Exception e) {
@@ -147,20 +151,24 @@ public class ControllerManager {
     /**
      * Notify master and all slaves for a broker that the master role changed.
      */
-    public void notifyBrokerRoleChanged(final ElectMasterResponseHeader electMasterResult, final String clusterName) {
-        final BrokerMemberGroup memberGroup = electMasterResult.getBrokerMemberGroup();
+    public void notifyBrokerRoleChanged(final RoleChangeNotifyEntry entry) {
+        final BrokerMemberGroup memberGroup = entry.getBrokerMemberGroup();
         if (memberGroup != null) {
+            final String master = entry.getMasterAddress();
+            if (StringUtils.isEmpty(master)) {
+                log.warn("Notify broker role change failed, because member group is not null but the new master address is empty, entry:{}", entry);
+                return;
+            }
             // First, inform the master
-            final String master = electMasterResult.getNewMasterAddress();
-            if (StringUtils.isNoneEmpty(master) && this.heartbeatManager.isBrokerActive(clusterName, master)) {
-                doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, electMasterResult);
+            if (this.heartbeatManager.isBrokerActive(memberGroup.getCluster(), master)) {
+                doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, entry);
             }
 
             // Then, inform all slaves
             final Map<Long, String> brokerIdAddrs = memberGroup.getBrokerAddrs();
             for (Map.Entry<Long, String> broker : brokerIdAddrs.entrySet()) {
-                if (!broker.getValue().equals(master) && this.heartbeatManager.isBrokerActive(clusterName, broker.getValue())) {
-                    doNotifyBrokerRoleChanged(broker.getValue(), broker.getKey(), electMasterResult);
+                if (!master.equals(broker.getValue()) && this.heartbeatManager.isBrokerActive(memberGroup.getCluster(), broker.getValue())) {
+                    doNotifyBrokerRoleChanged(broker.getValue(), broker.getKey(), entry);
                 }
             }
 
@@ -168,11 +176,11 @@ public class ControllerManager {
     }
 
     public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId,
-        final ElectMasterResponseHeader responseHeader) {
+                                          final RoleChangeNotifyEntry entry) {
         if (StringUtils.isNoneEmpty(brokerAddr)) {
-            log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
-            final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
-                responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
+            log.info("Try notify broker {} with id {} that role changed, RoleChangeNotifyEntry:{}", brokerAddr, brokerId, entry);
+            final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(entry.getMasterAddress(),
+                    entry.getMasterEpoch(), entry.getSyncStateSetEpoch(), brokerId);
             final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader);
             try {
                 this.remotingClient.invokeOneway(brokerAddr, request, 3000);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
index 214012e51..aba8f5538 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/elect/ElectPolicy.java
@@ -28,9 +28,9 @@ public interface ElectPolicy {
      * @param syncStateBrokers  all broker replicas in syncStateSet
      * @param allReplicaBrokers all broker replicas
      * @param oldMaster         old master
-     * @param preferBrokerAddr  the broker prefer to be elected
+     * @param brokerAddr  broker address(can be used as prefer or assigned in some elect policy)
      * @return new master's brokerAddr
      */
-    String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String preferBrokerAddr);
+    String elect(String clusterName, Set<String> syncStateBrokers, Set<String> allReplicaBrokers, String oldMaster, String brokerAddr);
 
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index f9ea41174..f4f32e05c 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -107,7 +107,7 @@ public class DLedgerController implements Controller {
 
         this.roleHandler = new RoleChangeHandler(dLedgerConfig.getSelfId());
         this.replicasInfoManager = new ReplicasInfoManager(controllerConfig);
-        this.statemachine = new DLedgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getSelfId());
+        this.statemachine = new DLedgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getGroup(), dLedgerConfig.getSelfId());
 
         // Register statemachine and role handler.
         this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
index dde94e998..39841d4e6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java
@@ -39,10 +39,10 @@ public class DLedgerControllerStateMachine implements StateMachine {
     private final String dLedgerId;
 
     public DLedgerControllerStateMachine(final ReplicasInfoManager replicasInfoManager,
-        final EventSerializer eventSerializer, final String dLedgerId) {
+        final EventSerializer eventSerializer, final String dLedgerGroupId, final String dLedgerSelfId) {
         this.replicasInfoManager = replicasInfoManager;
         this.eventSerializer = eventSerializer;
-        this.dLedgerId = dLedgerId;
+        this.dLedgerId = generateDLedgerId(dLedgerGroupId, dLedgerSelfId);
     }
 
     @Override
@@ -76,6 +76,6 @@ public class DLedgerControllerStateMachine implements StateMachine {
 
     @Override
     public String getBindDLedgerId() {
-        return dLedgerId;
+        return this.dLedgerId;
     }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
index 8cec2ec9a..c4934d7c0 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
@@ -21,11 +21,13 @@ package org.apache.rocketmq.controller.impl.event;
  * Triggered by the RegisterBrokerApi.
  */
 public class ApplyBrokerIdEvent implements EventMessage {
+    private final String clusterName;
     private final String brokerName;
     private final String brokerAddress;
     private final long newBrokerId;
 
-    public ApplyBrokerIdEvent(String brokerName, String brokerAddress, long newBrokerId) {
+    public ApplyBrokerIdEvent(String clusterName, String brokerName, String brokerAddress, long newBrokerId) {
+        this.clusterName = clusterName;
         this.brokerName = brokerName;
         this.brokerAddress = brokerAddress;
         this.newBrokerId = newBrokerId;
@@ -48,10 +50,15 @@ public class ApplyBrokerIdEvent implements EventMessage {
         return newBrokerId;
     }
 
+    public String getClusterName() {
+        return clusterName;
+    }
+
     @Override
     public String toString() {
         return "ApplyBrokerIdEvent{" +
-            "brokerName='" + brokerName + '\'' +
+            "clusterName='" + clusterName + '\'' +
+            ", brokerName='" + brokerName + '\'' +
             ", brokerAddress='" + brokerAddress + '\'' +
             ", newBrokerId=" + newBrokerId +
             '}';
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
index eb3b7984e..970b5d8cd 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
@@ -25,21 +25,19 @@ public class ElectMasterEvent implements EventMessage {
     private final boolean newMasterElected;
     private final String brokerName;
     private final String newMasterAddress;
-    private final String clusterName;
 
     public ElectMasterEvent(boolean newMasterElected, String brokerName) {
-        this(newMasterElected, brokerName, "", "");
+        this(newMasterElected, brokerName, "");
     }
 
     public ElectMasterEvent(String brokerName, String newMasterAddress) {
-        this(true, brokerName, newMasterAddress, "");
+        this(true, brokerName, newMasterAddress);
     }
 
-    public ElectMasterEvent(boolean newMasterElected, String brokerName, String newMasterAddress, String clusterName) {
+    public ElectMasterEvent(boolean newMasterElected, String brokerName, String newMasterAddress) {
         this.newMasterElected = newMasterElected;
         this.brokerName = brokerName;
         this.newMasterAddress = newMasterAddress;
-        this.clusterName = clusterName;
     }
 
     @Override
@@ -59,17 +57,12 @@ public class ElectMasterEvent implements EventMessage {
         return newMasterAddress;
     }
 
-    public String getClusterName() {
-        return clusterName;
-    }
-
     @Override
     public String toString() {
         return "ElectMasterEvent{" +
-            "isNewMasterElected=" + newMasterElected +
+            "newMasterElected=" + newMasterElected +
             ", brokerName='" + brokerName + '\'' +
             ", newMasterAddress='" + newMasterAddress + '\'' +
-            ", clusterName='" + clusterName + '\'' +
             '}';
     }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
similarity index 84%
rename from controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
index 0d56285fb..e2a68a544 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
@@ -20,19 +20,23 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.MixAll;
 
-public class BrokerInfo {
+/**
+ * Broker replicas info, mapping from brokerAddress to {brokerId, brokerHaAddress}.
+ */
+public class BrokerReplicaInfo {
     private final String clusterName;
     private final String brokerName;
     // Start from 1
-    private final AtomicLong brokerIdCount;
+    private final AtomicLong nextAssignBrokerId;
     private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable;
 
-    public BrokerInfo(String clusterName, String brokerName) {
+    public BrokerReplicaInfo(String clusterName, String brokerName) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.brokerIdCount = new AtomicLong(1L);
         this.brokerIdTable = new HashMap<>();
+        this.nextAssignBrokerId = new AtomicLong(MixAll.FIRST_SLAVE_ID);
     }
 
     public void removeBrokerAddress(final String address) {
@@ -40,7 +44,7 @@ public class BrokerInfo {
     }
 
     public long newBrokerId() {
-        return this.brokerIdCount.incrementAndGet();
+        return this.nextAssignBrokerId.getAndIncrement();
     }
 
     public String getClusterName() {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index a820b069e..7b57bff9b 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -61,7 +61,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToC
 public class ReplicasInfoManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
     private final ControllerConfig controllerConfig;
-    private final Map<String/* brokerName */, BrokerInfo> replicaInfoTable;
+    private final Map<String/* brokerName */, BrokerReplicaInfo> replicaInfoTable;
     private final Map<String/* brokerName */, SyncStateInfo> syncStateSetInfoTable;
 
     public ReplicasInfoManager(final ControllerConfig config) {
@@ -80,7 +80,7 @@ public class ReplicasInfoManager {
         if (isContainsBroker(brokerName)) {
             final Set<String> newSyncStateSet = syncStateSet.getSyncStateSet();
             final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
-            final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
 
             // Check whether the oldSyncStateSet is equal with newSyncStateSet
             final Set<String> oldSyncStateSet = syncStateInfo.getSyncStateSet();
@@ -120,13 +120,13 @@ public class ReplicasInfoManager {
 
             // Check newSyncStateSet correctness
             for (String replicas : newSyncStateSet) {
-                if (!brokerInfo.isBrokerExist(replicas)) {
+                if (!brokerReplicaInfo.isBrokerExist(replicas)) {
                     String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replicas);
                     LOGGER.error("{}", err);
                     result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REPLICAS, err);
                     return result;
                 }
-                if (!brokerAlivePredicate.test(brokerInfo.getClusterName(), replicas)) {
+                if (!brokerAlivePredicate.test(brokerReplicaInfo.getClusterName(), replicas)) {
                     String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replicas);
                     LOGGER.error(err);
                     result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NOT_ALIVE, err);
@@ -156,57 +156,85 @@ public class ReplicasInfoManager {
     public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request,
         final ElectPolicy electPolicy) {
         final String brokerName = request.getBrokerName();
-        final String assignBrokerAddress = request.getBrokerAddress();
+        final String brokerAddress = request.getBrokerAddress();
         final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
-        if (isContainsBroker(brokerName)) {
-            final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
-            final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
-            final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
-            final String oldMaster = syncStateInfo.getMasterAddress();
-            Set<String> allReplicaBrokers = controllerConfig.isEnableElectUncleanMaster() ? brokerInfo.getAllBroker() : null;
-
-            // elect by policy
-            String newMaster = electPolicy.elect(brokerInfo.getClusterName(), syncStateSet, allReplicaBrokers, oldMaster, assignBrokerAddress);
-            if (StringUtils.isNotEmpty(newMaster) && newMaster.equals(oldMaster)) {
-                // old master still valid, change nothing
-                String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerInfo.getBrokerName());
-                LOGGER.warn("{}", err);
-                result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, err);
-                return result;
-            }
-            // a new master is elected
-            if (StringUtils.isNotEmpty(newMaster)) {
-                final int masterEpoch = this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
-                final int syncStateSetEpoch = this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
-                final ElectMasterResponseHeader response = result.getResponse();
-                response.setNewMasterAddress(newMaster);
-                response.setMasterEpoch(masterEpoch + 1);
-                response.setSyncStateSetEpoch(syncStateSetEpoch);
-                BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName);
-                if (null != brokerMemberGroup) {
-                    response.setBrokerMemberGroup(brokerMemberGroup);
-                    result.setBody(brokerMemberGroup.encode());
-                }
-                final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster);
-                result.addEvent(event);
-                return result;
+        final ElectMasterResponseHeader response = result.getResponse();
+        if (!isContainsBroker(brokerName)) {
+            // this broker set hasn't been registered
+            response.setMasterAddress("");
+            result.setCodeAndRemark(ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED, "Broker hasn't been registered");
+            return result;
+        }
+
+        final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
+        final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
+        final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
+        final String oldMaster = syncStateInfo.getMasterAddress();
+        Set<String> allReplicaBrokers = controllerConfig.isEnableElectUncleanMaster() ? brokerReplicaInfo.getAllBroker() : null;
+        String newMaster = null;
+
+        if (syncStateInfo.isFirstTimeForElect()) {
+            // If this broker set has never had a master, i.e. this is the first election,
+            // elect the requesting broker as the first master
+            newMaster = brokerAddress;
+        }
+
+        // elect by policy
+        if (newMaster == null) {
+            // Pass brokerAddress to the policy as the assigned address only when the request forces its election
+            String assignedBrokerAddr = request.isForceElect() ? brokerAddress : null;
+            newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerAddr);
+        }
+
+        if (StringUtils.isNotEmpty(newMaster) && newMaster.equals(oldMaster)) {
+            // old master still valid, change nothing
+            String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName());
+            LOGGER.warn("{}", err);
+            // the master still exists
+            response.setMasterEpoch(syncStateInfo.getMasterEpoch());
+            response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
+            response.setMasterAddress(syncStateInfo.getMasterAddress());
+            response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
+            result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err);
+            return result;
+        }
+
+        // a new master is elected
+        if (StringUtils.isNotEmpty(newMaster)) {
+            final int masterEpoch = syncStateInfo.getMasterEpoch();
+            final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch();
+            response.setMasterAddress(newMaster);
+            response.setMasterEpoch(masterEpoch + 1);
+            response.setSyncStateSetEpoch(syncStateSetEpoch + 1);
+            response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
+            BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName);
+            if (null != brokerMemberGroup) {
+                response.setBrokerMemberGroup(brokerMemberGroup);
+                result.setBody(brokerMemberGroup.encode());
             }
-            // If elect failed, we still need to apply an ElectMasterEvent to tell the statemachine
-            // that the master was shutdown and no new master was elected.
-            final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+            final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster);
             result.addEvent(event);
-            result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Failed to elect a new broker master");
             return result;
         }
-        result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Broker metadata is not existed");
+        // If the election failed and electMaster was triggered by the controller (indicated by a null brokerAddress),
+        // we still need to apply an ElectMasterEvent to tell the state machine
+        // that the master was shut down and no new master was elected.
+        if (request.getBrokerAddress() == null) {
+            final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
+            result.addEvent(event);
+            result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master");
+        } else {
+            result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master");
+        }
+        response.setMasterAddress("");
         return result;
     }
 
     private BrokerMemberGroup buildBrokerMemberGroup(final String brokerName) {
         if (isContainsBroker(brokerName)) {
-            final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
-            final BrokerMemberGroup group = new BrokerMemberGroup(brokerInfo.getClusterName(), brokerName);
-            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+            final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
+            final BrokerMemberGroup group = new BrokerMemberGroup(brokerReplicaInfo.getClusterName(), brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerReplicaInfo.getBrokerIdTable();
             final HashMap<Long, String> memberGroup = new HashMap<>();
             brokerIdTable.forEach((addr, id) -> memberGroup.put(id, addr));
             group.setBrokerAddrs(memberGroup);
@@ -222,80 +250,40 @@ public class ReplicasInfoManager {
         final String clusterName = request.getClusterName();
         final ControllerResult<RegisterBrokerToControllerResponseHeader> result = new ControllerResult<>(new RegisterBrokerToControllerResponseHeader());
         final RegisterBrokerToControllerResponseHeader response = result.getResponse();
-        boolean canBeElectedAsMaster;
-
+        // If the broker's metadata does not exist in the state machine, we can assign the broker a brokerId valued 1
+        // By default, we set this variable to a value of 1
+        long brokerId = MixAll.FIRST_SLAVE_ID;
+        boolean shouldApplyBrokerId = true;
         if (isContainsBroker(brokerName)) {
             final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
-            final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
 
-            // Get brokerId.
-            long brokerId;
-            if (!brokerInfo.isBrokerExist(brokerAddress)) {
-                // If this broker replicas is first time come online, we need to apply a new id for this replicas.
-                brokerId = brokerInfo.newBrokerId();
-                final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(brokerName, brokerAddress, brokerId);
-                result.addEvent(applyIdEvent);
+            if (brokerReplicaInfo.isBrokerExist(brokerAddress)) {
+                // this broker has already registered
+                brokerId = brokerReplicaInfo.getBrokerId(brokerAddress);
+                shouldApplyBrokerId = false;
             } else {
-                brokerId = brokerInfo.getBrokerId(brokerAddress);
+                // If this broker replica is coming online for the first time, we need to apply a new id for it.
+                brokerId = brokerReplicaInfo.newBrokerId();
             }
-            response.setBrokerId(brokerId);
-            response.setMasterEpoch(syncStateInfo.getMasterEpoch());
-            response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
 
             if (syncStateInfo.isMasterExist() && brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) {
                 // If the master is alive, just return master info.
                 final String masterAddress = syncStateInfo.getMasterAddress();
                 response.setMasterAddress(masterAddress);
-                return result;
-            } else if (syncStateInfo.isMasterExist() && !brokerAlivePredicate.test(clusterName, syncStateInfo.getMasterAddress())) {
-                // filter alive slave broker
-                Set<String> aliveSlaveBrokerAddressSet = syncStateInfo.getSyncStateSet().stream()
-                    .filter(brokerAddr -> brokerAlivePredicate.test(clusterName, brokerAddr) && !StringUtils.equals(brokerAddr, syncStateInfo.getMasterAddress()))
-                    .collect(Collectors.toSet());
-                if (null != aliveSlaveBrokerAddressSet && aliveSlaveBrokerAddressSet.size() > 0) {
-                    if (!aliveSlaveBrokerAddressSet.contains(brokerAddress)) {
-                        brokerAddress = aliveSlaveBrokerAddressSet.iterator().next();
-                    }
-                    canBeElectedAsMaster = true;
-                } else {
-                    // If the master is not alive and all slave is not alive, we should elect a new master:
-                    // Case1: This replicas was in sync state set list
-                    // Case2: The option {EnableElectUncleanMaster} is true
-                    canBeElectedAsMaster = syncStateInfo.getSyncStateSet().contains(brokerAddress) || this.controllerConfig.isEnableElectUncleanMaster();
-                }
-                if (!canBeElectedAsMaster) {
-                     // still need to apply an ElectMasterEvent to tell the statemachine
-                    // that the master was shutdown and no new master was elected. set SyncStateInfo.masterAddress empty
-                    final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
-                    result.addEvent(event);
-                }
-            } else {
-                // If the master is not alive, we should elect a new master:
-                // Case1: This replicas was in sync state set list
-                // Case2: The option {EnableElectUncleanMaster} is true
-                canBeElectedAsMaster = syncStateInfo.getSyncStateSet().contains(brokerAddress) || this.controllerConfig.isEnableElectUncleanMaster();
+                response.setMasterEpoch(syncStateInfo.getMasterEpoch());
+                response.setSyncStateSetEpoch(syncStateInfo.getSyncStateSetEpoch());
             }
-        } else {
-            // If the broker's metadata does not exist in the state machine, the replicas can be elected as master directly.
-            canBeElectedAsMaster = true;
         }
 
-        if (canBeElectedAsMaster) {
-            final boolean isBrokerExist = isContainsBroker(brokerName);
-            int masterEpoch = isBrokerExist ? this.syncStateSetInfoTable.get(brokerName).getMasterEpoch() + 1 : 1;
-            int syncStateSetEpoch = isBrokerExist ? this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch() + 1 : 1;
-            response.setMasterAddress(brokerAddress);
-            response.setMasterEpoch(masterEpoch);
-            response.setSyncStateSetEpoch(syncStateSetEpoch);
-            response.setBrokerId(MixAll.MASTER_ID);
-
-            final ElectMasterEvent event = new ElectMasterEvent(true, brokerName, brokerAddress, clusterName);
-            result.addEvent(event);
-            return result;
+        response.setBrokerId(brokerId);
+        if (response.getMasterAddress() == null) {
+            response.setMasterAddress("");
+        }
+        if (shouldApplyBrokerId) {
+            final ApplyBrokerIdEvent applyIdEvent = new ApplyBrokerIdEvent(request.getClusterName(), brokerName, brokerAddress, brokerId);
+            result.addEvent(applyIdEvent);
         }
-
-        response.setMasterAddress("");
-        result.setCodeAndRemark(ResponseCode.CONTROLLER_REGISTER_BROKER_FAILED, "The broker has not master, and this new registered broker can't not be elected as master");
         return result;
     }
 
@@ -306,12 +294,12 @@ public class ReplicasInfoManager {
         if (isContainsBroker(brokerName)) {
             // If exist broker metadata, just return metadata
             final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
-            final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
             final String masterAddress = syncStateInfo.getMasterAddress();
             response.setMasterAddress(masterAddress);
             response.setMasterEpoch(syncStateInfo.getMasterEpoch());
             if (StringUtils.isNotEmpty(request.getBrokerAddress())) {
-                response.setBrokerId(brokerInfo.getBrokerId(request.getBrokerAddress()));
+                response.setBrokerId(brokerReplicaInfo.getBrokerId(request.getBrokerAddress()));
             }
             result.setBody(new SyncStateSet(syncStateInfo.getSyncStateSet(), syncStateInfo.getSyncStateSetEpoch()).encode());
             return result;
@@ -327,12 +315,12 @@ public class ReplicasInfoManager {
             if (isContainsBroker(brokerName)) {
                 // If exist broker metadata, just return metadata
                 final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
-                final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+                final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
                 final Set<String> syncStateSet = syncStateInfo.getSyncStateSet();
                 final String master = syncStateInfo.getMasterAddress();
                 final ArrayList<InSyncStateData.InSyncMember> inSyncMembers = new ArrayList<>();
                 syncStateSet.forEach(replicas -> {
-                    long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(replicas);
+                    long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerReplicaInfo.getBrokerId(replicas);
                     inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId));
                 });
 
@@ -417,10 +405,21 @@ public class ReplicasInfoManager {
     private void handleApplyBrokerId(final ApplyBrokerIdEvent event) {
         final String brokerName = event.getBrokerName();
         if (isContainsBroker(brokerName)) {
-            final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
-            if (!brokerInfo.isBrokerExist(event.getBrokerAddress())) {
-                brokerInfo.addBroker(event.getBrokerAddress(), event.getNewBrokerId());
+            final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
+            if (!brokerReplicaInfo.isBrokerExist(event.getBrokerAddress())) {
+                brokerReplicaInfo.addBroker(event.getBrokerAddress(), event.getNewBrokerId());
             }
+        } else {
+            // First time to register in this broker set
+            // Initialize the replicaInfo about this broker set
+            final String clusterName = event.getClusterName();
+            final BrokerReplicaInfo brokerReplicaInfo = new BrokerReplicaInfo(clusterName, brokerName);
+            long brokerId = brokerReplicaInfo.newBrokerId();
+            brokerReplicaInfo.addBroker(event.getBrokerAddress(), brokerId);
+            this.replicaInfoTable.put(brokerName, brokerReplicaInfo);
+            final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName);
+            // Initialize an empty syncStateInfo for this broker set
+            this.syncStateSetInfoTable.put(brokerName, syncStateInfo);
         }
     }
 
@@ -443,16 +442,9 @@ public class ReplicasInfoManager {
                 // So we should delete old master, but retain newSyncStateSet list.
                 syncStateInfo.updateMasterInfo("");
             }
-        } else {
-            // When the first replicas of a broker come online,
-            // we can create memory meta information for the broker, and regard it as master
-            final String clusterName = event.getClusterName();
-            final BrokerInfo brokerInfo = new BrokerInfo(clusterName, brokerName);
-            brokerInfo.addBroker(newMaster, 1L);
-            final SyncStateInfo syncStateInfo = new SyncStateInfo(clusterName, brokerName, newMaster);
-            this.syncStateSetInfoTable.put(brokerName, syncStateInfo);
-            this.replicaInfoTable.put(brokerName, brokerInfo);
+            return;
         }
+        LOGGER.error("Receive an ElectMasterEvent which contains the un-registered broker, event = {}", event);
     }
 
     private void handleCleanBrokerDataEvent(final CleanBrokerDataEvent event) {
@@ -468,13 +460,13 @@ public class ReplicasInfoManager {
         if (!isContainsBroker(brokerName)) {
             return;
         }
-        final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+        final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
         final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
         for (String brokerAddress : brokerAddressSet) {
-            brokerInfo.removeBrokerAddress(brokerAddress);
+            brokerReplicaInfo.removeBrokerAddress(brokerAddress);
             syncStateInfo.removeSyncState(brokerAddress);
         }
-        if (brokerInfo.getBrokerIdTable().isEmpty()) {
+        if (brokerReplicaInfo.getBrokerIdTable().isEmpty()) {
             this.replicaInfoTable.remove(brokerName);
         }
         if (syncStateInfo.getSyncStateSet().isEmpty()) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 997dee3c5..29570b5ea 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -16,8 +16,10 @@
  */
 package org.apache.rocketmq.controller.impl.manager;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
 
 /**
  * Manages the syncStateSet of broker replicas.
@@ -25,13 +27,21 @@ import java.util.Set;
 public class SyncStateInfo {
     private final String clusterName;
     private final String brokerName;
-
     private Set<String/*Address*/> syncStateSet;
     private int syncStateSetEpoch;
 
     private String masterAddress;
     private int masterEpoch;
 
+    public SyncStateInfo(String clusterName, String brokerName) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.masterEpoch = 0;
+        this.syncStateSetEpoch = 0;
+        this.syncStateSet = Collections.emptySet();
+    }
+
+
     public SyncStateInfo(String clusterName, String brokerName, String masterAddress) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
@@ -42,6 +52,7 @@ public class SyncStateInfo {
         this.syncStateSetEpoch = 1;
     }
 
+
     public void updateMasterInfo(String masterAddress) {
         this.masterAddress = masterAddress;
         this.masterEpoch++;
@@ -52,8 +63,12 @@ public class SyncStateInfo {
         this.syncStateSetEpoch++;
     }
 
+    public boolean isFirstTimeForElect() {
+        return this.masterEpoch == 0;
+    }
+
     public boolean isMasterExist() {
-        return !this.masterAddress.isEmpty();
+        return StringUtils.isNotEmpty(this.masterAddress);
     }
 
     public String getClusterName() {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index cdc4abee0..78ebad8fc 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.controller.BrokerHeartbeatManager;
@@ -33,6 +34,7 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.RoleChangeNotifyEntry;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
@@ -77,100 +79,125 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                 request);
         }
         switch (request.getCode()) {
-            case CONTROLLER_ALTER_SYNC_STATE_SET: {
-                final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
-                final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
-                if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
+            case CONTROLLER_ALTER_SYNC_STATE_SET:
+                return this.handleAlterSyncStateSet(ctx, request);
+            case CONTROLLER_ELECT_MASTER:
+                return this.handleControllerElectMaster(ctx, request);
+            case CONTROLLER_REGISTER_BROKER:
+                return this.handleControllerRegisterBroker(ctx, request);
+            case CONTROLLER_GET_REPLICA_INFO:
+                return this.handleControllerGetReplicaInfo(ctx, request);
+            case CONTROLLER_GET_METADATA_INFO:
+                return this.handleControllerGetMetadataInfo(ctx, request);
+            case BROKER_HEARTBEAT:
+                return this.handleBrokerHeartbeat(ctx, request);
+            case CONTROLLER_GET_SYNC_STATE_DATA:
+                return this.handleControllerGetSyncStateData(ctx, request);
+            case UPDATE_CONTROLLER_CONFIG:
+                return this.handleUpdateControllerConfig(ctx, request);
+            case GET_CONTROLLER_CONFIG:
+                return this.handleGetControllerConfig(ctx, request);
+            case CLEAN_BROKER_DATA:
+                return this.handleCleanBrokerData(ctx, request);
+            default: {
+                final String error = " request type " + request.getCode() + " not supported";
+                return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
             }
-            case CONTROLLER_ELECT_MASTER: {
-                final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
-                if (future != null) {
-                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
-
-                    if (null != responseHeader) {
-                        if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
-                            this.controllerManager.notifyBrokerRoleChanged(responseHeader, electMasterRequest.getClusterName());
-                        }
-                    }
-                    return response;
+        }
+    }
+
+    private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+        final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerElectMaster(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final ElectMasterRequestHeader electMasterRequest = (ElectMasterRequestHeader) request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().electMaster(electMasterRequest);
+        if (future != null) {
+            final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+            final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
+
+            if (response.getCode() == ResponseCode.SUCCESS && responseHeader != null) {
+                if (this.controllerManager.getControllerConfig().isNotifyBrokerRoleChanged()) {
+                    this.controllerManager.notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(responseHeader));
                 }
-                break;
             }
-            case CONTROLLER_REGISTER_BROKER: {
-                final RegisterBrokerToControllerRequestHeader controllerRequest = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(controllerRequest);
-                if (future != null) {
-                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                    final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
-                    if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
-                        this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
-                            responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
-                            controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
-                    }
-                    return response;
-                }
-                break;
+            return response;
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+
+    private RemotingCommand handleControllerRegisterBroker(ChannelHandlerContext ctx,
+                                                           RemotingCommand request) throws Exception {
+        final RegisterBrokerToControllerRequestHeader controllerRequest = (RegisterBrokerToControllerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerToControllerRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().registerBroker(controllerRequest);
+        if (future != null) {
+            final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+            final RegisterBrokerToControllerResponseHeader responseHeader = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
+            if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
+                this.heartbeatManager.onBrokerHeartbeat(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
+                        responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel(),
+                        controllerRequest.getEpoch(), controllerRequest.getMaxOffset(), controllerRequest.getConfirmOffset(), controllerRequest.getElectionPriority());
             }
-            case CONTROLLER_GET_REPLICA_INFO: {
-                final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
+            return response;
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerGetReplicaInfo(ChannelHandlerContext ctx,
+                                                           RemotingCommand request) throws Exception {
+        final GetReplicaInfoRequestHeader controllerRequest = (GetReplicaInfoRequestHeader) request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getReplicaInfo(controllerRequest);
+        if (future != null) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
+    }
+
+    private RemotingCommand handleControllerGetMetadataInfo(ChannelHandlerContext ctx, RemotingCommand request) {
+        return this.controllerManager.getController().getControllerMetadata();
+    }
+
+    private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+        this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
+                requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
+        return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
+    }
+
+    private RemotingCommand handleControllerGetSyncStateData(ChannelHandlerContext ctx,
+                                                             RemotingCommand request) throws Exception {
+        if (request.getBody() != null) {
+            final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
+            if (brokerNames != null && brokerNames.size() > 0) {
+                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
                 if (future != null) {
                     return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
                 }
-                break;
-            }
-            case CONTROLLER_GET_METADATA_INFO: {
-                return this.controllerManager.getController().getControllerMetadata();
-            }
-            case BROKER_HEARTBEAT: {
-                final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
-                this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
-                    requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
-                return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
-            }
-            case CONTROLLER_GET_SYNC_STATE_DATA: {
-                if (request.getBody() != null) {
-                    final List<String> brokerNames = RemotingSerializable.decode(request.getBody(), List.class);
-                    if (brokerNames != null && brokerNames.size() > 0) {
-                        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().getSyncStateData(brokerNames);
-                        if (future != null) {
-                            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                        }
-                    }
-                }
-                break;
-            }
-            case UPDATE_CONTROLLER_CONFIG:
-                return this.updateControllerConfig(ctx, request);
-            case GET_CONTROLLER_CONFIG:
-                return this.getControllerConfig(ctx, request);
-            case CLEAN_BROKER_DATA:
-                final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
-                final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
-                if (null != future) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
-                }
-                break;
-            default: {
-                final String error = " request type " + request.getCode() + " not supported";
-                return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
             }
         }
         return RemotingCommand.createResponseCommand(null);
     }
 
-    @Override
-    public boolean rejectRequest() {
-        return false;
+    private RemotingCommand handleCleanBrokerData(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final CleanControllerBrokerDataRequestHeader requestHeader = (CleanControllerBrokerDataRequestHeader) request.decodeCommandCustomHeader(CleanControllerBrokerDataRequestHeader.class);
+        final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().cleanBrokerData(requestHeader);
+        if (null != future) {
+            return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+        }
+        return RemotingCommand.createResponseCommand(null);
     }
 
-    private RemotingCommand updateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+    private RemotingCommand handleUpdateControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         if (ctx != null) {
             log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
         }
@@ -205,7 +232,7 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand getControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+    private RemotingCommand handleGetControllerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
         String content = this.controllerManager.getConfiguration().getAllConfigsFormatString();
@@ -225,4 +252,9 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
         return response;
     }
 
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
 }
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 6cef5978c..869a50e72 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -36,19 +36,24 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_MASTER_STILL_EXIST;
 import static org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class ControllerManagerTest {
@@ -131,7 +136,7 @@ public class ControllerManagerTest {
         final RegisterBrokerToControllerRequestHeader requestHeader = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, address, heartbeatTimeoutMillis, 1, 1000L, 0);
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
         final RemotingCommand response = client.invokeSync(controllerAddress, request, 3000);
-        assert response != null;
+        assertNotNull(response);
         switch (response.getCode()) {
             case SUCCESS: {
                 return (RegisterBrokerToControllerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerToControllerResponseHeader.class);
@@ -143,20 +148,45 @@ public class ControllerManagerTest {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
+    public RemotingCommand brokerTryElect(final String controllerAddress, final String clusterName,
+        final String brokerName, final String brokerAddress, final RemotingClient client) throws Exception {
+        final ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress);
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_ELECT_MASTER, requestHeader);
+        RemotingCommand response = client.invokeSync(controllerAddress, request, 3000);
+        assertNotNull(response);
+        return response;
+    }
+
     @Test
     public void testSomeApi() throws Exception {
         mockData();
         final ControllerManager leader = waitLeader(this.controllers);
         String leaderAddr = "localhost" + ":" + leader.getController().getRemotingServer().localListenPort();
 
-        // Register two broker, the first one is master.
+        // Register two brokers
         final RegisterBrokerToControllerResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", this.remotingClient, 1000L);
         assert responseHeader1 != null;
-        assertEquals(responseHeader1.getBrokerId(), MixAll.MASTER_ID);
+        assertEquals(responseHeader1.getBrokerId(), MixAll.FIRST_SLAVE_ID);
 
         final RegisterBrokerToControllerResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1, 4000L);
         assert responseHeader2 != null;
-        assertEquals(responseHeader2.getBrokerId(), 2);
+        assertEquals(responseHeader2.getBrokerId(), MixAll.FIRST_SLAVE_ID + 1);
+
+        // Both brokers try to elect themselves as master, but only the first one can become the master
+        RemotingCommand tryElectCommand1 = brokerTryElect(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", this.remotingClient);
+        ElectMasterResponseHeader brokerTryElectResponseHeader1 = (ElectMasterResponseHeader) tryElectCommand1.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+        RemotingCommand tryElectCommand2 = brokerTryElect(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1);
+        ElectMasterResponseHeader brokerTryElectResponseHeader2 = (ElectMasterResponseHeader) tryElectCommand2.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
+
+        assertEquals(SUCCESS, tryElectCommand1.getCode());
+        assertEquals("127.0.0.1:8000", brokerTryElectResponseHeader1.getMasterAddress());
+        assertEquals(1L, brokerTryElectResponseHeader1.getMasterEpoch());
+        assertEquals(1L, brokerTryElectResponseHeader1.getSyncStateSetEpoch());
+
+        assertEquals(CONTROLLER_MASTER_STILL_EXIST, tryElectCommand2.getCode());
+        assertEquals("127.0.0.1:8000", brokerTryElectResponseHeader2.getMasterAddress());
+        assertEquals(1L, brokerTryElectResponseHeader2.getMasterEpoch());
+        assertEquals(1L, brokerTryElectResponseHeader2.getSyncStateSetEpoch());
 
         // Send heartbeat for broker2
         ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 84bf7c72b..873e76760 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -22,6 +22,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -94,8 +95,8 @@ public class DLedgerControllerTest {
         }
     }
 
-    public boolean registerNewBroker(Controller leader, String clusterName, String brokerName, String brokerAddress,
-        boolean isFirstRegisteredBroker) throws Exception {
+    public void registerNewBroker(Controller leader, String clusterName, String brokerName, String brokerAddress,
+        long expectBrokerId) throws Exception {
         // Register new broker
         final RegisterBrokerToControllerRequestHeader registerRequest = new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress);
         RemotingCommand response = await().atMost(Duration.ofSeconds(20)).until(() -> {
@@ -109,14 +110,22 @@ public class DLedgerControllerTest {
                 e.printStackTrace();
                 return null;
             }
-        }, item -> item != null);
+        }, Objects::nonNull);
 
         final RegisterBrokerToControllerResponseHeader registerResult = (RegisterBrokerToControllerResponseHeader) response.readCustomHeader();
 
-        if (!isFirstRegisteredBroker) {
-            assertTrue(registerResult.getBrokerId() > 0);
-        }
-        return true;
+        assertEquals(expectBrokerId, registerResult.getBrokerId());
+    }
+
+    public void brokerTryElectMaster(Controller leader, String clusterName, String brokerName, String brokerAddress,
+        boolean exceptSuccess) {
+        final ElectMasterRequestHeader electMasterRequestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress);
+        RemotingCommand command = await().atMost(Duration.ofSeconds(20)).until(() -> {
+            return leader.electMaster(electMasterRequestHeader).get(2, TimeUnit.SECONDS);
+        }, Objects::nonNull);
+
+        ElectMasterResponseHeader header = (ElectMasterResponseHeader) command.readCustomHeader();
+        assertEquals(exceptSuccess, ResponseCode.SUCCESS == command.getCode());
     }
 
     private boolean alterNewInSyncSet(Controller leader, String brokerName, String masterAddress, int masterEpoch,
@@ -167,14 +176,18 @@ public class DLedgerControllerTest {
 
         DLedgerController leader = waitLeader(controllers);
 
-        assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9000", true));
-        assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9001", true));
-        assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9002", true));
+        // register
+        registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9000", 1L);
+        registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9001", 2L);
+        registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9002", 3L);
+        // try elect
+        brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9000", true);
+        brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9001", false);
+        brokerTryElectMaster(leader, "cluster1", "broker1", "127.0.0.1:9002", false);
         final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
         final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader();
-        assertEquals(replicaInfo.getMasterEpoch(), 1);
-        assertEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000");
-
+        assertEquals(1, replicaInfo.getMasterEpoch());
+        assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress());
         // Try alter sync state set
         final HashSet<String> newSyncStateSet = new HashSet<>();
         newSyncStateSet.add("127.0.0.1:9000");
@@ -209,13 +222,13 @@ public class DLedgerControllerTest {
     @Test
     public void testElectMaster() throws Exception {
         final DLedgerController leader = mockMetaData(false);
-        final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
         setBrokerElectPolicy(leader, "127.0.0.1:9000");
         final RemotingCommand resp = leader.electMaster(request).get(10, TimeUnit.SECONDS);
         final ElectMasterResponseHeader response = (ElectMasterResponseHeader) resp.readCustomHeader();
         assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getNewMasterAddress().isEmpty());
-        assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+        assertFalse(response.getMasterAddress().isEmpty());
+        assertNotEquals(response.getMasterAddress(), "127.0.0.1:9000");
     }
 
     @Test
@@ -228,7 +241,7 @@ public class DLedgerControllerTest {
 
         // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
         // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
-        final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1");
         setBrokerElectPolicy(leader, "127.0.0.1:9000");
         leader.electMaster(electRequest).get(10, TimeUnit.SECONDS);
 
@@ -240,19 +253,17 @@ public class DLedgerControllerTest {
         assertEquals(replicaInfo.getMasterAddress(), "");
         assertEquals(replicaInfo.getMasterEpoch(), 2);
 
-        // Now, we start broker1 - 127.0.0.1:9001, but it was not in syncStateSet, so it will not be elected as master.
-        final RegisterBrokerToControllerRequestHeader request1 =
-            new RegisterBrokerToControllerRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
-        final RegisterBrokerToControllerResponseHeader r1 = (RegisterBrokerToControllerResponseHeader) leader.registerBroker(request1).get(10, TimeUnit.SECONDS).readCustomHeader();
-        assertEquals(r1.getBrokerId(), 2);
+        // Now, we start broker1 - 127.0.0.1:9001 and it tries to elect itself, but it is not in the syncStateSet, so it will not be elected as master.
+        final ElectMasterRequestHeader request1 =
+            ElectMasterRequestHeader.ofBrokerTrigger("cluster1", "broker1", "127.0.0.1:9001");
+        final ElectMasterResponseHeader r1 = (ElectMasterResponseHeader) leader.electMaster(request1).get(10, TimeUnit.SECONDS).readCustomHeader();
         assertEquals(r1.getMasterAddress(), "");
-        assertEquals(r1.getMasterEpoch(), 2);
 
-        // Now, we start broker1 - 127.0.0.1:9000, it will be elected as master
-        final RegisterBrokerToControllerRequestHeader request2 =
-            new RegisterBrokerToControllerRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
-        final RegisterBrokerToControllerResponseHeader r2 = (RegisterBrokerToControllerResponseHeader) leader.registerBroker(request2).get(10, TimeUnit.SECONDS).readCustomHeader();
-        assertEquals(r2.getBrokerId(), 0);
+        // Now, we start broker1 - 127.0.0.1:9000 and it tries to elect itself; it will be elected as master
+        setBrokerElectPolicy(leader);
+        final ElectMasterRequestHeader request2 =
+            ElectMasterRequestHeader.ofBrokerTrigger("cluster1", "broker1", "127.0.0.1:9000");
+        final ElectMasterResponseHeader r2 = (ElectMasterResponseHeader) leader.electMaster(request2).get(10, TimeUnit.SECONDS).readCustomHeader();
         assertEquals(r2.getMasterAddress(), "127.0.0.1:9000");
         assertEquals(r2.getMasterEpoch(), 3);
     }
@@ -268,7 +279,7 @@ public class DLedgerControllerTest {
         // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
         // However, event if the syncStateSet in statemachine is {"127.0.0.1:9000"}
         // the option {enableElectUncleanMaster = true}, so the controller sill can elect a new master
-        final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1");
         setBrokerElectPolicy(leader, "127.0.0.1:9000");
         final CompletableFuture<RemotingCommand> future = leader.electMaster(electRequest);
         future.get(10, TimeUnit.SECONDS);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index d3b03dfe9..57d372349 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -16,11 +16,13 @@
  */
 package org.apache.rocketmq.controller.impl.controller.impl.manager;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
 import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
@@ -30,6 +32,7 @@ import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.InSyncStateData;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetResponseHeader;
@@ -55,11 +58,16 @@ public class ReplicasInfoManagerTest {
 
     private DefaultBrokerHeartbeatManager heartbeatManager;
 
+    private ControllerConfig config;
+
+    private ElectPolicy electPolicy;
+
     @Before
     public void init() {
-        final ControllerConfig config = new ControllerConfig();
-        config.setEnableElectUncleanMaster(false);
-        config.setScanNotActiveBrokerInterval(300000000);
+        this.electPolicy = new DefaultElectPolicy((clusterName, brokerAddr) -> true, null);
+        this.config = new ControllerConfig();
+        this.config.setEnableElectUncleanMaster(false);
+        this.config.setScanNotActiveBrokerInterval(300000000);
         this.replicasInfoManager = new ReplicasInfoManager(config);
         this.heartbeatManager = new DefaultBrokerHeartbeatManager(config);
         this.heartbeatManager.start();
@@ -72,24 +80,67 @@ public class ReplicasInfoManagerTest {
         this.heartbeatManager = null;
     }
 
-    public boolean registerNewBroker(String clusterName, String brokerName, String brokerAddress,
-        boolean isFirstRegisteredBroker) {
+    public void registerNewBroker(String clusterName, String brokerName, String brokerAddress,
+        long expectedBrokerId, String expectedMasterAddress) {
         // Register new broker
         final RegisterBrokerToControllerRequestHeader registerRequest =
             new RegisterBrokerToControllerRequestHeader(clusterName, brokerName, brokerAddress);
         final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult = this.replicasInfoManager.registerBroker(registerRequest, (s, v) -> true);
         apply(registerResult.getEvents());
+        // the response must succeed and carry the expected broker id and master address
+        assertEquals(ResponseCode.SUCCESS, registerResult.getResponseCode());
+        assertEquals(expectedBrokerId, registerResult.getResponse().getBrokerId());
+        assertEquals(expectedMasterAddress, registerResult.getResponse().getMasterAddress());
+        // after applying the events, the state machine must report the same broker id for this address
+        final GetReplicaInfoResponseHeader replicaInfo = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
+        assertEquals(expectedBrokerId, replicaInfo.getBrokerId());
+    }
+
+    public void brokerElectMaster(String clusterName, long brokerId, String brokerName, String brokerAddress,
+        boolean isFirstTryElect) {
 
-        if (isFirstRegisteredBroker) {
-            final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName));
-            final GetReplicaInfoResponseHeader replicaInfo = getInfoResult.getResponse();
-            assertEquals(replicaInfo.getMasterAddress(), brokerAddress);
-            assertEquals(replicaInfo.getMasterEpoch(), 1);
+        final GetReplicaInfoResponseHeader replicaInfoBefore = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
+        byte[] body = this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName)).getBody();
+        InSyncStateData syncStateDataBefore = RemotingSerializable.decode(body, InSyncStateData.class);
+        // the broker itself asks the controller to elect it as master; the state above is the "before" snapshot
+        ElectMasterRequestHeader requestHeader = ElectMasterRequestHeader.ofBrokerTrigger(clusterName, brokerName, brokerAddress);
+        final ControllerResult<ElectMasterResponseHeader> result = this.replicasInfoManager.electMaster(requestHeader, this.electPolicy);
+        apply(result.getEvents());
+
+        final GetReplicaInfoResponseHeader replicaInfoAfter = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName, brokerAddress)).getResponse();
+        final ElectMasterResponseHeader response = result.getResponse();
+
+        if (isFirstTryElect) {
+            // first attempt in this broker-set: the requesting broker must be elected
+            // the response carries epoch 1 for both master and sync-state-set, and this broker's address
+            assertEquals(ResponseCode.SUCCESS, result.getResponseCode());
+            assertEquals(1, response.getMasterEpoch());
+            assertEquals(1, response.getSyncStateSetEpoch());
+            assertEquals(brokerAddress, response.getMasterAddress());
+            // the state machine agrees, and the broker keeps the id it was given at registration
+            assertEquals(brokerAddress, replicaInfoAfter.getMasterAddress());
+            assertEquals(1, replicaInfoAfter.getMasterEpoch());
+            assertEquals(brokerId, replicaInfoAfter.getBrokerId());
         } else {
-            final RegisterBrokerToControllerResponseHeader response = registerResult.getResponse();
-            assertTrue(response.getBrokerId() > 0);
+
+            // a master already exists: the request is rejected and answers with the current master's address and epoch
+            if (StringUtils.isNotEmpty(replicaInfoBefore.getMasterAddress())) {
+                assertEquals(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, result.getResponseCode());
+                assertEquals(replicaInfoBefore.getMasterAddress(), response.getMasterAddress());
+                assertEquals(replicaInfoBefore.getMasterEpoch(), response.getMasterEpoch());
+                assertEquals(brokerId, replicaInfoAfter.getBrokerId());
+                return;
+            }
+            if (syncStateDataBefore.getInSyncStateTable().containsKey(brokerAddress) || this.config.isEnableElectUncleanMaster()) { // NOTE(review): the data was fetched by broker name; verify the table is really keyed by address
+                // an election can succeed; as in the first-elect branch the broker keeps its registered id, which is not MASTER_ID
+                assertEquals(ResponseCode.SUCCESS, result.getResponseCode());
+                assertNotEquals(MixAll.MASTER_ID, replicaInfoAfter.getBrokerId());
+                assertEquals(brokerId, replicaInfoAfter.getBrokerId());
+            } else {
+                // no master and no electable candidate: the election fails
+                assertEquals(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, result.getResponseCode());
+            }
         }
-        return true;
     }
 
     @Test
@@ -102,6 +153,13 @@ public class ReplicasInfoManagerTest {
             new RegisterBrokerToControllerRequestHeader("default", "brokerName-a", "127.0.0.1:9001");
         final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult0 = this.replicasInfoManager.registerBroker(registerRequest0, (s, v) -> true);
         apply(registerResult0.getEvents());
+        final ElectMasterRequestHeader electMasterRequest = ElectMasterRequestHeader.ofBrokerTrigger("default", "brokerName-a", "127.0.0.1:9000");
+        ControllerResult<ElectMasterResponseHeader> electMasterResponseHeaderControllerResult = this.replicasInfoManager.electMaster(electMasterRequest, new DefaultElectPolicy());
+        apply(electMasterResponseHeaderControllerResult.getEvents());
+        final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("brokerName-a"));
+        final GetReplicaInfoResponseHeader replicaInfo = getInfoResult.getResponse();
+        assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress());
+        assertEquals(1, replicaInfo.getMasterEpoch());
         final HashSet<String> newSyncStateSet = new HashSet<>();
         newSyncStateSet.add("127.0.0.1:9000");
         newSyncStateSet.add("127.0.0.1:9001");
@@ -110,10 +168,17 @@ public class ReplicasInfoManagerTest {
             new RegisterBrokerToControllerRequestHeader("default", "brokerName-a", "127.0.0.1:9002");
         final ControllerResult<RegisterBrokerToControllerResponseHeader> registerResult1 = this.replicasInfoManager.registerBroker(registerRequest1, (s, v) -> StringUtils.equals(v, "127.0.0.1:9001"));
         apply(registerResult1.getEvents());
-        final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("brokerName-a"));
-        final GetReplicaInfoResponseHeader replicaInfo = getInfoResult.getResponse();
-        assertEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9001");
-        assertEquals(replicaInfo.getMasterEpoch(), 2);
+        assertEquals(3, registerResult1.getResponse().getBrokerId());
+        assertEquals("", registerResult1.getResponse().getMasterAddress());
+        ElectPolicy electPolicy1 = new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"),null);
+        final ElectMasterRequestHeader electMasterRequest1 = ElectMasterRequestHeader.ofBrokerTrigger("default", "brokerName-a", "127.0.0.1:9002");
+        ControllerResult<ElectMasterResponseHeader> electMasterResponseHeaderControllerResult1 = this.replicasInfoManager.electMaster(electMasterRequest1, electPolicy1);
+        apply(electMasterResponseHeaderControllerResult1.getEvents());
+        final ControllerResult<GetReplicaInfoResponseHeader> getInfoResult0 = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("brokerName-a"));
+        final GetReplicaInfoResponseHeader replicaInfo0 = getInfoResult0.getResponse();
+        assertEquals(replicaInfo0.getMasterAddress(), "127.0.0.1:9001");
+        assertTrue(replicaInfo0.getMasterAddress().equals("127.0.0.1:9001") || replicaInfo0.getMasterAddress().equals("127.0.0.1:9002"));
+        assertEquals(replicaInfo0.getMasterEpoch(), 2);
     }
 
     private boolean alterNewInSyncSet(String brokerName, String masterAddress, int masterEpoch,
@@ -139,9 +204,12 @@ public class ReplicasInfoManagerTest {
     }
 
     public void mockMetaData() {
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", true);
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", false);
-        registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", false);
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, "");
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, "");
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, "");
+        brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true);
+        brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false);
+        brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false);
         final HashSet<String> newSyncStateSet = new HashSet<>();
         newSyncStateSet.add("127.0.0.1:9000");
         newSyncStateSet.add("127.0.0.1:9001");
@@ -185,43 +253,63 @@ public class ReplicasInfoManagerTest {
             1, 3L, -1L, 1);
     }
 
+    @Test
+    public void testRegisterBrokerSuccess() {
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, ""); // expects id 1 and an empty master address
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, ""); // expects id 2, still no master
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, ""); // expects id 3, still no master
+        brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true); // first election attempt: must be elected
+        brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false); // later attempt: checked against the existing master
+        brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false); // later attempt: checked against the existing master
+    }
+
+    @Test
+    public void testRegisterWithMasterExistResp() {
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9000", 1L, ""); // no master elected yet
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9001", 2L, ""); // no master elected yet
+        brokerElectMaster("cluster1", 1L, "broker1", "127.0.0.1:9000", true); // first election attempt: must be elected
+        brokerElectMaster("cluster1", 2L, "broker1", "127.0.0.1:9001", false); // checked against the existing master
+        registerNewBroker("cluster1", "broker1", "127.0.0.1:9002", 3L, "127.0.0.1:9000"); // registering after the election: the response must name 127.0.0.1:9000 as master
+        brokerElectMaster("cluster1", 3L, "broker1", "127.0.0.1:9002", false); // checked against the existing master
+    }
+
     @Test
     public void testElectMasterOldMasterStillAlive() {
         mockMetaData();
-        final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
         ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataMasterStillAlive();
         final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
             electPolicy);
-        assertEquals(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, cResult.getResponseCode());
+        assertEquals(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, cResult.getResponseCode());
     }
 
     @Test
     public void testElectMasterPreferHigherEpoch() {
         mockMetaData();
-        final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
         ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataHigherEpoch();
         final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
             electPolicy);
         final ElectMasterResponseHeader response = cResult.getResponse();
         assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getNewMasterAddress().isEmpty());
-        assertEquals("127.0.0.1:9001", response.getNewMasterAddress());
+        assertFalse(response.getMasterAddress().isEmpty());
+        assertEquals("127.0.0.1:9001", response.getMasterAddress());
     }
 
     @Test
     public void testElectMasterPreferHigherOffsetWhenEpochEquals() {
         mockMetaData();
-        final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
         ElectPolicy electPolicy = new DefaultElectPolicy(this.heartbeatManager::isBrokerActive, this.heartbeatManager::getBrokerLiveInfo);
         mockHeartbeatDataHigherOffset();
         final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
             electPolicy);
         final ElectMasterResponseHeader response = cResult.getResponse();
         assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getNewMasterAddress().isEmpty());
-        assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
+        assertFalse(response.getMasterAddress().isEmpty());
+        assertEquals("127.0.0.1:9002", response.getMasterAddress());
     }
 
     @Test
@@ -234,45 +322,51 @@ public class ReplicasInfoManagerTest {
             electPolicy);
         final ElectMasterResponseHeader response = cResult.getResponse();
         assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getNewMasterAddress().isEmpty());
-        assertEquals("127.0.0.1:9002", response.getNewMasterAddress());
+        assertFalse(response.getMasterAddress().isEmpty());
+        assertEquals("127.0.0.1:9002", response.getMasterAddress());
     }
 
     @Test
     public void testElectMaster() {
         mockMetaData();
-        final ElectMasterRequestHeader request = new ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger("broker1");
         final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(request,
             new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null));
         final ElectMasterResponseHeader response = cResult.getResponse();
         assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getNewMasterAddress().isEmpty());
-        assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
+        assertFalse(response.getMasterAddress().isEmpty());
+        assertNotEquals(response.getMasterAddress(), "127.0.0.1:9000");
+
+        apply(cResult.getEvents());
 
         final Set<String> brokerSet = new HashSet<>();
         brokerSet.add("127.0.0.1:9000");
         brokerSet.add("127.0.0.1:9001");
         brokerSet.add("127.0.0.1:9002");
-        final ElectMasterRequestHeader assignRequest = new ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9000");
+        assertTrue(alterNewInSyncSet("broker1", response.getMasterAddress(), response.getMasterEpoch(), brokerSet, response.getSyncStateSetEpoch()));
+
+        // admin assigns a master that the elect policy treats as not alive: the election must fail
+        final ElectMasterRequestHeader assignRequest = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", "127.0.0.1:9000");
         final ControllerResult<ElectMasterResponseHeader> cResult1 = this.replicasInfoManager.electMaster(assignRequest,
-            new DefaultElectPolicy((clusterName, brokerAddress) -> brokerAddress.contains("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null));
+
         assertEquals(cResult1.getResponseCode(), ResponseCode.CONTROLLER_ELECT_MASTER_FAILED);
 
-        final ElectMasterRequestHeader assignRequest1 = new ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+        // admin assigns the broker that is already master while every broker is treated as alive: the master still exists
+        final ElectMasterRequestHeader assignRequest1 = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", response.getMasterAddress());
         final ControllerResult<ElectMasterResponseHeader> cResult2 = this.replicasInfoManager.electMaster(assignRequest1,
-            new DefaultElectPolicy((clusterName, brokerAddress) -> brokerAddress.equals("127.0.0.1:9000"), null));
-        assertEquals(cResult2.getResponseCode(), ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
+            new DefaultElectPolicy((clusterName, brokerAddress) -> true, null));
+        assertEquals(cResult2.getResponseCode(), ResponseCode.CONTROLLER_MASTER_STILL_EXIST);
 
-        final ElectMasterRequestHeader assignRequest2 = new ElectMasterRequestHeader("cluster1", "broker1", "127.0.0.1:9001");
+        // admin assigns another broker while the policy treats the current master as not alive: the election succeeds
+        final ElectMasterRequestHeader assignRequest2 = ElectMasterRequestHeader.ofAdminTrigger("cluster1", "broker1", "127.0.0.1:9000");
         final ControllerResult<ElectMasterResponseHeader> cResult3 = this.replicasInfoManager.electMaster(assignRequest2,
-            new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null));
+            new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals(response.getMasterAddress()), null));
         assertEquals(cResult3.getResponseCode(), ResponseCode.SUCCESS);
-        final ElectMasterResponseHeader response3 = cResult3.getResponse();
-        assertEquals(response3.getNewMasterAddress(), "127.0.0.1:9001");
-        assertEquals(response.getMasterEpoch(), 2);
-        assertFalse(response.getNewMasterAddress().isEmpty());
-        assertNotEquals(response.getNewMasterAddress(), "127.0.0.1:9000");
 
+        final ElectMasterResponseHeader response3 = cResult3.getResponse();
+        assertEquals(response3.getMasterAddress(), "127.0.0.1:9000");
+        assertEquals(response3.getMasterEpoch(), 3);
     }
 
     @Test
@@ -284,7 +378,7 @@ public class ReplicasInfoManagerTest {
 
         // Now we trigger electMaster api, which means the old master is shutdown and want to elect a new master.
         // However, the syncStateSet in statemachine is {"127.0.0.1:9000"}, not more replicas can be elected as master, it will be failed.
-        final ElectMasterRequestHeader electRequest = new ElectMasterRequestHeader("broker1");
+        final ElectMasterRequestHeader electRequest = ElectMasterRequestHeader.ofControllerTrigger("broker1");
         final ControllerResult<ElectMasterResponseHeader> cResult = this.replicasInfoManager.electMaster(electRequest,
             new DefaultElectPolicy((clusterName, brokerAddress) -> !brokerAddress.equals("127.0.0.1:9000"), null));
         final List<EventMessage> events = cResult.getEvents();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
index e442217fa..8e4e3770e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/ResponseCode.java
@@ -113,10 +113,11 @@ public class ResponseCode extends RemotingSysResponseCode {
 
     public static final int CONTROLLER_INVALID_CLEAN_BROKER_METADATA = 2009;
 
-    public static final int CONTROLLER_ALTER_SYNC_STATE_SET_FAILED = 2010;
+    public static final int CONTROLLER_BROKER_NEED_TO_BE_REGISTERED = 2010;
 
-    public static final int CONTROLLER_ELECT_MASTER_FAILED = 2011;
-
-    public static final int CONTROLLER_REGISTER_BROKER_FAILED = 2012;
+    public static final int CONTROLLER_MASTER_STILL_EXIST = 2011;
 
+    public static final int CONTROLLER_ELECT_MASTER_FAILED = 2012; // NOTE(review): was 2011 before this change; confirm no peer still relies on the old value
+
+    public static final int CONTROLLER_ALTER_SYNC_STATE_SET_FAILED = 2013; // NOTE(review): was 2010 before this change; confirm no peer still relies on the old value
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
new file mode 100644
index 000000000..2016b2968
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RoleChangeNotifyEntry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+
+/** Read-only holder for the member group, master address and epochs taken from an elect-master response. */
+public class RoleChangeNotifyEntry {
+    private final BrokerMemberGroup brokerMemberGroup;
+    private final String masterAddress;
+    private final int masterEpoch;
+    private final int syncStateSetEpoch;
+
+    /** Stores the four values exactly as given; nothing is copied or validated. */
+    public RoleChangeNotifyEntry(BrokerMemberGroup brokerMemberGroup, String masterAddress, int masterEpoch, int syncStateSetEpoch) {
+        this.brokerMemberGroup = brokerMemberGroup;
+        this.masterAddress = masterAddress;
+        this.masterEpoch = masterEpoch;
+        this.syncStateSetEpoch = syncStateSetEpoch;
+    }
+
+    /** Builds an entry from the corresponding getters of {@code header}. */
+    public static RoleChangeNotifyEntry convert(ElectMasterResponseHeader header) {
+        final BrokerMemberGroup memberGroup = header.getBrokerMemberGroup();
+        final String master = header.getMasterAddress();
+        return new RoleChangeNotifyEntry(memberGroup, master, header.getMasterEpoch(), header.getSyncStateSetEpoch());
+    }
+
+    public BrokerMemberGroup getBrokerMemberGroup() {
+        return this.brokerMemberGroup;
+    }
+
+    public String getMasterAddress() {
+        return this.masterAddress;
+    }
+
+    public int getMasterEpoch() {
+        return this.masterEpoch;
+    }
+
+    public int getSyncStateSetEpoch() {
+        return this.syncStateSetEpoch;
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
index 134252007..bb9cfca3e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterRequestHeader.java
@@ -28,9 +28,19 @@ public class ElectMasterRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String brokerName;
 
+    /**
+     * The broker address this request refers to.
+     * For a broker-triggered election: this broker is elected as master if this is the first election
+     * in this broker-set.
+     * For an admin-triggered election: this is the assigned broker address, i.e. the broker that must be
+     * preferred as the new master as long as it is valid.
+     */
     @CFNotNull
     private String brokerAddress;
 
+    @CFNotNull
+    private Boolean forceElect = false;
+
     public ElectMasterRequestHeader() {
     }
 
@@ -44,6 +54,26 @@ public class ElectMasterRequestHeader implements CommandCustomHeader {
         this.brokerAddress = brokerAddress;
     }
 
+    public ElectMasterRequestHeader(String clusterName, String brokerName, String brokerAddress, boolean forceElect) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerAddress = brokerAddress;
+        this.forceElect = forceElect; // the primitive is autoboxed into the Boolean field
+    }
+
+    public static ElectMasterRequestHeader ofBrokerTrigger(String clusterName, String brokerName,
+        String brokerAddress) {
+        return new ElectMasterRequestHeader(clusterName, brokerName, brokerAddress); // forceElect is not passed by this factory
+    }
+
+    public static ElectMasterRequestHeader ofControllerTrigger(String brokerName) {
+        return new ElectMasterRequestHeader(brokerName); // only brokerName is supplied; no cluster name or broker address
+    }
+
+    public static ElectMasterRequestHeader ofAdminTrigger(String clusterName, String brokerName, String brokerAddress) {
+        return new ElectMasterRequestHeader(clusterName, brokerName, brokerAddress, true); // the only factory that sets forceElect to true
+    }
+
     public String getBrokerName() {
         return brokerName;
     }
@@ -68,16 +98,39 @@ public class ElectMasterRequestHeader implements CommandCustomHeader {
         this.clusterName = clusterName;
     }
 
+    public boolean isForceElect() {
+        return Boolean.TRUE.equals(this.forceElect); // boxed field: a null value reads as false instead of throwing on unboxing
+    }
+
     @Override
     public String toString() {
         return "ElectMasterRequestHeader{" +
             "clusterName='" + clusterName + '\'' +
             ", brokerName='" + brokerName + '\'' +
             ", brokerAddress='" + brokerAddress + '\'' +
+            ", forceElect=" + forceElect +
             '}';
     }
 
     @Override
     public void checkFields() throws RemotingCommandException {
     }
+
+    /**
+     * Who triggered an elect-master request.
+     */
+    public enum ElectMasterTriggerType {
+        /**
+         * Triggered by a broker.
+         */
+        BROKER_TRIGGER,
+        /**
+         * Triggered by the controller.
+         */
+        CONTROLLER_TRIGGER,
+        /**
+         * Triggered by an admin.
+         */
+        ADMIN_TRIGGER
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
index 04b602e81..811d64150 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
@@ -21,20 +21,22 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
 
 public class ElectMasterResponseHeader implements CommandCustomHeader {
-    private String newMasterAddress;
+    private String masterAddress;
     private int masterEpoch;
     private int syncStateSetEpoch;
     private BrokerMemberGroup brokerMemberGroup;
 
+    private long brokerId = -1;
+
     public ElectMasterResponseHeader() {
     }
 
-    public String getNewMasterAddress() {
-        return newMasterAddress;
+    public String getMasterAddress() {
+        return masterAddress;
     }
 
-    public void setNewMasterAddress(String newMasterAddress) {
-        this.newMasterAddress = newMasterAddress;
+    public void setMasterAddress(String masterAddress) {
+        this.masterAddress = masterAddress;
     }
 
     public int getMasterEpoch() {
@@ -61,13 +63,22 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
         this.brokerMemberGroup = brokerMemberGroup;
     }
 
+    public long getBrokerId() {
+        return brokerId; // -1 is the field's initial value
+    }
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
     @Override
     public String toString() {
         return "ElectMasterResponseHeader{" +
-            "newMasterAddress='" + newMasterAddress + '\'' +
+            "masterAddress='" + masterAddress + '\'' +
             ", masterEpoch=" + masterEpoch +
             ", syncStateSetEpoch=" + syncStateSetEpoch +
             ", brokerMemberGroup=" + brokerMemberGroup +
+            ", brokerId=" + brokerId +
             '}';
     }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
index a901bfbe3..6c1777e36 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/controller/ReElectMasterSubCommand.java
@@ -75,7 +75,7 @@ public class ReElectMasterSubCommand implements SubCommand {
             final ElectMasterResponseHeader metaData = defaultMQAdminExt.electMaster(controllerAddress, clusterName, brokerName, brokerAddress);
             System.out.printf("\n#ClusterName\t%s", clusterName);
             System.out.printf("\n#BrokerName\t%s", brokerName);
-            System.out.printf("\n#BrokerMasterAddr\t%s", metaData.getNewMasterAddress());
+            System.out.printf("\n#BrokerMasterAddr\t%s", metaData.getMasterAddress());
             System.out.printf("\n#MasterEpoch\t%s", metaData.getMasterEpoch());
             System.out.printf("\n#SyncStateSetEpoch\t%s\n", metaData.getSyncStateSetEpoch());
             BrokerMemberGroup brokerMemberGroup = metaData.getBrokerMemberGroup();