You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/07 08:51:28 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Let controller inform broker that role changed. (#4424)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 98a283f5a [Summer of code] Let controller inform broker that role changed. (#4424)
98a283f5a is described below

commit 98a283f5ae2968f9d501155f9702e51a3e9bc214
Author: hzh0425 <64...@qq.com>
AuthorDate: Tue Jun 7 16:51:14 2022 +0800

    [Summer of code] Let controller inform broker that role changed. (#4424)
    
    * add broker api --notifyBrokerRoleChanged --
    
    * add broker api --notifyBrokerRoleChanged --
    
    * let controller inform broker when role changed
    
    * code reivew
---
 .../broker/hacontroller/ReplicasManager.java       | 12 +++-
 .../broker/processor/AdminBrokerProcessor.java     | 21 ++++++
 .../rocketmq/common/protocol/RequestCode.java      |  2 +
 .../NotifyBrokerRoleChangedRequestHeader.java      | 83 ++++++++++++++++++++++
 .../controller/ElectMasterResponseHeader.java      | 24 ++++++-
 .../rocketmq/controller/ControllerManager.java     | 48 +++++++++++++
 .../controller/impl/manager/BrokerInfo.java        |  4 ++
 .../impl/manager/ReplicasInfoManager.java          | 17 +++++
 .../namesrv/routeinfo/RouteInfoManager.java        | 59 ++++++++++++++-
 9 files changed, 265 insertions(+), 5 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
index bb88b6c84..40ac5d1e2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java
@@ -147,6 +147,16 @@ public class ReplicasManager {
         this.scheduledService.shutdown();
     }
 
+    public synchronized void changeBrokerRole(final String newMasterAddress, final int newMasterEpoch, final int syncStateSetEpoch, final long brokerId) {
+        if (StringUtils.isNoneEmpty(newMasterAddress) && newMasterEpoch > this.masterEpoch) {
+            if (StringUtils.equals(newMasterAddress, this.localAddress)) {
+                changeToMaster(newMasterEpoch, syncStateSetEpoch);
+            } else {
+                changeToSlave(newMasterAddress, newMasterEpoch, brokerId);
+            }
+        }
+    }
+
     public void changeToMaster(final int newMasterEpoch, final int syncStateSetEpoch) {
         synchronized (this) {
             if (newMasterEpoch > this.masterEpoch) {
@@ -193,7 +203,7 @@ public class ReplicasManager {
     public void changeToSlave(final String newMasterAddress, final int newMasterEpoch, long brokerId) {
         synchronized (this) {
             if (newMasterEpoch > this.masterEpoch) {
-                LOGGER.info("Begin to change to slave, brokerName={}, replicas:{}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, this.brokerConfig.getBrokerId());
+                LOGGER.info("Begin to change to slave, brokerName={}, replicas={}, brokerId={}", this.brokerConfig.getBrokerName(), this.localAddress, brokerId);
 
                 brokerController.getMessageStore().disableWrite();
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 3aaf4b935..7f8b2a437 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -124,6 +124,7 @@ import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListReque
 import org.apache.rocketmq.common.protocol.header.GetSubscriptionGroupConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
 import org.apache.rocketmq.common.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
@@ -299,6 +300,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 return this.resetMasterFlushOffset(ctx, request);
             case RequestCode.GET_BROKER_EPOCH_CACHE:
                 return this.getBrokerEpochCache(ctx, request);
+            case RequestCode.NOTIFY_BROKER_ROLE_CHANGED:
+                return this.notifyBrokerRoleChanged(ctx, request);
             default:
                 return getUnknownCmdResponse(ctx, request);
         }
@@ -2390,4 +2393,22 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         response.setRemark(null);
         return response;
     }
+
+    private RemotingCommand notifyBrokerRoleChanged(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        NotifyBrokerRoleChangedRequestHeader requestHeader = request.decodeCommandCustomHeader(NotifyBrokerRoleChangedRequestHeader.class);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        LOGGER.info("Receive notifyBrokerRoleChanged request, try to change brokerRole, request:{}", requestHeader);
+
+        final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
+        if (replicasManager != null) {
+            replicasManager.changeBrokerRole(requestHeader.getMasterAddress(), requestHeader.getMasterEpoch(), requestHeader.getSyncStateSetEpoch(), requestHeader.getBrokerId());
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        return response;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index b6313c583..a6c05f445 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -254,4 +254,6 @@ public class RequestCode {
     public static final int CONTROLLER_GET_SYNC_STATE_DATA = 1006;
 
     public static final int GET_BROKER_EPOCH_CACHE = 1007;
+
+    public static final int NOTIFY_BROKER_ROLE_CHANGED = 1008;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotifyBrokerRoleChangedRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotifyBrokerRoleChangedRequestHeader.java
new file mode 100644
index 000000000..33f159dd5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/NotifyBrokerRoleChangedRequestHeader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class NotifyBrokerRoleChangedRequestHeader implements CommandCustomHeader {
+    private String masterAddress;
+    private int masterEpoch;
+    private int syncStateSetEpoch;
+    // The id of this broker.
+    private long brokerId;
+
+    public NotifyBrokerRoleChangedRequestHeader() {
+    }
+
+    public NotifyBrokerRoleChangedRequestHeader(String masterAddress, int masterEpoch, int syncStateSetEpoch, long brokerId) {
+        this.masterAddress = masterAddress;
+        this.masterEpoch = masterEpoch;
+        this.syncStateSetEpoch = syncStateSetEpoch;
+        this.brokerId = brokerId;
+    }
+
+    public String getMasterAddress() {
+        return masterAddress;
+    }
+
+    public void setMasterAddress(String masterAddress) {
+        this.masterAddress = masterAddress;
+    }
+
+    public int getMasterEpoch() {
+        return masterEpoch;
+    }
+
+    public void setMasterEpoch(int masterEpoch) {
+        this.masterEpoch = masterEpoch;
+    }
+
+    public int getSyncStateSetEpoch() {
+        return syncStateSetEpoch;
+    }
+
+    public void setSyncStateSetEpoch(int syncStateSetEpoch) {
+        this.syncStateSetEpoch = syncStateSetEpoch;
+    }
+
+    public long getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    @Override public String toString() {
+        return "NotifyBrokerRoleChangedRequestHeader{" +
+            "masterAddress='" + masterAddress + '\'' +
+            ", masterEpoch=" + masterEpoch +
+            ", syncStateSetEpoch=" + syncStateSetEpoch +
+            ", brokerId=" + brokerId +
+            '}';
+    }
+
+    @Override public void checkFields() throws RemotingCommandException {
+
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
index 1adcfe3b7..5dc02c421 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
@@ -16,12 +16,15 @@
  */
 package org.apache.rocketmq.common.protocol.header.namesrv.controller;
 
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class ElectMasterResponseHeader implements CommandCustomHeader {
     private String newMasterAddress;
     private int masterEpoch;
+    private int syncStateSetEpoch;
+    private BrokerMemberGroup brokerMemberGroup;
 
     public ElectMasterResponseHeader() {
     }
@@ -42,11 +45,28 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
         this.masterEpoch = masterEpoch;
     }
 
-    @Override
-    public String toString() {
+    public int getSyncStateSetEpoch() {
+        return syncStateSetEpoch;
+    }
+
+    public void setSyncStateSetEpoch(int syncStateSetEpoch) {
+        this.syncStateSetEpoch = syncStateSetEpoch;
+    }
+
+    public BrokerMemberGroup getBrokerMemberGroup() {
+        return brokerMemberGroup;
+    }
+
+    public void setBrokerMemberGroup(BrokerMemberGroup brokerMemberGroup) {
+        this.brokerMemberGroup = brokerMemberGroup;
+    }
+
+    @Override public String toString() {
         return "ElectMasterResponseHeader{" +
             "newMasterAddress='" + newMasterAddress + '\'' +
             ", masterEpoch=" + masterEpoch +
+            ", syncStateSetEpoch=" + syncStateSetEpoch +
+            ", brokerMember=" + brokerMemberGroup +
             '}';
     }
 
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 01f87903d..5f800a5a6 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.controller;
 
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -31,6 +32,8 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.common.namesrv.ControllerConfig;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.controller.impl.DLedgerController;
@@ -38,8 +41,10 @@ import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
 import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
@@ -51,6 +56,7 @@ public class ControllerManager {
     private final NettyClientConfig nettyClientConfig;
     private final BrokerHousekeepingService brokerHousekeepingService;
     private final Configuration configuration;
+    private final RemotingClient remotingClient;
     private Controller controller;
     private BrokerHeartbeatManager heartbeatManager;
     private ExecutorService controllerRequestExecutor;
@@ -67,6 +73,7 @@ public class ControllerManager {
             this.controllerConfig, this.nettyServerConfig
         );
         this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
+        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
     }
 
     public boolean initialize() {
@@ -102,6 +109,7 @@ public class ControllerManager {
                                 if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
                                     heartbeatManager.changeBrokerMetadata(clusterName, responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
                                 }
+                                notifyBrokerMasterChanged(responseHeader, clusterName);
                             }
                         } catch (Exception ignored) {
                         }
@@ -115,6 +123,44 @@ public class ControllerManager {
         return true;
     }
 
+    /**
+     * Notify master and all slaves for a broker that the master role changed.
+     */
+    public void notifyBrokerMasterChanged(final ElectMasterResponseHeader electMasterResult, final String clusterName) {
+        final BrokerMemberGroup memberGroup = electMasterResult.getBrokerMemberGroup();
+        if (memberGroup != null) {
+            // First, inform the master
+            final String master = electMasterResult.getNewMasterAddress();
+            if (StringUtils.isNoneEmpty(master) && this.heartbeatManager.isBrokerActive(clusterName, master)) {
+                doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, electMasterResult);
+            }
+
+            // Then, inform all slaves
+            final Map<Long, String> brokerIdAddrs = memberGroup.getBrokerAddrs();
+            for (Map.Entry<Long, String> broker : brokerIdAddrs.entrySet()) {
+                if (!broker.getValue().equals(master) && this.heartbeatManager.isBrokerActive(clusterName, broker.getValue())) {
+                    doNotifyBrokerRoleChanged(broker.getValue(), broker.getKey(), electMasterResult);
+                }
+            }
+
+        }
+    }
+
+    public void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId,
+        final ElectMasterResponseHeader responseHeader) {
+        if (StringUtils.isNoneEmpty(brokerAddr)) {
+            log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
+            final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
+                responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
+            final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader);
+            try {
+                this.remotingClient.invokeOneway(brokerAddr, request, 3000);
+            } catch (final Exception e) {
+                log.error("Failed to notify broker {} with id {} that role changed", brokerAddr, brokerId, e);
+            }
+        }
+    }
+
     public void registerProcessor() {
         final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
         final RemotingServer controllerRemotingServer = this.controller.getRemotingServer();
@@ -131,12 +177,14 @@ public class ControllerManager {
     public void start() {
         this.heartbeatManager.start();
         this.controller.startup();
+        this.remotingClient.start();
     }
 
     public void shutdown() {
         this.heartbeatManager.shutdown();
         this.controllerRequestExecutor.shutdown();
         this.controller.shutdown();
+        this.remotingClient.shutdown();
     }
 
     public BrokerHeartbeatManager getHeartbeatManager() {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
index f819ff1b3..2c283c08c 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
@@ -62,6 +62,10 @@ public class BrokerInfo {
         return new HashSet<>(this.brokerIdTable.keySet());
     }
 
+    public HashMap<String, Long> getBrokerIdTable() {
+        return new HashMap<>(this.brokerIdTable);
+    }
+
     public Long getBrokerId(final String address) {
         if (this.brokerIdTable.containsKey(address)) {
             return this.brokerIdTable.get(address);
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index f0b680f73..eefb7feeb 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.namesrv.ControllerConfig;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.common.protocol.body.InSyncStateData;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
@@ -194,11 +195,14 @@ public class ReplicasInfoManager {
     private boolean tryElectMaster(final ControllerResult<ElectMasterResponseHeader> result, final String brokerName,
         final Set<String> candidates, final Predicate<String> filter) {
         final int masterEpoch = this.syncStateSetInfoTable.get(brokerName).getMasterEpoch();
+        final int syncStateSetEpoch = this.syncStateSetInfoTable.get(brokerName).getSyncStateSetEpoch();
         for (final String candidate : candidates) {
             if (filter.test(candidate)) {
                 final ElectMasterResponseHeader response = result.getResponse();
                 response.setNewMasterAddress(candidate);
                 response.setMasterEpoch(masterEpoch + 1);
+                response.setSyncStateSetEpoch(syncStateSetEpoch);
+                response.setBrokerMemberGroup(buildBrokerMemberGroup(brokerName));
 
                 final ElectMasterEvent event = new ElectMasterEvent(brokerName, candidate);
                 result.addEvent(event);
@@ -208,6 +212,19 @@ public class ReplicasInfoManager {
         return false;
     }
 
+    private BrokerMemberGroup buildBrokerMemberGroup(final String brokerName) {
+        if (isContainsBroker(brokerName)) {
+            final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final BrokerMemberGroup group = new BrokerMemberGroup(brokerInfo.getClusterName(), brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+            final HashMap<Long, String> memberGroup = new HashMap<>();
+            brokerIdTable.forEach((addr, id)->memberGroup.put(id, addr));
+            group.setBrokerAddrs(memberGroup);
+            return group;
+        }
+        return null;
+    }
+
     public ControllerResult<BrokerRegisterResponseHeader> registerBroker(final BrokerRegisterRequestHeader request) {
         final String brokerName = request.getBrokerName();
         final String brokerAddress = request.getBrokerAddress();
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index f1e89f59c..4872bb87e 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -28,10 +28,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BrokerAddrInfo;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
@@ -46,9 +49,11 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
 import org.apache.rocketmq.common.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -601,8 +606,21 @@ public class RouteInfoManager {
 
                     // Check whether we need to elect a new master
                     if (this.namesrvController != null && this.namesrvController.getControllerConfig().isEnableStartupController() && this.controller != null) {
-                        if (unRegisterRequest.getBrokerId() == 0) {
-                            this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
+                        if (unRegisterRequest.getBrokerId() == MixAll.MASTER_ID) {
+                            if (this.controller.isLeaderState()) {
+                                final CompletableFuture<RemotingCommand> future = this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
+                                try {
+                                    final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
+                                    final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
+                                    if (responseHeader != null) {
+                                        log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, responseHeader);
+                                        notifyBrokerMasterChanged(responseHeader, clusterName);
+                                    }
+                                } catch (Exception ignored) {
+                                }
+                            } else {
+                                log.info("Broker {}'s master shutdown");
+                            }
                         }
                     }
                 }
@@ -910,6 +928,43 @@ public class RouteInfoManager {
         }
     }
 
+    /**
+     * Notify master and all slaves for a broker that the master role changed.
+     */
+    private void notifyBrokerMasterChanged(final ElectMasterResponseHeader electMasterResult, final String clusterName) {
+        final BrokerMemberGroup memberGroup = electMasterResult.getBrokerMemberGroup();
+        if (memberGroup != null) {
+            // First, inform the master
+            final String master = electMasterResult.getNewMasterAddress();
+            if (StringUtils.isNoneEmpty(master) && isBrokerAlive(clusterName, master)) {
+                doNotifyBrokerRoleChanged(master, MixAll.MASTER_ID, electMasterResult);
+            }
+
+            // Then, inform all slaves
+            final Map<Long, String> brokerIdAddrs = memberGroup.getBrokerAddrs();
+            for (Map.Entry<Long, String> broker : brokerIdAddrs.entrySet()) {
+                if (!broker.getValue().equals(master) && isBrokerAlive(clusterName, broker.getValue())) {
+                    doNotifyBrokerRoleChanged(broker.getValue(), broker.getKey(), electMasterResult);
+                }
+            }
+
+        }
+    }
+
+    private void doNotifyBrokerRoleChanged(final String brokerAddr, final Long brokerId, final ElectMasterResponseHeader responseHeader) {
+        if (StringUtils.isNoneEmpty(brokerAddr)) {
+            log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", brokerAddr, brokerId, responseHeader);
+            final NotifyBrokerRoleChangedRequestHeader requestHeader = new NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
+                responseHeader.getMasterEpoch(), responseHeader.getSyncStateSetEpoch(), brokerId);
+            final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_BROKER_ROLE_CHANGED, requestHeader);
+            try {
+                this.namesrvController.getRemotingClient().invokeOneway(brokerAddr, request, 3000);
+            } catch (final Exception e) {
+                log.error("Failed to notify broker {} with id {} that role changed", brokerAddr, brokerId, e);
+            }
+        }
+    }
+
     private List<String> chooseBrokerAddrsToNotify(Map<Long, String> brokerAddrMap, String offlineBrokerAddr) {
         if (offlineBrokerAddr != null || brokerAddrMap.size() == 1) {
             // notify the reset brokers.