You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/03/21 06:00:07 UTC

[rocketmq] branch develop updated: [ISSUE #6382] Periodically check for inactive masters (#6383)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1e2a30126 [ISSUE #6382] Periodically check for inactive masters (#6383)
1e2a30126 is described below

commit 1e2a301266a41e7b4088f6f956dd850df407a600
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Tue Mar 21 13:59:53 2023 +0800

    [ISSUE #6382] Periodically check for inactive masters (#6383)
    
    * feat(controller): concurrent protect ReplicasInfoManager#replicaInfoTable and ReplicasInfoManager#syncStateSetInfoTable
    
    1. concurrent protect ReplicasInfoManager#replicaInfoTable and
    ReplicasInfoManager#syncStateSetInfoTable
    
    * docs(controller): make statemachine thread-safe and support timed checking inactive master in each broker-set
    
    1. make statemachine thread-safe
    2. support timed checking inactive
    master in each broker-set
    
    * fix(controller): Add some logic when visit the BrokerReplicaInfo to avoid NPE
    
    1. Add some logic when visit the BrokerReplicaInfo to avoid NPE
    
    * test(controller): polish some test logic
    
    1. polish some test logic
    
    * fix(controller): Revise back the ControllerConfig#controllerStorePath to avoid the Incompatibility problems
    
    1. Revise back the ControllerConfig#controllerStorePath to avoid the
    Incompatibility problems
---
 .../apache/rocketmq/common/ControllerConfig.java   | 14 +++++
 .../controller/BrokerHeartbeatManager.java         | 10 +---
 .../org/apache/rocketmq/controller/Controller.java |  8 +++
 .../rocketmq/controller/ControllerManager.java     | 52 +++++++++++------
 .../controller/helper/BrokerLifecycleListener.java | 25 +++++++++
 .../controller/impl/DLedgerController.java         | 62 +++++++++++++++++++--
 .../heartbeat/DefaultBrokerHeartbeatManager.java   |  3 +-
 .../controller/impl/manager/BrokerReplicaInfo.java | 23 +++++---
 .../impl/manager/ReplicasInfoManager.java          | 41 ++++++++++----
 .../controller/impl/manager/SyncStateInfo.java     | 31 ++++-------
 .../controller/impl/DLedgerControllerTest.java     | 65 +++++++++++++++++++---
 .../impl/DefaultBrokerHeartbeatManagerTest.java    |  2 +-
 12 files changed, 256 insertions(+), 80 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
index 942c03874..b35198fc6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
@@ -25,6 +25,7 @@ public class ControllerConfig {
 
     /**
      * Interval of periodic scanning for non-active broker;
+     * Unit: millisecond
      */
     private long scanNotActiveBrokerInterval = 5 * 1000;
 
@@ -58,6 +59,11 @@ public class ControllerConfig {
      * Whether notify broker when its role changed
      */
     private volatile boolean notifyBrokerRoleChanged = true;
+    /**
+     * Interval of periodic scanning for non-active master in each broker-set;
+     * Unit: millisecond
+     */
+    private long scanInactiveMasterInterval = 5 * 1000;
 
     public String getRocketmqHome() {
         return rocketmqHome;
@@ -162,4 +168,12 @@ public class ControllerConfig {
     public void setNotifyBrokerRoleChanged(boolean notifyBrokerRoleChanged) {
         this.notifyBrokerRoleChanged = notifyBrokerRoleChanged;
     }
+
+    public long getScanInactiveMasterInterval() {
+        return scanInactiveMasterInterval;
+    }
+
+    public void setScanInactiveMasterInterval(long scanInactiveMasterInterval) {
+        this.scanInactiveMasterInterval = scanInactiveMasterInterval;
+    }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 3a7dcaf58..71b274c09 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.controller;
 
 import io.netty.channel.Channel;
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
 import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
 
 public interface BrokerHeartbeatManager {
@@ -46,7 +47,7 @@ public interface BrokerHeartbeatManager {
     /**
      * Add BrokerLifecycleListener.
      */
-    void addBrokerLifecycleListener(final BrokerLifecycleListener listener);
+    void registerBrokerLifecycleListener(final BrokerLifecycleListener listener);
 
     /**
      * Broker channel close
@@ -62,11 +63,4 @@ public interface BrokerHeartbeatManager {
      * Check whether broker active
      */
     boolean isBrokerActive(final String clusterName, final String brokerName, final Long brokerId);
-
-    interface BrokerLifecycleListener {
-        /**
-         * Trigger when broker inactive.
-         */
-        void onBrokerInactive(final String clusterName, final String brokerName, final Long brokerId);
-    }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index 2c0372fec..cda613091 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.controller;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
@@ -109,6 +111,12 @@ public interface Controller {
      */
     CompletableFuture<RemotingCommand> getSyncStateData(final List<String> brokerNames);
 
+    /**
+     * Add broker's lifecycle listener
+     * @param listener listener
+     */
+    void registerBrokerLifecycleListener(final BrokerLifecycleListener listener);
+
     /**
      * Get the remotingServer used by the controller, the upper layer will reuse this remotingServer.
      */
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index a9949bde0..46826517c 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -117,7 +117,8 @@ public class ControllerManager {
         this.heartbeatManager.initialize();
 
         // Register broker inactive listener
-        this.heartbeatManager.addBrokerLifecycleListener(this::onBrokerInactive);
+        this.heartbeatManager.registerBrokerLifecycleListener(this::onBrokerInactive);
+        this.controller.registerBrokerLifecycleListener(this::onBrokerInactive);
         registerProcessor();
         return true;
     }
@@ -128,34 +129,49 @@ public class ControllerManager {
      *
      * @param clusterName The cluster name of this inactive broker
      * @param brokerName The inactive broker name
-     * @param brokerId The inactive broker id
+     * @param brokerId The inactive broker id, null means that the election forced to be triggered
      */
     private void onBrokerInactive(String clusterName, String brokerName, Long brokerId) {
         if (controller.isLeaderState()) {
-            try {
-                final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName));
-                final RemotingCommand replicaInfoResponse = replicaInfoFuture.get(5, TimeUnit.SECONDS);
+            if (brokerId == null) {
+                // Means that force triggering election for this broker-set
+                triggerElectMaster(brokerName);
+                return;
+            }
+            final CompletableFuture<RemotingCommand> replicaInfoFuture = controller.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName));
+            replicaInfoFuture.whenCompleteAsync((replicaInfoResponse, err) -> {
+                if (err != null || replicaInfoResponse == null) {
+                    log.error("Failed to get replica-info for broker-set: {} when OnBrokerInactive", brokerName, err);
+                    return;
+                }
                 final GetReplicaInfoResponseHeader replicaInfoResponseHeader = (GetReplicaInfoResponseHeader) replicaInfoResponse.readCustomHeader();
                 // Not master broker offline
                 if (!brokerId.equals(replicaInfoResponseHeader.getMasterBrokerId())) {
-                    log.warn("The broker with brokerId: {} in broker-set: {} shutdown", brokerId, brokerName);
+                    log.warn("The broker with brokerId: {} in broker-set: {} has been inactive", brokerId, brokerName);
                     return;
                 }
+                // Trigger election
+                triggerElectMaster(brokerName);
+            });
+        } else {
+            log.warn("The broker with brokerId: {} in broker-set: {} has been inactive", brokerId, brokerName);
+        }
+    }
 
-                final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(brokerName));
-                final RemotingCommand electMasterResponse = electMasterFuture.get(5, TimeUnit.SECONDS);
-                if (electMasterResponse.getCode() == ResponseCode.SUCCESS) {
-                    log.info("The broker with brokerId: {} in broker-set: {} shutdown, elect a new master done, result: {}", brokerId, brokerName, electMasterResponse);
-                    if (controllerConfig.isNotifyBrokerRoleChanged()) {
-                        notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(electMasterResponse));
-                    }
+    private void triggerElectMaster(String brokerName) {
+        final CompletableFuture<RemotingCommand> electMasterFuture = controller.electMaster(ElectMasterRequestHeader.ofControllerTrigger(brokerName));
+        electMasterFuture.whenCompleteAsync((electMasterResponse, err) -> {
+            if (err != null || electMasterResponse == null) {
+                log.error("Failed to trigger elect-master in broker-set: {}", brokerName, err);
+                return;
+            }
+            if (electMasterResponse.getCode() == ResponseCode.SUCCESS) {
+                log.info("Elect a new master in broker-set: {} done, result: {}", brokerName, electMasterResponse);
+                if (controllerConfig.isNotifyBrokerRoleChanged()) {
+                    notifyBrokerRoleChanged(RoleChangeNotifyEntry.convert(electMasterResponse));
                 }
-            } catch (Exception e) {
-                log.error("", e);
             }
-        } else {
-            log.warn("The broker with brokerId: {} in broker-set: {} shutdown", brokerId, brokerName);
-        }
+        });
     }
 
     /**
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLifecycleListener.java b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLifecycleListener.java
new file mode 100644
index 000000000..31fa47632
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/helper/BrokerLifecycleListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.helper;
+
+public interface BrokerLifecycleListener {
+    /**
+     * Trigger when broker inactive.
+     */
+    void onBrokerInactive(final String clusterName, final String brokerName, final Long brokerId);
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 491cb16d1..b6007fe09 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -32,6 +32,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
@@ -42,6 +44,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.controller.Controller;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
 import org.apache.rocketmq.controller.helper.BrokerValidPredicate;
 import org.apache.rocketmq.controller.impl.event.ControllerResult;
 import org.apache.rocketmq.controller.impl.event.EventMessage;
@@ -80,6 +83,11 @@ public class DLedgerController implements Controller {
     private final EventSerializer eventSerializer;
     private final RoleChangeHandler roleHandler;
     private final DLedgerControllerStateMachine statemachine;
+    private final ScheduledExecutorService scanInactiveMasterService;
+
+    private ScheduledFuture scanInactiveMasterFuture;
+
+    private List<BrokerLifecycleListener> brokerLifecycleListeners;
 
     // Usr for checking whether the broker is alive
     private BrokerValidPredicate brokerAlivePredicate;
@@ -116,6 +124,8 @@ public class DLedgerController implements Controller {
         this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
         this.dLedgerServer.registerStateMachine(this.statemachine);
         this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
+        this.scanInactiveMasterService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
+        this.brokerLifecycleListeners = new ArrayList<>();
     }
 
     @Override
@@ -125,6 +135,7 @@ public class DLedgerController implements Controller {
 
     @Override
     public void shutdown() {
+        this.cancelScanInactiveFuture();
         this.dLedgerServer.shutdown();
     }
 
@@ -193,6 +204,11 @@ public class DLedgerController implements Controller {
             () -> this.replicasInfoManager.getSyncStateData(brokerNames, brokerAlivePredicate), false);
     }
 
+    @Override
+    public void registerBrokerLifecycleListener(BrokerLifecycleListener listener) {
+        this.brokerLifecycleListeners.add(listener);
+    }
+
     @Override
     public RemotingCommand getControllerMetadata() {
         final MemberState state = getMemberState();
@@ -218,19 +234,40 @@ public class DLedgerController implements Controller {
             () -> this.replicasInfoManager.cleanBrokerData(requestHeader, this.brokerAlivePredicate), true);
     }
 
+    /**
+     * Scan all broker-set in statemachine, find that the broker-set which
+     * its master has been timeout but still has at least one broker keep alive with controller,
+     * and we trigger an election to update its state.
+     */
+    private void scanInactiveMasterAndTriggerReelect() {
+        if (!this.roleHandler.isLeaderState()) {
+            cancelScanInactiveFuture();
+            return;
+        }
+        List<String> brokerSets = this.replicasInfoManager.scanNeedReelectBrokerSets(this.brokerAlivePredicate);
+        for (String brokerName : brokerSets) {
+            // Notify ControllerManager
+            this.brokerLifecycleListeners.forEach(listener -> listener.onBrokerInactive(null, brokerName, null));
+        }
+    }
+
     /**
      * Append the request to DLedger, and wait for DLedger to commit the request.
      */
-    private boolean appendToDLedgerAndWait(final AppendEntryRequest request) throws Throwable {
+    private boolean appendToDLedgerAndWait(final AppendEntryRequest request) {
         if (request != null) {
             request.setGroup(this.dLedgerConfig.getGroup());
             request.setRemoteId(this.dLedgerConfig.getSelfId());
-
-            final AppendFuture<AppendEntryResponse> dLedgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
-            if (dLedgerFuture.getPos() == -1) {
+            try {
+                final AppendFuture<AppendEntryResponse> dLedgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+                if (dLedgerFuture.getPos() == -1) {
+                    return false;
+                }
+                dLedgerFuture.get(5, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.error("Failed to append entry to DLedger", e);
                 return false;
             }
-            dLedgerFuture.get(5, TimeUnit.SECONDS);
             return true;
         }
         return false;
@@ -249,6 +286,13 @@ public class DLedgerController implements Controller {
         this.electPolicy = electPolicy;
     }
 
+    private void cancelScanInactiveFuture() {
+        if (this.scanInactiveMasterFuture != null) {
+            this.scanInactiveMasterFuture.cancel(true);
+            this.scanInactiveMasterFuture = null;
+        }
+    }
+
     /**
      * Event handler that handle event
      */
@@ -433,11 +477,13 @@ public class DLedgerController implements Controller {
                         this.currentRole = MemberState.Role.CANDIDATE;
                         log.info("Controller {} change role to candidate", this.selfId);
                         DLedgerController.this.stopScheduling();
+                        DLedgerController.this.cancelScanInactiveFuture();
                         break;
                     case FOLLOWER:
                         this.currentRole = MemberState.Role.FOLLOWER;
                         log.info("Controller {} change role to Follower, leaderId:{}", this.selfId, getMemberState().getLeaderId());
                         DLedgerController.this.stopScheduling();
+                        DLedgerController.this.cancelScanInactiveFuture();
                         break;
                     case LEADER: {
                         log.info("Controller {} change role to leader, try process a initial proposal", this.selfId);
@@ -452,6 +498,12 @@ public class DLedgerController implements Controller {
                                 if (appendToDLedgerAndWait(request)) {
                                     this.currentRole = MemberState.Role.LEADER;
                                     DLedgerController.this.startScheduling();
+                                    if (DLedgerController.this.scanInactiveMasterFuture == null) {
+                                        long scanInactiveMasterInterval = DLedgerController.this.controllerConfig.getScanInactiveMasterInterval();
+                                        DLedgerController.this.scanInactiveMasterFuture =
+                                                DLedgerController.this.scanInactiveMasterService.scheduleAtFixedRate(DLedgerController.this::scanInactiveMasterAndTriggerReelect,
+                                                        scanInactiveMasterInterval, scanInactiveMasterInterval, TimeUnit.MILLISECONDS);
+                                    }
                                     break;
                                 }
                             } catch (final Throwable e) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
index 63b0c2e5f..dc824281b 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -99,7 +100,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
     }
 
     @Override
-    public void addBrokerLifecycleListener(BrokerLifecycleListener listener) {
+    public void registerBrokerLifecycleListener(BrokerLifecycleListener listener) {
         this.brokerLifecycleListeners.add(listener);
     }
 
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
index 27eeab80e..f93074793 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerReplicaInfo.java
@@ -18,7 +18,9 @@ package org.apache.rocketmq.controller.impl.manager;
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
@@ -34,13 +36,13 @@ public class BrokerReplicaInfo {
     // Start from 1
     private final AtomicLong nextAssignBrokerId;
 
-    private final HashMap<Long/*brokerId*/, Pair<String/*ipAddress*/, String/*registerCheckCode*/>> brokerIdInfo;
+    private final Map<Long/*brokerId*/, Pair<String/*ipAddress*/, String/*registerCheckCode*/>> brokerIdInfo;
 
     public BrokerReplicaInfo(String clusterName, String brokerName) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
         this.nextAssignBrokerId = new AtomicLong(MixAll.FIRST_BROKER_CONTROLLER_ID);
-        this.brokerIdInfo = new HashMap<>();
+        this.brokerIdInfo = new ConcurrentHashMap<>();
     }
 
     public void removeBrokerId(final Long brokerId) {
@@ -72,8 +74,8 @@ public class BrokerReplicaInfo {
         return new HashSet<>(this.brokerIdInfo.keySet());
     }
 
-    public HashMap<Long, String> getBrokerIdTable() {
-        HashMap<Long/*brokerId*/, String/*address*/> map = new HashMap<>(this.brokerIdInfo.size());
+    public Map<Long, String> getBrokerIdTable() {
+        Map<Long/*brokerId*/, String/*address*/> map = new HashMap<>(this.brokerIdInfo.size());
         this.brokerIdInfo.forEach((id, pair) -> {
             map.put(id, pair.getObject1());
         });
@@ -81,20 +83,25 @@ public class BrokerReplicaInfo {
     }
 
     public String getBrokerAddress(final Long brokerId) {
-        if (this.brokerIdInfo.containsKey(brokerId)) {
-            return this.brokerIdInfo.get(brokerId).getObject1();
+        if (brokerId == null) return null;
+        Pair<String, String> pair = this.brokerIdInfo.get(brokerId);
+        if (pair != null) {
+            return pair.getObject1();
         }
         return null;
     }
 
     public String getBrokerRegisterCheckCode(final Long brokerId) {
-        if (this.brokerIdInfo.containsKey(brokerId)) {
-            return this.brokerIdInfo.get(brokerId).getObject2();
+        if (brokerId == null) return null;
+        Pair<String, String> pair = this.brokerIdInfo.get(brokerId);
+        if (pair != null) {
+            return pair.getObject2();
         }
         return null;
     }
 
     public void updateBrokerAddress(final Long brokerId, final String brokerAddress) {
+        if (brokerId == null) return;
         Pair<String, String> oldPair = this.brokerIdInfo.get(brokerId);
         if (oldPair != null) {
             this.brokerIdInfo.put(brokerId, new Pair<>(brokerAddress, oldPair.getObject2()));
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 2f5c3307c..b0a67531d 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -19,9 +19,11 @@ package org.apache.rocketmq.controller.impl.manager;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -63,8 +65,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.Register
 
 /**
  * The manager that manages the replicas info for all brokers. We can think of this class as the controller's memory
- * state machine It should be noted that this class is not thread safe, and the upper layer needs to ensure that it can
- * be called sequentially
+ * state machine. If the upper layer want to update the statemachine, it must sequentially call its methods.
  */
 public class ReplicasInfoManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
@@ -74,8 +75,8 @@ public class ReplicasInfoManager {
 
     public ReplicasInfoManager(final ControllerConfig config) {
         this.controllerConfig = config;
-        this.replicaInfoTable = new HashMap<>();
-        this.syncStateSetInfoTable = new HashMap<>();
+        this.replicaInfoTable = new ConcurrentHashMap<String, BrokerReplicaInfo>();
+        this.syncStateSetInfoTable = new ConcurrentHashMap<String, SyncStateInfo>();
     }
 
     public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
@@ -221,7 +222,7 @@ public class ReplicasInfoManager {
             response.setSyncStateSetEpoch(syncStateSetEpoch + 1);
             ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet);
 
-            BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerName);
+            BrokerMemberGroup brokerMemberGroup = buildBrokerMemberGroup(brokerReplicaInfo);
             if (null != brokerMemberGroup) {
                 responseBody.setBrokerMemberGroup(brokerMemberGroup);
             }
@@ -244,12 +245,11 @@ public class ReplicasInfoManager {
         return result;
     }
 
-    private BrokerMemberGroup buildBrokerMemberGroup(final String brokerName) {
-        if (isContainsBroker(brokerName)) {
-            final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
-            final BrokerMemberGroup group = new BrokerMemberGroup(brokerReplicaInfo.getClusterName(), brokerName);
-            final HashMap<Long, String> brokerIdTable = brokerReplicaInfo.getBrokerIdTable();
-            final HashMap<Long, String> memberGroup = new HashMap<>();
+    private BrokerMemberGroup buildBrokerMemberGroup(final BrokerReplicaInfo brokerReplicaInfo) {
+        if (brokerReplicaInfo != null) {
+            final BrokerMemberGroup group = new BrokerMemberGroup(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName());
+            final Map<Long, String> brokerIdTable = brokerReplicaInfo.getBrokerIdTable();
+            final Map<Long, String> memberGroup = new HashMap<>();
             brokerIdTable.forEach((id, addr) -> memberGroup.put(id, addr));
             group.setBrokerAddrs(memberGroup);
             return group;
@@ -433,6 +433,25 @@ public class ReplicasInfoManager {
         return result;
     }
 
+    public List<String/*BrokerName*/> scanNeedReelectBrokerSets(final BrokerValidPredicate validPredicate) {
+        List<String> needReelectBrokerSets = new LinkedList<>();
+        this.syncStateSetInfoTable.forEach((brokerName, syncStateInfo) -> {
+            Long masterBrokerId = syncStateInfo.getMasterBrokerId();
+            String clusterName = syncStateInfo.getClusterName();
+            // Now master is inactive
+            if (masterBrokerId != null && !validPredicate.check(clusterName, brokerName, masterBrokerId)) {
+                // Still at least one broker alive
+                Set<Long> brokerIds = this.replicaInfoTable.get(brokerName).getBrokerIdTable().keySet();
+                boolean alive = brokerIds.stream().anyMatch(id -> validPredicate.check(clusterName, brokerName, id));
+                if (alive) {
+                    needReelectBrokerSets.add(brokerName);
+                }
+            }
+        });
+        return needReelectBrokerSets;
+    }
+
+
     /**
      * Apply events to memory statemachine.
      *
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 0951df93a..a01298d9a 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.controller.impl.manager;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Manages the syncStateSet of broker replicas.
@@ -26,45 +27,33 @@ import java.util.Set;
 public class SyncStateInfo {
     private final String clusterName;
     private final String brokerName;
+    private final AtomicInteger masterEpoch;
+    private final AtomicInteger syncStateSetEpoch;
 
     private Set<Long/*brokerId*/> syncStateSet;
-    private int syncStateSetEpoch;
 
     private Long masterBrokerId;
-    private int masterEpoch;
 
     public SyncStateInfo(String clusterName, String brokerName) {
         this.clusterName = clusterName;
         this.brokerName = brokerName;
-        this.masterEpoch = 0;
-        this.syncStateSetEpoch = 0;
+        this.masterEpoch = new AtomicInteger(0);
+        this.syncStateSetEpoch = new AtomicInteger(0);
         this.syncStateSet = Collections.emptySet();
     }
 
-
-    public SyncStateInfo(String clusterName, String brokerName, Long masterBrokerId) {
-        this.clusterName = clusterName;
-        this.brokerName = brokerName;
-        this.masterBrokerId = masterBrokerId;
-        this.masterEpoch = 1;
-        this.syncStateSet = new HashSet<>();
-        this.syncStateSet.add(masterBrokerId);
-        this.syncStateSetEpoch = 1;
-    }
-
-
     public void updateMasterInfo(Long masterBrokerId) {
         this.masterBrokerId = masterBrokerId;
-        this.masterEpoch++;
+        this.masterEpoch.incrementAndGet();
     }
 
     public void updateSyncStateSetInfo(Set<Long> newSyncStateSet) {
         this.syncStateSet = new HashSet<>(newSyncStateSet);
-        this.syncStateSetEpoch++;
+        this.syncStateSetEpoch.incrementAndGet();
     }
 
     public boolean isFirstTimeForElect() {
-        return this.masterEpoch == 0;
+        return this.masterEpoch.get() == 0;
     }
 
     public boolean isMasterExist() {
@@ -84,7 +73,7 @@ public class SyncStateInfo {
     }
 
     public int getSyncStateSetEpoch() {
-        return syncStateSetEpoch;
+        return syncStateSetEpoch.get();
     }
 
     public Long getMasterBrokerId() {
@@ -92,7 +81,7 @@ public class SyncStateInfo {
     }
 
     public int getMasterEpoch() {
-        return masterEpoch;
+        return masterEpoch.get();
     }
 
     public void removeFromSyncState(final Long brokerId) {
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
index eaf78b63d..595a5cb65 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DLedgerControllerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.controller.impl;
 
-import io.openmessaging.storage.dledger.DLedgerConfig;
 import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -26,6 +25,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.ControllerConfig;
@@ -62,8 +63,7 @@ public class DLedgerControllerTest {
     private List<String> baseDirs;
     private List<DLedgerController> controllers;
 
-    public DLedgerController launchController(final String group, final String peers, final String selfId,
-        String storeType, final boolean isEnableElectUncleanMaster) {
+    public DLedgerController launchController(final String group, final String peers, final String selfId, final boolean isEnableElectUncleanMaster) {
         String tmpdir = System.getProperty("java.io.tmpdir");
         final String path = (StringUtils.endsWith(tmpdir, File.separator) ? tmpdir : tmpdir + File.separator) + group + File.separator + selfId;
         baseDirs.add(path);
@@ -75,7 +75,7 @@ public class DLedgerControllerTest {
         config.setControllerStorePath(path);
         config.setMappedFileSize(10 * 1024 * 1024);
         config.setEnableElectUncleanMaster(isEnableElectUncleanMaster);
-
+        config.setScanInactiveMasterInterval(1000);
         final DLedgerController controller = new DLedgerController(config, (str1, str2, str3) -> true);
 
         controller.startup();
@@ -172,9 +172,9 @@ public class DLedgerControllerTest {
     public DLedgerController mockMetaData(boolean enableElectUncleanMaster) throws Exception {
         String group = UUID.randomUUID().toString();
         String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002);
-        DLedgerController c0 = launchController(group, peers, "n0", DLedgerConfig.MEMORY, enableElectUncleanMaster);
-        DLedgerController c1 = launchController(group, peers, "n1", DLedgerConfig.MEMORY, enableElectUncleanMaster);
-        DLedgerController c2 = launchController(group, peers, "n2", DLedgerConfig.MEMORY, enableElectUncleanMaster);
+        DLedgerController c0 = launchController(group, peers, "n0", enableElectUncleanMaster);
+        DLedgerController c1 = launchController(group, peers, "n1", enableElectUncleanMaster);
+        DLedgerController c2 = launchController(group, peers, "n2", enableElectUncleanMaster);
         controllers.add(c0);
         controllers.add(c1);
         controllers.add(c2);
@@ -236,6 +236,57 @@ public class DLedgerControllerTest {
         assertNotEquals(DEFAULT_IP[0], response.getMasterAddress());
     }
 
+    @Test
+    public void testBrokerLifecycleListener() throws Exception {
+        final DLedgerController leader = mockMetaData(false);
+        // Mock that master broker has been inactive, and try to elect a new master from sync-state-set
+        // But we shut down two controller, so the ElectMasterEvent will be appended to DLedger failed.
+        // So the statemachine still keep the stale master's information
+        List<DLedgerController> removed = controllers.stream().filter(controller -> controller != leader).collect(Collectors.toList());
+        for (DLedgerController dLedgerController : removed) {
+            dLedgerController.shutdown();
+            controllers.remove(dLedgerController);
+        }
+        final ElectMasterRequestHeader request = ElectMasterRequestHeader.ofControllerTrigger(DEFAULT_BROKER_NAME);
+        setBrokerElectPolicy(leader, 1L);
+        Exception exception = null;
+        try {
+            leader.electMaster(request).get(5, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            exception = e;
+        }
+        assertNotNull(exception);
+        // Shut down leader controller
+        leader.shutdown();
+        controllers.remove(leader);
+        // Restart two controller
+        for (DLedgerController controller : removed) {
+            if (controller != leader) {
+                ControllerConfig config = controller.getControllerConfig();
+                DLedgerController newController = launchController(config.getControllerDLegerGroup(), config.getControllerDLegerPeers(), config.getControllerDLegerSelfId(), false);
+                controllers.add(newController);
+                newController.startup();
+            }
+        }
+        DLedgerController newLeader = waitLeader(controllers);
+        setBrokerAlivePredicate(newLeader, 1L);
+        // Check if the statemachine is stale
+        final RemotingCommand resp = newLeader.getReplicaInfo(new GetReplicaInfoRequestHeader(DEFAULT_BROKER_NAME)).
+                get(10, TimeUnit.SECONDS);
+        final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) resp.readCustomHeader();
+        assertEquals(1, replicaInfo.getMasterBrokerId().longValue());
+        assertEquals(1, replicaInfo.getMasterEpoch().intValue());
+
+        // Register broker's lifecycle listener
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        newLeader.registerBrokerLifecycleListener((clusterName, brokerName, brokerId) -> {
+            assertEquals(DEFAULT_BROKER_NAME, brokerName);
+            atomicBoolean.set(true);
+        });
+        Thread.sleep(2000);
+        assertTrue(atomicBoolean.get());
+    }
+
     @Test
     public void testAllReplicasShutdownAndRestartWithUnEnableElectUnCleanMaster() throws Exception {
         final DLedgerController leader = mockMetaData(false);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index b97ea3249..395f3bab4 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -41,7 +41,7 @@ public class DefaultBrokerHeartbeatManagerTest {
     @Test
     public void testDetectBrokerAlive() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
-        this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerId) -> {
+        this.heartbeatManager.registerBrokerLifecycleListener((clusterName, brokerName, brokerId) -> {
             latch.countDown();
         });
         this.heartbeatManager.onBrokerHeartbeat("cluster1", "broker1", "127.0.0.1:7000", 1L,3000L, null,