You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/04/30 12:29:58 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of Code] Dledger controller implementation (#4195)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new d26773a9a [Summer of Code] Dledger controller implementation (#4195)
d26773a9a is described below

commit d26773a9ab181ce825dc465c900840f48e1ec6b8
Author: hzh0425 <58...@users.noreply.github.com>
AuthorDate: Sat Apr 30 20:29:53 2022 +0800

    [Summer of Code] Dledger controller implementation (#4195)
    
    * feature: initial, add controllerApi, event, request and response
    
    * feature: Apply event in ReplicasInfoManager
    
    * feature: Done the work in ReplicasInfoManager
    next step: try test
    
    * feature: Add some test for ReplicasInfoManager
    
    * feature: Build the architecture of dledgerController
    
    * feature: Done the work in controller
    
    * style: review code
    
    * feature: add controllerProcessor in name-srv
    
    * style: use defensive copy in constructor;
    
    * style: review code
    
    * style: review code
    
    * feature: let controller api return RemotingCommand
    
    * feature:
    1.remove originMasterId in replicasInfo
    2.add DledgerControllerConfig
    
    * feature:
    1.add option isProcessReadEvent.
    2.add ControllerConfig
    
    * feature:
    add namesrv into dledgerController to predict whether the broker is alive.
    
    * style: code review
    
    * feature: process initial log when controller become leader
    
    * style: review code
    
    * style: review code
    
    * style: review code
    
    * style: change version
    
    * fixbug
---
 .../rocketmq/common/constant/LoggerName.java       |   1 +
 .../rocketmq/common/namesrv/ControllerConfig.java  | 133 +++++++
 .../rocketmq/common/protocol/RequestCode.java      |  13 +
 .../rocketmq/common/protocol/ResponseCode.java     |  11 +
 .../controller/AlterSyncStateSetRequestHeader.java |  74 ++++
 .../AlterSyncStateSetResponseHeader.java           |  58 +++
 .../controller/ElectMasterRequestHeader.java       |  43 +++
 .../controller/ElectMasterResponseHeader.java      |  56 +++
 .../controller/GetMetaDataResponseHeader.java      |  49 +++
 .../controller/GetReplicaInfoRequestHeader.java    |  43 +++
 .../controller/GetReplicaInfoResponseHeader.java   |  77 ++++
 .../controller/RegisterBrokerRequestHeader.java    |  57 +++
 .../controller/RegisterBrokerResponseHeader.java   |  67 ++++
 .../rocketmq/common/utils/FastJsonSerializer.java  |  62 ++++
 .../apache/rocketmq/common/utils/Serializer.java   |  35 ++
 namesrv/pom.xml                                    |   5 +
 .../apache/rocketmq/namesrv/NamesrvController.java |  55 ++-
 .../apache/rocketmq/namesrv/NamesrvStartup.java    |   6 +-
 .../rocketmq/namesrv/controller/Controller.java    |  90 +++++
 .../namesrv/controller/impl/DledgerController.java | 413 +++++++++++++++++++++
 .../impl/DledgerControllerStateMachine.java        |  75 ++++
 .../namesrv/controller/manager/BrokerIdInfo.java   |  54 +++
 .../controller/manager/InSyncReplicasInfo.java     |  82 ++++
 .../controller/manager/ReplicasInfoManager.java    | 346 +++++++++++++++++
 .../manager/event/AlterSyncStateSetEvent.java      |  55 +++
 .../manager/event/ApplyBrokerIdEvent.java          |  58 +++
 .../controller/manager/event/ControllerResult.java |  68 ++++
 .../controller/manager/event/ElectMasterEvent.java |  75 ++++
 .../controller/manager/event/EventMessage.java     |  28 ++
 .../controller/manager/event/EventSerializer.java  |  77 ++++
 .../controller/manager/event/EventType.java        |  57 +++
 .../processor/ControllerRequestProcessor.java      | 109 ++++++
 .../namesrv/routeinfo/RouteInfoManager.java        |  51 ++-
 .../controller/impl/DledgerControllerTest.java     | 259 +++++++++++++
 .../manager/ReplicasInfoManagerTest.java           | 137 +++++++
 35 files changed, 2862 insertions(+), 17 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index a77d5c2d2..fb3a9b85d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.common.constant;
 public class LoggerName {
     public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
     public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
+    public static final String CONTROLLER_LOGGER_NAME = "RocketmqController";
     public static final String NAMESRV_WATER_MARK_LOGGER_NAME = "RocketmqNamesrvWaterMark";
     public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
     public static final String BROKER_CONSOLE_NAME = "RocketmqConsole";
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
new file mode 100644
index 000000000..4d4525aef
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.namesrv;
+
+import java.io.File;
+
+public class ControllerConfig { // Mutable holder of controller settings; every field has a plain getter/setter pair.
+
+    /**
+     * Whether to start up the controller in this name-srv. Disabled by default.
+     */
+    private boolean isStartupController = false;
+
+    /**
+     * Number of threads that handle broker or operation requests, such as REGISTER_BROKER. Defaults to 16.
+     */
+    private int controllerThreadPoolNums = 16;
+
+    /**
+     * Capacity of the queue that holds pending client requests. Defaults to 50000.
+     */
+    private int controllerRequestThreadPoolQueueCapacity = 50000;
+
+    private String controllerDLegerGroup; // no default (null until set); NOTE(review): "DLeger" looks like a misspelling of "DLedger"
+    private String controllerDLegerPeers; // no default (null until set)
+    private String controllerDLegerSelfId; // no default (null until set)
+    private int mappedFileSize = 1024 * 1024 * 1024; // 1073741824; presumably bytes (1 GiB) - TODO confirm unit
+    private String controllerStorePath = System.getProperty("user.home") + File.separator + "DledgerController"; // default: <user.home>/DledgerController
+
+    /**
+     * Whether the controller can elect a master which is not in the syncStateSet. Disabled by default.
+     */
+    private boolean enableElectUncleanMaster = false;
+
+    /**
+     * Whether to process read events. Disabled by default.
+     */
+    private boolean isProcessReadEvent = false;
+
+    public boolean isStartupController() {
+        return isStartupController;
+    }
+
+    public void setStartupController(boolean startupController) {
+        isStartupController = startupController;
+    }
+
+    public int getControllerThreadPoolNums() {
+        return controllerThreadPoolNums;
+    }
+
+    public void setControllerThreadPoolNums(int controllerThreadPoolNums) {
+        this.controllerThreadPoolNums = controllerThreadPoolNums;
+    }
+
+    public int getControllerRequestThreadPoolQueueCapacity() {
+        return controllerRequestThreadPoolQueueCapacity;
+    }
+
+    public void setControllerRequestThreadPoolQueueCapacity(int controllerRequestThreadPoolQueueCapacity) {
+        this.controllerRequestThreadPoolQueueCapacity = controllerRequestThreadPoolQueueCapacity;
+    }
+
+    public String getControllerDLegerGroup() {
+        return controllerDLegerGroup;
+    }
+
+    public void setControllerDLegerGroup(String controllerDLegerGroup) {
+        this.controllerDLegerGroup = controllerDLegerGroup;
+    }
+
+    public String getControllerDLegerPeers() {
+        return controllerDLegerPeers;
+    }
+
+    public void setControllerDLegerPeers(String controllerDLegerPeers) {
+        this.controllerDLegerPeers = controllerDLegerPeers;
+    }
+
+    public String getControllerDLegerSelfId() {
+        return controllerDLegerSelfId;
+    }
+
+    public void setControllerDLegerSelfId(String controllerDLegerSelfId) {
+        this.controllerDLegerSelfId = controllerDLegerSelfId;
+    }
+
+    public int getMappedFileSize() {
+        return mappedFileSize;
+    }
+
+    public void setMappedFileSize(int mappedFileSize) {
+        this.mappedFileSize = mappedFileSize;
+    }
+
+    public String getControllerStorePath() {
+        return controllerStorePath;
+    }
+
+    public void setControllerStorePath(String controllerStorePath) {
+        this.controllerStorePath = controllerStorePath;
+    }
+
+    public boolean isEnableElectUncleanMaster() {
+        return enableElectUncleanMaster;
+    }
+
+    public void setEnableElectUncleanMaster(boolean enableElectUncleanMaster) {
+        this.enableElectUncleanMaster = enableElectUncleanMaster;
+    }
+
+    public boolean isProcessReadEvent() {
+        return isProcessReadEvent;
+    }
+
+    public void setProcessReadEvent(boolean processReadEvent) {
+        isProcessReadEvent = processReadEvent;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 541227f4a..c6048ecbf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -236,4 +236,17 @@ public class RequestCode {
     public static final int GET_BROKER_HA_STATUS = 907;
 
     public static final int RESET_MASTER_FLUSH_OFFSET = 908;
+
+    /**
+     * Controller request codes
+     */
+    public static final int CONTROLLER_ALTER_SYNC_STATE_SET = 1001;
+
+    public static final int CONTROLLER_ELECT_MASTER = 1002;
+
+    public static final int CONTROLLER_REGISTER_BROKER = 1003;
+
+    public static final int CONTROLLER_GET_REPLICA_INFO = 1004;
+
+    public static final int CONTROLLER_GET_METADATA_INFO = 1005;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index db2a275e3..a947854c4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -97,5 +97,16 @@ public class ResponseCode extends RemotingSysResponseCode {
     public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004;
     public static final int RPC_TIME_OUT = -1006;
 
+    /**
+     * Controller response codes
+     */
+    public static final int CONTROLLER_FENCED_MASTER_EPOCH = 2000;
+    public static final int CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH = 2001;
+    public static final int CONTROLLER_INVALID_MASTER = 2002;
+    public static final int CONTROLLER_INVALID_REPLICAS = 2003;
+    public static final int CONTROLLER_MASTER_NOT_AVAILABLE = 2004;
+    public static final int CONTROLLER_INVALID_REQUEST = 2005;
+    public static final int CONTROLLER_BROKER_NOT_ALIVE = 2006;
+    public static final int CONTROLLER_NOT_LEADER = 2007;
 
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetRequestHeader.java
new file mode 100644
index 000000000..3dc396e96
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetRequestHeader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AlterSyncStateSetRequestHeader implements CommandCustomHeader { // Immutable: all fields are final and assigned once in the constructor.
+    private final String brokerName;
+    private final String masterAddress;
+    private final int masterEpoch;
+    private final Set<String> newSyncStateSet; // private copy of the set passed to the constructor
+    private final int syncStateSetEpoch;
+
+    public AlterSyncStateSetRequestHeader(String brokerName, String masterAddress, int masterEpoch, // NOTE(review): no no-arg constructor - confirm the remoting layer can instantiate this header when decoding
+        Set<String> newSyncStateSet, int syncStateSetEpoch) {
+        this.brokerName = brokerName;
+        this.masterAddress = masterAddress;
+        this.masterEpoch = masterEpoch;
+        this.newSyncStateSet = new HashSet<>(newSyncStateSet); // defensive copy; throws NullPointerException if the argument is null
+        this.syncStateSetEpoch = syncStateSetEpoch;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public String getMasterAddress() {
+        return masterAddress;
+    }
+
+    public int getMasterEpoch() {
+        return masterEpoch;
+    }
+
+    public Set<String> getNewSyncStateSet() {
+        return new HashSet<>(newSyncStateSet); // fresh copy on every call, so callers cannot mutate this header's set
+    }
+
+    public int getSyncStateSetEpoch() {
+        return syncStateSetEpoch;
+    }
+
+    @Override
+    public String toString() {
+        return "AlterSyncStateSetRequestHeader{" +
+            "brokerName='" + brokerName + '\'' +
+            ", masterAddress='" + masterAddress + '\'' +
+            ", masterEpoch=" + masterEpoch +
+            ", newSyncStateSet=" + newSyncStateSet +
+            ", syncStateSetEpoch=" + syncStateSetEpoch +
+            '}';
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException { // intentionally empty: no field validation is performed
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetResponseHeader.java
new file mode 100644
index 000000000..8ab55552a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/AlterSyncStateSetResponseHeader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AlterSyncStateSetResponseHeader implements CommandCustomHeader { // Mutable response header: a sync-state set plus its epoch.
+    private Set<String> newSyncStateSet; // null until setNewSyncStateSet is called with a non-null set
+    private int newSyncStateSetEpoch; // 0 until setNewSyncStateSetEpoch is called
+
+    public AlterSyncStateSetResponseHeader() {
+    }
+
+    public Set<String> getNewSyncStateSet() {
+        return newSyncStateSet == null ? new HashSet<>() : new HashSet<>(newSyncStateSet); // copy; empty set when unset instead of an NPE
+    }
+
+    public void setNewSyncStateSet(Set<String> newSyncStateSet) {
+        this.newSyncStateSet = newSyncStateSet == null ? null : new HashSet<>(newSyncStateSet); // defensive copy; null resets to unset
+    }
+
+    public int getNewSyncStateSetEpoch() {
+        return newSyncStateSetEpoch;
+    }
+
+    public void setNewSyncStateSetEpoch(int newSyncStateSetEpoch) {
+        this.newSyncStateSetEpoch = newSyncStateSetEpoch;
+    }
+
+    @Override
+    public String toString() {
+        return "AlterSyncStateSetResponseHeader{" +
+            "newSyncStateSet=" + newSyncStateSet +
+            ", newSyncStateSetEpoch=" + newSyncStateSetEpoch +
+            '}';
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException { // intentionally empty: no field validation is performed
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
new file mode 100644
index 000000000..430353bd2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterRequestHeader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ElectMasterRequestHeader implements CommandCustomHeader { // Immutable header carrying a single broker name.
+    private final String brokerName; // stored as given; not checked for null
+
+    public ElectMasterRequestHeader(String brokerName) { // NOTE(review): no no-arg constructor - confirm the remoting layer can instantiate this header when decoding
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    @Override
+    public String toString() {
+        return "ElectMasterRequestHeader{" +
+            "brokerName='" + brokerName + '\'' +
+            '}';
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException { // intentionally empty: no field validation is performed
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
new file mode 100644
index 000000000..1adcfe3b7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/ElectMasterResponseHeader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ElectMasterResponseHeader implements CommandCustomHeader { // Mutable response header: a master address plus a master epoch.
+    private String newMasterAddress; // null until setNewMasterAddress is called
+    private int masterEpoch; // 0 until setMasterEpoch is called
+
+    public ElectMasterResponseHeader() {
+    }
+
+    public String getNewMasterAddress() {
+        return newMasterAddress;
+    }
+
+    public void setNewMasterAddress(String newMasterAddress) {
+        this.newMasterAddress = newMasterAddress;
+    }
+
+    public int getMasterEpoch() {
+        return masterEpoch;
+    }
+
+    public void setMasterEpoch(int masterEpoch) {
+        this.masterEpoch = masterEpoch;
+    }
+
+    @Override
+    public String toString() {
+        return "ElectMasterResponseHeader{" +
+            "newMasterAddress='" + newMasterAddress + '\'' +
+            ", masterEpoch=" + masterEpoch +
+            '}';
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException { // intentionally empty: no field validation is performed
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
new file mode 100644
index 000000000..5c8290fa3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetMetaDataResponseHeader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetMetaDataResponseHeader implements CommandCustomHeader { // Immutable header carrying the controller leader's id and address.
+    private final String controllerLeaderId; // stored as given; not checked for null
+    private final String controllerLeaderAddress; // stored as given; not checked for null
+
+    public GetMetaDataResponseHeader(String controllerLeaderId, String controllerLeaderAddress) { // NOTE(review): no no-arg constructor - confirm the remoting layer can instantiate this header when decoding
+        this.controllerLeaderId = controllerLeaderId;
+        this.controllerLeaderAddress = controllerLeaderAddress;
+    }
+
+    public String getControllerLeaderId() {
+        return controllerLeaderId;
+    }
+
+    public String getControllerLeaderAddress() {
+        return controllerLeaderAddress;
+    }
+
+    @Override public String toString() {
+        return "GetMetaDataResponseHeader{" +
+            "controllerLeaderId='" + controllerLeaderId + '\'' +
+            ", controllerLeaderAddress='" + controllerLeaderAddress + '\'' +
+            '}';
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException { // intentionally empty: no field validation is performed
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
new file mode 100644
index 000000000..e90086a76
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoRequestHeader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetReplicaInfoRequestHeader implements CommandCustomHeader { // Immutable header carrying a single broker name.
+    private final String brokerName; // stored as given; not checked for null
+
+    public GetReplicaInfoRequestHeader(String brokerName) { // NOTE(review): no no-arg constructor - confirm the remoting layer can instantiate this header when decoding
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    @Override
+    public String toString() {
+        return "GetReplicaInfoRequestHeader{" +
+            "brokerName='" + brokerName + '\'' +
+            '}';
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException { // intentionally empty: no field validation is performed
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
new file mode 100644
index 000000000..e135d0f0f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/GetReplicaInfoResponseHeader.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import java.util.Set;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetReplicaInfoResponseHeader implements CommandCustomHeader { // Mutable response header: master address/epoch and sync-state set/epoch.
+    private String masterAddress; // null until setMasterAddress is called
+    private int masterEpoch; // 0 until setMasterEpoch is called
+    private Set<String> syncStateSet; // null until setSyncStateSet is called; shared by reference with callers
+    private int syncStateSetEpoch; // 0 until setSyncStateSetEpoch is called
+
+    public GetReplicaInfoResponseHeader() {
+    }
+
+    public String getMasterAddress() {
+        return masterAddress;
+    }
+
+    public void setMasterAddress(String masterAddress) {
+        this.masterAddress = masterAddress;
+    }
+
+    public int getMasterEpoch() {
+        return masterEpoch;
+    }
+
+    public void setMasterEpoch(int masterEpoch) {
+        this.masterEpoch = masterEpoch;
+    }
+
+    public Set<String> getSyncStateSet() {
+        return syncStateSet; // returns the stored reference itself (no copy); NOTE(review): the AlterSyncStateSet headers copy instead
+    }
+
+    public void setSyncStateSet(Set<String> syncStateSet) {
+        this.syncStateSet = syncStateSet; // keeps the caller's set by reference (no copy)
+    }
+
+    public int getSyncStateSetEpoch() {
+        return syncStateSetEpoch;
+    }
+
+    public void setSyncStateSetEpoch(int syncStateSetEpoch) {
+        this.syncStateSetEpoch = syncStateSetEpoch;
+    }
+
+    @Override
+    public String toString() {
+        return "GetReplicaInfoResponseHeader{" +
+            "masterAddress='" + masterAddress + '\'' +
+            ", masterEpoch=" + masterEpoch +
+            ", syncStateSet=" + syncStateSet +
+            ", syncStateSetEpoch=" + syncStateSetEpoch +
+            '}';
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException { // intentionally empty: no field validation is performed
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerRequestHeader.java
new file mode 100644
index 000000000..8c9439932
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerRequestHeader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class RegisterBrokerRequestHeader implements CommandCustomHeader { // Immutable header carrying cluster name, broker name and broker address.
+    private final String clusterName; // stored as given; not checked for null
+    private final String brokerName; // stored as given; not checked for null
+    private final String brokerAddress; // stored as given; not checked for null
+
+    public RegisterBrokerRequestHeader(String clusterName, String brokerName, String brokerAddress) { // NOTE(review): no no-arg constructor - confirm the remoting layer can instantiate this header when decoding
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.brokerAddress = brokerAddress;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public String getBrokerAddress() {
+        return brokerAddress;
+    }
+
+    @Override
+    public String toString() {
+        return "RegisterBrokerRequestHeader{" +
+            "clusterName='" + clusterName + '\'' +
+            ", brokerName='" + brokerName + '\'' +
+            ", brokerAddress='" + brokerAddress + '\'' +
+            '}';
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException { // intentionally empty: no field validation is performed
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerResponseHeader.java
new file mode 100644
index 000000000..97fbb1057
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/RegisterBrokerResponseHeader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv.controller;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+/**
+ * Response header of a broker registration: the master address, the master epoch and the
+ * broker id assigned to the registered replica.
+ */
+public class RegisterBrokerResponseHeader implements CommandCustomHeader {
+    private String masterAddress;
+    private int masterEpoch;
+    // The id of this registered replica.
+    private long brokerId;
+
+    /**
+     * Creates an empty header; values are supplied through the setters.
+     */
+    public RegisterBrokerResponseHeader() {
+    }
+
+    public String getMasterAddress() {
+        return this.masterAddress;
+    }
+
+    public void setMasterAddress(String masterAddress) {
+        this.masterAddress = masterAddress;
+    }
+
+    public int getMasterEpoch() {
+        return this.masterEpoch;
+    }
+
+    public void setMasterEpoch(int masterEpoch) {
+        this.masterEpoch = masterEpoch;
+    }
+
+    public long getBrokerId() {
+        return this.brokerId;
+    }
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("RegisterBrokerResponseHeader{");
+        text.append("masterAddress='").append(this.masterAddress).append('\'');
+        text.append(", masterEpoch=").append(this.masterEpoch);
+        text.append(", brokerId=").append(this.brokerId);
+        text.append('}');
+        return text.toString();
+    }
+
+    /**
+     * No field validation is performed for this header.
+     */
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java b/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java
new file mode 100644
index 000000000..805558d1d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/FastJsonSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.support.config.FastJsonConfig;
+import org.apache.commons.lang3.SerializationException;
+
+/**
+ * {@link Serializer} implementation backed by fastjson; all encoding and parsing options are
+ * taken from the configured {@link FastJsonConfig}.
+ */
+public class FastJsonSerializer implements Serializer {
+    private FastJsonConfig fastJsonConfig = new FastJsonConfig();
+
+    public FastJsonConfig getFastJsonConfig() {
+        return fastJsonConfig;
+    }
+
+    public void setFastJsonConfig(FastJsonConfig fastJsonConfig) {
+        this.fastJsonConfig = fastJsonConfig;
+    }
+
+    /**
+     * Serializes {@code t} to JSON bytes.
+     *
+     * @return the encoded bytes, or an empty array when {@code t} is {@code null}
+     * @throws SerializationException wrapping any exception raised while encoding
+     */
+    @Override
+    public <T> byte[] serialize(T t) throws SerializationException {
+        if (t == null) {
+            return new byte[0];
+        }
+        try {
+            return JSON.toJSONBytesWithFastJsonConfig(
+                this.fastJsonConfig.getCharset(),
+                t,
+                this.fastJsonConfig.getSerializeConfig(),
+                this.fastJsonConfig.getSerializeFilters(),
+                this.fastJsonConfig.getDateFormat(),
+                JSON.DEFAULT_GENERATE_FEATURE,
+                this.fastJsonConfig.getSerializerFeatures());
+        } catch (Exception e) {
+            throw new SerializationException("Could not serialize: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Parses {@code bytes} into an instance of {@code type}.
+     *
+     * @return the parsed object, or {@code null} when {@code bytes} is {@code null} or empty
+     * @throws SerializationException wrapping any exception raised while parsing
+     */
+    @Override
+    public <T> T deserialize(byte[] bytes, Class<T> type) throws SerializationException {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        try {
+            return JSON.parseObject(
+                bytes,
+                this.fastJsonConfig.getCharset(),
+                type,
+                this.fastJsonConfig.getParserConfig(),
+                this.fastJsonConfig.getParseProcess(),
+                JSON.DEFAULT_PARSER_FEATURE,
+                this.fastJsonConfig.getFeatures());
+        } catch (Exception e) {
+            throw new SerializationException("Could not deserialize: " + e.getMessage(), e);
+        }
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/Serializer.java b/common/src/main/java/org/apache/rocketmq/common/utils/Serializer.java
new file mode 100644
index 000000000..a98d2454d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/Serializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import org.apache.commons.lang3.SerializationException;
+
+/**
+ * Converts objects to byte arrays and back.
+ */
+public interface Serializer {
+
+    /**
+     * Serializes an object to a byte array.
+     *
+     * @param t the object to serialize
+     * @param <T> type of the object
+     * @return the serialized bytes
+     * @throws SerializationException if the object cannot be serialized
+     */
+    <T> byte[] serialize(T t) throws SerializationException;
+
+    /**
+     * Deserializes a byte array into an object of the given type.
+     *
+     * @param bytes the serialized bytes
+     * @param type class of the object to produce
+     * @param <T> type of the object
+     * @return the deserialized object
+     * @throws SerializationException if the bytes cannot be deserialized
+     */
+    <T> T deserialize(byte[] bytes, Class<T> type) throws SerializationException;
+}
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 0b04d5014..83b11d1c5 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -28,6 +28,11 @@
     <name>rocketmq-namesrv ${project.version}</name>
 
     <dependencies>
+        <dependency>
+            <groupId>io.openmessaging.storage</groupId>
+            <artifactId>dledger</artifactId>
+            <version>0.2.5</version>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-client</artifactId>
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index d02e6c0ad..e10878e6b 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -30,13 +30,17 @@ import org.apache.rocketmq.common.Configuration;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.namesrv.controller.impl.DledgerController;
 import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
 import org.apache.rocketmq.namesrv.processor.ClientRequestProcessor;
 import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
+import org.apache.rocketmq.namesrv.processor.ControllerRequestProcessor;
 import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
 import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
 import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
@@ -52,7 +56,6 @@ import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.srvutil.FileWatchService;
 
-
 public class NamesrvController {
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
     private static final InternalLogger WATER_MARK_LOG = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_WATER_MARK_LOGGER_NAME);
@@ -61,6 +64,7 @@ public class NamesrvController {
 
     private final NettyServerConfig nettyServerConfig;
     private final NettyClientConfig nettyClientConfig;
+    private final ControllerConfig controllerConfig;
 
     private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
         new BasicThreadFactory.Builder()
@@ -82,24 +86,33 @@ public class NamesrvController {
 
     private ExecutorService defaultExecutor;
     private ExecutorService clientRequestExecutor;
+    private ExecutorService controllerRequestExecutor;
 
     private BlockingQueue<Runnable> defaultThreadPoolQueue;
     private BlockingQueue<Runnable> clientRequestThreadPoolQueue;
+    private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
 
     private Configuration configuration;
     private FileWatchService fileWatchService;
 
+    private Controller controller;
+
     public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
-        this(namesrvConfig, nettyServerConfig, new NettyClientConfig());
+        this(namesrvConfig, nettyServerConfig, new NettyClientConfig(), new ControllerConfig());
     }
 
-    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
+    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig,
+        NettyClientConfig nettyClientConfig, ControllerConfig controllerConfig) {
         this.namesrvConfig = namesrvConfig;
         this.nettyServerConfig = nettyServerConfig;
         this.nettyClientConfig = nettyClientConfig;
+        this.controllerConfig = controllerConfig;
         this.kvConfigManager = new KVConfigManager(this);
         this.brokerHousekeepingService = new BrokerHousekeepingService(this);
-        this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
+        if (controllerConfig.isStartupController()) {
+            this.controller = new DledgerController(controllerConfig, this);
+        }
+        this.routeInfoManager = new RouteInfoManager(namesrvConfig, this, this.controller);
         this.configuration = new Configuration(
             LOGGER,
             this.namesrvConfig, this.nettyServerConfig
@@ -140,6 +153,24 @@ public class NamesrvController {
                 return new FutureTaskExt<T>(runnable, value);
             }
         };
+
+        if (this.controllerConfig.isStartupController()) {
+            this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
+            this.controllerRequestExecutor = new ThreadPoolExecutor(
+                this.controllerConfig.getControllerThreadPoolNums(),
+                this.controllerConfig.getControllerThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.controllerRequestThreadPoolQueue,
+                new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
+                @Override
+                protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
+                    return new FutureTaskExt<T>(runnable, value);
+                }
+            };
+            this.controller.startup();
+        }
+
         this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
         this.remotingClient.updateNameServerAddressList(Arrays.asList(RemotingUtil.getLocalAddress() + ":" + this.nettyServerConfig.getListenPort()));
 
@@ -229,7 +260,6 @@ public class NamesrvController {
 
     private void registerProcessor() {
         if (namesrvConfig.isClusterTest()) {
-
             this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                 this.defaultExecutor);
         } else {
@@ -238,6 +268,15 @@ public class NamesrvController {
             this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);
 
             this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
+
+            if (controllerConfig.isStartupController()) {
+                final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this.controller);
+                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
+                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
+                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor);
+                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+            }
         }
     }
 
@@ -274,6 +313,10 @@ public class NamesrvController {
         return nettyServerConfig;
     }
 
+    public ControllerConfig getControllerConfig() {
+        return controllerConfig;
+    }
+
     public KVConfigManager getKvConfigManager() {
         return kvConfigManager;
     }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index 715f23872..c580a5718 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -32,6 +32,7 @@ import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
@@ -84,6 +85,7 @@ public class NamesrvStartup {
         final NettyServerConfig nettyServerConfig = new NettyServerConfig();
         final NettyClientConfig nettyClientConfig = new NettyClientConfig();
         nettyServerConfig.setListenPort(9876);
+        final ControllerConfig controllerConfig = new ControllerConfig();
         if (commandLine.hasOption('c')) {
             String file = commandLine.getOptionValue('c');
             if (file != null) {
@@ -93,6 +95,7 @@ public class NamesrvStartup {
                 MixAll.properties2Object(properties, namesrvConfig);
                 MixAll.properties2Object(properties, nettyServerConfig);
                 MixAll.properties2Object(properties, nettyClientConfig);
+                MixAll.properties2Object(properties, controllerConfig);
 
                 namesrvConfig.setConfigStorePath(file);
 
@@ -105,6 +108,7 @@ public class NamesrvStartup {
             MixAll.printObjectProperties(null, namesrvConfig);
             MixAll.printObjectProperties(null, nettyServerConfig);
             MixAll.printObjectProperties(null, nettyClientConfig);
+            MixAll.printObjectProperties(null, controllerConfig);
             System.exit(0);
         }
 
@@ -126,7 +130,7 @@ public class NamesrvStartup {
         MixAll.printObjectProperties(log, namesrvConfig);
         MixAll.printObjectProperties(log, nettyServerConfig);
 
-        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);
+        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig, controllerConfig);
 
         // remember all configs to prevent discard
         controller.getConfiguration().registerConfig(properties);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java
new file mode 100644
index 000000000..4266942b2
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * The API of the controller.
+ */
+public interface Controller {
+
+    /**
+     * Starts the controller.
+     */
+    void startup();
+
+    /**
+     * Shuts the controller down.
+     */
+    void shutdown();
+
+    /**
+     * Starts scheduling controller events. Intended to be triggered only when the controller becomes leader.
+     */
+    void startScheduling();
+
+    /**
+     * Stops scheduling controller events. Intended to be triggered only when the controller gives up leadership.
+     */
+    void stopScheduling();
+
+    /**
+     * Alters the sync state set (ISR) of a broker's replicas.
+     *
+     * @param request AlterSyncStateSetRequestHeader
+     * @return RemotingCommand(AlterSyncStateSetResponseHeader)
+     */
+    CompletableFuture<RemotingCommand> alterSyncStateSet(
+        final AlterSyncStateSetRequestHeader request);
+
+    /**
+     * Elects a new master for a broker.
+     *
+     * @param request ElectMasterRequestHeader
+     * @return RemotingCommand(ElectMasterResponseHeader)
+     */
+    CompletableFuture<RemotingCommand> electMaster(final ElectMasterRequestHeader request);
+
+    /**
+     * Registers a replica of a broker; called when the replica starts up.
+     *
+     * @param request RegisterBrokerRequestHeader
+     * @return RemotingCommand(RegisterBrokerResponseHeader)
+     */
+    CompletableFuture<RemotingCommand> registerBroker(final RegisterBrokerRequestHeader request);
+
+    /**
+     * Gets the replica info of a target broker.
+     *
+     * @param request GetReplicaInfoRequestHeader
+     * @return RemotingCommand(GetReplicaInfoResponseHeader)
+     */
+    CompletableFuture<RemotingCommand> getReplicaInfo(final GetReplicaInfoRequestHeader request);
+
+    /**
+     * Gets the metadata of the controller.
+     *
+     * @return RemotingCommand(GetMetaDataResponseHeader)
+     */
+    RemotingCommand getControllerMetadata();
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java
new file mode 100644
index 000000000..533b8fbcd
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.impl;
+
+import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import io.openmessaging.storage.dledger.DLedgerLeaderElector;
+import io.openmessaging.storage.dledger.DLedgerServer;
+import io.openmessaging.storage.dledger.MemberState;
+import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.function.Supplier;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * The implementation of controller, based on dledger (raft).
+ */
+public class DledgerController implements Controller {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final DLedgerServer dLedgerServer;
+    private final ControllerConfig controllerConfig;
+    private final DLedgerConfig dLedgerConfig;
+    private final NamesrvController namesrvController;
+    // Used for checking whether the broker is alive
+    private final BiPredicate<String, String> brokerAlivePredicate;
+
+    private final ReplicasInfoManager replicasInfoManager;
+    private final EventScheduler scheduler;
+    private final EventSerializer eventSerializer;
+    private final RoleChangeHandler roleHandler;
+    private final DledgerControllerStateMachine statemachine;
+    private volatile boolean isScheduling = false;
+
+    /**
+     * Wires up the controller: creates the event serializer and scheduler, chooses the
+     * broker-liveness predicate, configures dledger from {@code config}, and registers the
+     * state machine and the role-change handler on the dledger server.
+     *
+     * @param config controller settings (dledger group, peers, self id, store path, mapped file size)
+     * @param namesrvController the owning name server; when {@code null}, every broker is treated as alive
+     */
+    public DledgerController(final ControllerConfig config, final NamesrvController namesrvController) {
+        this.controllerConfig = config;
+        this.namesrvController = namesrvController;
+        this.eventSerializer = new EventSerializer();
+        this.scheduler = new EventScheduler();
+        // Liveness is delegated to the name server's route info when one is available.
+        this.brokerAlivePredicate = namesrvController == null
+            ? (cluster, address) -> true
+            : (cluster, address) -> namesrvController.getRouteInfoManager().isBrokerAlive(cluster, address);
+
+        final DLedgerConfig ledgerConfig = new DLedgerConfig();
+        ledgerConfig.setGroup(config.getControllerDLegerGroup());
+        ledgerConfig.setPeers(config.getControllerDLegerPeers());
+        ledgerConfig.setSelfId(config.getControllerDLegerSelfId());
+        ledgerConfig.setStoreBaseDir(config.getControllerStorePath());
+        ledgerConfig.setMappedFileSizeForEntryData(config.getMappedFileSize());
+        this.dLedgerConfig = ledgerConfig;
+
+        this.roleHandler = new RoleChangeHandler(ledgerConfig.getSelfId());
+        this.replicasInfoManager = new ReplicasInfoManager(config);
+        this.statemachine = new DledgerControllerStateMachine(this.replicasInfoManager, this.eventSerializer, ledgerConfig.getSelfId());
+
+        // Register statemachine and role handler.
+        final DLedgerServer server = new DLedgerServer(ledgerConfig);
+        server.registerStateMachine(this.statemachine);
+        server.getdLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
+        this.dLedgerServer = server;
+    }
+
+    /**
+     * Starts the underlying dledger server.
+     */
+    @Override
+    public void startup() {
+        dLedgerServer.startup();
+    }
+
+    /**
+     * Shuts down the underlying dledger server.
+     */
+    @Override
+    public void shutdown() {
+        dLedgerServer.shutdown();
+    }
+
+    /**
+     * Starts the event scheduler unless scheduling is already flagged as active.
+     */
+    @Override
+    public void startScheduling() {
+        if (this.isScheduling) {
+            return;
+        }
+        log.info("Start scheduling controller events");
+        this.isScheduling = true;
+        this.scheduler.start();
+    }
+
+    /**
+     * Shuts the event scheduler down if scheduling is currently flagged as active.
+     */
+    @Override
+    public void stopScheduling() {
+        if (!this.isScheduling) {
+            return;
+        }
+        log.info("Stop scheduling controller events");
+        this.isScheduling = false;
+        this.scheduler.shutdown(true);
+    }
+
+    /**
+     * Queues an "alterSyncStateSet" write event that is evaluated by the replicas info manager.
+     */
+    @Override
+    public CompletableFuture<RemotingCommand> alterSyncStateSet(AlterSyncStateSetRequestHeader request) {
+        return scheduler.appendEvent(
+            "alterSyncStateSet",
+            () -> replicasInfoManager.alterSyncStateSet(request, brokerAlivePredicate),
+            true);
+    }
+
+    /**
+     * Queues an "electMaster" write event that is evaluated by the replicas info manager.
+     */
+    @Override
+    public CompletableFuture<RemotingCommand> electMaster(final ElectMasterRequestHeader request) {
+        return scheduler.appendEvent(
+            "electMaster",
+            () -> replicasInfoManager.electMaster(request, brokerAlivePredicate),
+            true);
+    }
+
+    /**
+     * Queues a "registerBroker" write event that is evaluated by the replicas info manager.
+     */
+    @Override
+    public CompletableFuture<RemotingCommand> registerBroker(RegisterBrokerRequestHeader request) {
+        return scheduler.appendEvent(
+            "registerBroker",
+            () -> replicasInfoManager.registerBroker(request),
+            true);
+    }
+
+    /**
+     * Queues a "getReplicaInfo" event; unlike the other operations it is not flagged as a write event.
+     */
+    @Override
+    public CompletableFuture<RemotingCommand> getReplicaInfo(final GetReplicaInfoRequestHeader request) {
+        return scheduler.appendEvent(
+            "getReplicaInfo",
+            () -> replicasInfoManager.getReplicaInfo(request),
+            false);
+    }
+
+    /**
+     * Builds a SUCCESS response whose header carries the leader id and leader address
+     * taken from the current member state.
+     */
+    @Override
+    public RemotingCommand getControllerMetadata() {
+        final MemberState memberState = getMemberState();
+        final GetMetaDataResponseHeader header =
+            new GetMetaDataResponseHeader(memberState.getLeaderId(), memberState.getLeaderAddr());
+        return RemotingCommand.createResponseCommandWithHeader(ResponseCode.SUCCESS, header);
+    }
+
+    /**
+     * A unit of work that is queued in, and executed by, the event scheduler.
+     *
+     * @param <T> type parameter of the event; not referenced by the methods declared here
+     */
+    interface EventHandler<T> {
+        /**
+         * Runs the controller event.
+         *
+         * @throws Throwable any failure raised while running the event
+         */
+        void run() throws Throwable;
+
+        /**
+         * Returns the future associated with this event.
+         */
+        CompletableFuture<RemotingCommand> future();
+
+        /**
+         * Handles a failure raised by {@link #run()}.
+         *
+         * @param t the failure
+         */
+        void handleException(final Throwable t);
+    }
+
+    /**
+     * Event scheduler: drains the bounded event queue and runs each queued handler.
+     */
+    class EventScheduler extends ServiceThread {
+        /** Number of interruptions tolerated while enqueueing an event before giving up. */
+        private static final int MAX_INTERRUPTED_RETRIES = 3;
+
+        private final BlockingQueue<EventHandler<?>> eventQueue;
+
+        public EventScheduler() {
+            this.eventQueue = new LinkedBlockingQueue<>(1024);
+        }
+
+        @Override
+        public String getServiceName() {
+            return EventScheduler.class.getName();
+        }
+
+        /**
+         * Polls the queue until the service is stopped; a failure thrown by a handler is passed
+         * to that handler's {@code handleException}.
+         */
+        @Override
+        public void run() {
+            log.info("Start event scheduler.");
+            while (!isStopped()) {
+                EventHandler<?> handler;
+                try {
+                    handler = this.eventQueue.poll(5, TimeUnit.SECONDS);
+                } catch (final InterruptedException ignored) {
+                    // Deliberately ignored: the loop condition re-checks isStopped().
+                    continue;
+                }
+                if (handler == null) {
+                    // Poll timed out with nothing queued.
+                    continue;
+                }
+                try {
+                    handler.run();
+                } catch (final Throwable e) {
+                    handler.handleException(e);
+                }
+            }
+        }
+
+        /**
+         * Wraps the supplier in an event handler and enqueues it.
+         *
+         * @param name event name, used for logging
+         * @param supplier produces the result of the event when the handler runs
+         * @param isWriteEvent whether the event is a write event
+         * @return the future of the enqueued event; an already-completed future carrying
+         *         {@code CONTROLLER_NOT_LEADER} when the scheduler is stopped or this controller is
+         *         not leader; an already-completed future carrying {@code SYSTEM_ERROR} when the
+         *         calling thread was interrupted too many times while enqueueing. Never {@code null}.
+         */
+        public <T> CompletableFuture<RemotingCommand> appendEvent(final String name,
+            final Supplier<ControllerResult<T>> supplier, boolean isWriteEvent) {
+            if (isStopped() || !DledgerController.this.roleHandler.isLeaderState()) {
+                final RemotingCommand command = RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_NOT_LEADER, "The controller is not in leader state");
+                return CompletableFuture.completedFuture(command);
+            }
+
+            final EventHandler<T> event = new ControllerEventHandler<>(name, supplier, isWriteEvent);
+            int interruptedTimes = 0;
+            boolean interrupted = false;
+            try {
+                while (true) {
+                    try {
+                        if (this.eventQueue.offer(event, 5, TimeUnit.SECONDS)) {
+                            return event.future();
+                        }
+                        // Queue is still full after the timeout: keep waiting for free capacity.
+                    } catch (final InterruptedException e) {
+                        interrupted = true;
+                        log.error("Error happen in EventScheduler when append event", e);
+                        interruptedTimes++;
+                        if (interruptedTimes > MAX_INTERRUPTED_RETRIES) {
+                            // Report the failure through the future instead of returning null,
+                            // so callers that chain on the result do not hit a NullPointerException.
+                            final RemotingCommand command = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR,
+                                "Interrupted while appending event " + name + " to the controller event queue");
+                            return CompletableFuture.completedFuture(command);
+                        }
+                    }
+                }
+            } finally {
+                if (interrupted) {
+                    // Restore the interrupt status that offer() cleared when it threw.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    /**
+     * Event handler that obtains a controller result from its supplier, proposes the
+     * resulting events to dledger, and then completes its future with the response.
+     */
+    class ControllerEventHandler<T> implements EventHandler<T> {
+        private final String name;
+        private final Supplier<ControllerResult<T>> supplier;
+        private final CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        private final boolean isWriteEvent;
+
+        ControllerEventHandler(final String name, final Supplier<ControllerResult<T>> supplier,
+            final boolean isWriteEvent) {
+            this.name = name;
+            this.supplier = supplier;
+            this.isWriteEvent = isWriteEvent;
+        }
+
+        /**
+         * Runs the supplier, appends to dledger as required by the event kind, and settles the future:
+         * completed with the response on success, cancelled when the append reports failure.
+         */
+        @Override
+        public void run() throws Throwable {
+            final ControllerResult<T> result = this.supplier.get();
+            log.info("Event queue run event {}, get the result {}", this.name, result);
+
+            final boolean appended = this.isWriteEvent ? appendWriteEvents(result) : appendReadBarrier();
+            if (!appended) {
+                log.error("Failed to append event to dledger, the response is {}, try cancel the future", result.getResponse());
+                this.future.cancel(true);
+                return;
+            }
+            final RemotingCommand response = RemotingCommand.createResponseCommandWithHeader(result.getResponseCode(), (CommandCustomHeader) result.getResponse());
+            this.future.complete(response);
+        }
+
+        /**
+         * Serializes the non-null events of the result and appends them to dledger as one batch.
+         *
+         * @return true when there was nothing to append or the append succeeded
+         */
+        private boolean appendWriteEvents(final ControllerResult<T> result) throws Throwable {
+            final List<EventMessage> events = result.getEvents();
+            final List<byte[]> payloads = new ArrayList<>(events.size());
+            for (final EventMessage event : events) {
+                if (event == null) {
+                    continue;
+                }
+                final byte[] data = DledgerController.this.eventSerializer.serialize(event);
+                if (data != null && data.length > 0) {
+                    payloads.add(data);
+                }
+            }
+            if (payloads.isEmpty()) {
+                return true;
+            }
+            final BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+            request.setBatchMsgs(payloads);
+            return appendToDledgerAndWait(request);
+        }
+
+        /**
+         * For read events: when read-event processing is enabled, proposes an empty entry to dledger,
+         * because dledger offers neither Read-Index nor Lease-Read.
+         *
+         * @return true when processing is disabled or the empty append succeeded
+         */
+        private boolean appendReadBarrier() throws Throwable {
+            if (!DledgerController.this.controllerConfig.isProcessReadEvent()) {
+                return true;
+            }
+            final AppendEntryRequest request = new AppendEntryRequest();
+            request.setBody(new byte[0]);
+            return appendToDledgerAndWait(request);
+        }
+
+        @Override
+        public CompletableFuture<RemotingCommand> future() {
+            return this.future;
+        }
+
+        /**
+         * Logs the failure and completes the future exceptionally with it.
+         */
+        @Override
+        public void handleException(final Throwable t) {
+            log.error("Error happen when handle event {}", this.name, t);
+            this.future.completeExceptionally(t);
+        }
+    }
+
+    /**
+     * Role change handler, trigger the startScheduling() and stopScheduling() when role change.
+     * Every transition is executed on a single worker thread, so transitions are handled
+     * one after another in the order they were submitted.
+     */
+    class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
+
+        private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER;
+        private final String selfId;
+        private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
+
+        public RoleChangeHandler(final String selfId) {
+            this.selfId = selfId;
+        }
+
+        /**
+         * Submits the handling of a role change to the worker thread.
+         *
+         * @param term the term reported with the role change (not used by this handler)
+         * @param role the new role
+         */
+        @Override
+        public void handle(long term, MemberState.Role role) {
+            Runnable runnable = () -> {
+                switch (role) {
+                    case CANDIDATE:
+                        this.currentRole = MemberState.Role.CANDIDATE;
+                        log.info("Controller {} change role to candidate", this.selfId);
+                        DledgerController.this.stopScheduling();
+                        break;
+                    case FOLLOWER:
+                        this.currentRole = MemberState.Role.FOLLOWER;
+                        log.info("Controller {} change role to Follower, leaderId:{}", this.selfId, getMemberState().getLeaderId());
+                        DledgerController.this.stopScheduling();
+                        break;
+                    case LEADER: {
+                        this.currentRole = MemberState.Role.LEADER;
+                        log.info("Controller {} change role to leader, try process a initial proposal", this.selfId);
+                        // Because the role becomes to leader, but the memory statemachine of the controller is still in the old point,
+                        // some committed logs have not been applied. Therefore, we must first process an empty request to dledger,
+                        // and after the request is committed, the controller can provide services(startScheduling).
+                        int tryTimes = 0;
+                        while (true) {
+                            // Stop retrying once this node has lost leadership. Otherwise the append can never
+                            // succeed and this loop would occupy the single worker thread forever, so the
+                            // queued candidate/follower transition would never be processed.
+                            if (!getMemberState().isLeader()) {
+                                log.warn("Controller {} is no longer the dledger leader, give up the initial proposal", this.selfId);
+                                break;
+                            }
+                            final AppendEntryRequest request = new AppendEntryRequest();
+                            request.setBody(new byte[0]);
+                            try {
+                                if (appendToDledgerAndWait(request)) {
+                                    DledgerController.this.startScheduling();
+                                    break;
+                                }
+                            } catch (final Throwable e) {
+                                log.error("Error happen when controller leader append initial request to dledger", e);
+                                tryTimes++;
+                                if (tryTimes > 2) {
+                                    log.warn("Controller leader append initial log failed too many times, please wait a while");
+                                    tryTimes = 0;
+                                }
+                            }
+                        }
+                        break;
+                    }
+                }
+            };
+            this.executorService.submit(runnable);
+        }
+
+        @Override
+        public void startup() {
+        }
+
+        /**
+         * Stops scheduling if this handler currently records the leader role, then shuts the worker down.
+         */
+        @Override
+        public void shutdown() {
+            if (this.currentRole == MemberState.Role.LEADER) {
+                DledgerController.this.stopScheduling();
+            }
+            this.executorService.shutdown();
+        }
+
+        /**
+         * @return true if the most recently handled role is leader
+         */
+        public boolean isLeaderState() {
+            return this.currentRole == MemberState.Role.LEADER;
+        }
+    }
+
+    /**
+     * Appends the request to dledger and waits up to five seconds for the append future.
+     * The response carried by the future is not inspected here.
+     *
+     * @param request the request to append; group and remote id are filled in from the dledger config
+     * @return false when the request is null or the append future reports position -1,
+     * true once waiting on the future has returned
+     * @throws Throwable anything raised by the append call or while waiting on the future
+     */
+    private boolean appendToDledgerAndWait(final AppendEntryRequest request) throws Throwable {
+        if (request == null) {
+            return false;
+        }
+        request.setGroup(this.dLedgerConfig.getGroup());
+        request.setRemoteId(this.dLedgerConfig.getSelfId());
+
+        final AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
+        final boolean rejected = appendFuture.getPos() == -1;
+        if (rejected) {
+            return false;
+        }
+        appendFuture.get(5, TimeUnit.SECONDS);
+        return true;
+    }
+
+    /**
+     * Returns the member state of the underlying dledger server.
+     * Intended for tests; the role change handler also reads the leader id through it for logging.
+     */
+    public MemberState getMemberState() {
+        final MemberState state = this.dLedgerServer.getMemberState();
+        return state;
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java
new file mode 100644
index 000000000..2eed8abfb
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.impl;
+
+import io.openmessaging.storage.dledger.entry.DLedgerEntry;
+import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
+import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
+import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator;
+import io.openmessaging.storage.dledger.statemachine.StateMachine;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
+
+/**
+ * The state machine implementation of the dledger controller.
+ * Committed entries are deserialized into events and applied to the {@link ReplicasInfoManager}.
+ */
+public class DledgerControllerStateMachine implements StateMachine {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final ReplicasInfoManager replicasInfoManager;
+    private final EventSerializer eventSerializer;
+    private final String dledgerId;
+
+    public DledgerControllerStateMachine(final ReplicasInfoManager replicasInfoManager,
+        final EventSerializer eventSerializer, final String dledgerId) {
+        this.replicasInfoManager = replicasInfoManager;
+        this.eventSerializer = eventSerializer;
+        this.dledgerId = dledgerId;
+    }
+
+    /**
+     * Applies every committed entry that carries an event.
+     * Entries with a null or empty body are skipped, and so are entries whose body
+     * does not deserialize into an event.
+     *
+     * @param iterator iterator over the committed entries
+     */
+    @Override
+    public void onApply(CommittedEntryIterator iterator) {
+        int applyingSize = 0;
+        while (iterator.hasNext()) {
+            final DLedgerEntry entry = iterator.next();
+            final byte[] body = entry.getBody();
+            if (body == null || body.length == 0) {
+                continue;
+            }
+            final EventMessage event = this.eventSerializer.deserialize(body);
+            if (event == null) {
+                // applyEvent dereferences the event, so a null here would throw inside the apply loop.
+                // NOTE(review): assumes deserialize can return null for an unrecognised payload - confirm.
+                log.error("Failed to deserialize a committed entry on controller {}, skip it", this.dledgerId);
+                continue;
+            }
+            this.replicasInfoManager.applyEvent(event);
+            applyingSize++;
+        }
+        log.info("Apply {} events on controller {}", applyingSize, this.dledgerId);
+    }
+
+    /** No-op in this implementation. */
+    @Override
+    public void onSnapshotSave(SnapshotWriter writer, CompletableFuture<Boolean> future) {
+    }
+
+    /** Always returns false in this implementation. */
+    @Override
+    public boolean onSnapshotLoad(SnapshotReader reader) {
+        return false;
+    }
+
+    /** No-op in this implementation. */
+    @Override
+    public void onShutdown() {
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerIdInfo.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerIdInfo.java
new file mode 100644
index 000000000..e060c13ee
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerIdInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Per-broker-set id bookkeeping: a mapping from broker address to broker id,
+ * plus a counter used to hand out new ids.
+ */
+public class BrokerIdInfo {
+    private final String clusterName;
+    private final String brokerName;
+    // Seeded with 1; ids are issued with incrementAndGet, so the first id handed out is 2.
+    private final AtomicLong brokerIdCount = new AtomicLong(1L);
+    private final HashMap<String/*Address*/, Long/*brokerId*/> brokerIdTable = new HashMap<>();
+
+    public BrokerIdInfo(String clusterName, String brokerName) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+    }
+
+    /**
+     * Advances the counter and returns the new value.
+     * The counter is changed only by this call; it is not derived from the id table.
+     */
+    public long newBrokerId() {
+        return brokerIdCount.incrementAndGet();
+    }
+
+    public String getClusterName() {
+        return this.clusterName;
+    }
+
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+    /**
+     * Returns the live address-to-id table (not a copy); changes made by the caller are kept.
+     */
+    public HashMap<String, Long> getBrokerIdTable() {
+        return this.brokerIdTable;
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java
new file mode 100644
index 000000000..8b8c5afd4
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * In sync replicas info, manages the master and syncStateSet of a broker.
+ * Both the master and the syncStateSet carry an epoch that starts at 1 and is
+ * incremented on every update.
+ */
+public class InSyncReplicasInfo {
+    private final String clusterName;
+    private final String brokerName;
+
+    private Set<String/*Address*/> syncStateSet;
+    private int syncStateSetEpoch;
+
+    private String masterAddress;
+    private int masterEpoch;
+
+    /**
+     * Creates the info with the given master; the initial syncStateSet contains only that master.
+     */
+    public InSyncReplicasInfo(String clusterName, String brokerName, String masterAddress) {
+        this.clusterName = clusterName;
+        this.brokerName = brokerName;
+        this.masterAddress = masterAddress;
+        this.masterEpoch = 1;
+        this.syncStateSet = new HashSet<>();
+        this.syncStateSet.add(masterAddress);
+        this.syncStateSetEpoch = 1;
+    }
+
+    /**
+     * Records a new master address and bumps the master epoch.
+     */
+    public void updateMasterInfo(String masterAddress) {
+        this.masterAddress = masterAddress;
+        this.masterEpoch++;
+    }
+
+    /**
+     * Replaces the syncStateSet with a copy of the given set and bumps its epoch.
+     */
+    public void updateSyncStateSetInfo(Set<String> newSyncStateSet) {
+        this.syncStateSet = new HashSet<>(newSyncStateSet);
+        this.syncStateSetEpoch++;
+    }
+
+    /**
+     * @return true if a master address is recorded; a null or empty address counts as "no master"
+     */
+    public boolean isMasterExist() {
+        // Null-safe: a null address previously caused a NullPointerException here.
+        return this.masterAddress != null && !this.masterAddress.isEmpty();
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    /**
+     * @return a copy of the current syncStateSet
+     */
+    public Set<String> getSyncStateSet() {
+        return new HashSet<>(syncStateSet);
+    }
+
+    public int getSyncStateSetEpoch() {
+        return syncStateSetEpoch;
+    }
+
+    public String getMasterAddress() {
+        return masterAddress;
+    }
+
+    public int getMasterEpoch() {
+        return masterEpoch;
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java
new file mode 100644
index 000000000..622a765b5
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
+
+/**
+ * The manager that manages the replicas info for all brokers.
+ * We can think of this class as the controller's memory state machine
+ * It should be noted that this class is not thread safe,
+ * and the upper layer needs to ensure that it can be called sequentially
+ */
+public class ReplicasInfoManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final boolean enableElectUncleanMaster;
+    private final Map<String/* brokerName */, BrokerIdInfo> replicaInfoTable;
+    private final Map<String/* brokerName */, InSyncReplicasInfo> inSyncReplicasInfoTable;
+
+    /**
+     * Creates a manager with empty tables; the unclean-master-election switch is read once from the config.
+     */
+    public ReplicasInfoManager(final ControllerConfig config) {
+        this.replicaInfoTable = new HashMap<>();
+        this.inSyncReplicasInfoTable = new HashMap<>();
+        this.enableElectUncleanMaster = config.isEnableElectUncleanMaster();
+    }
+
+    /**
+     * Validates a request to change the syncStateSet of a broker and, if it is acceptable,
+     * produces an AlterSyncStateSetEvent. Nothing is modified here; state changes only when the event is applied.
+     *
+     * @param request              the alter request
+     * @param brokerAlivePredicate tested with (clusterName, address) to decide whether a replica is alive
+     * @return the result; on rejection only the response code is set and no event is added
+     */
+    public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
+        final AlterSyncStateSetRequestHeader request, final BiPredicate<String, String> brokerAlivePredicate) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = new ControllerResult<>(new AlterSyncStateSetResponseHeader());
+
+        // Unknown broker: nothing to alter.
+        if (!isContainsBroker(brokerName)) {
+            result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+            return result;
+        }
+
+        final Set<String> proposedSet = request.getNewSyncStateSet();
+        final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+        final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+        final HashMap<String, Long> knownReplicas = brokerInfo.getBrokerIdTable();
+
+        // The request must come from the current master ...
+        if (!replicasInfo.getMasterAddress().equals(request.getMasterAddress())) {
+            log.info("Rejecting alter syncStateSet request because the current leader is:{}, not {}",
+                replicasInfo.getMasterAddress(), request.getMasterAddress());
+            result.setResponseCode(ResponseCode.CONTROLLER_INVALID_MASTER);
+            return result;
+        }
+
+        // ... carry the current master epoch ...
+        if (request.getMasterEpoch() != replicasInfo.getMasterEpoch()) {
+            log.info("Rejecting alter syncStateSet request because the current master epoch is:{}, not {}",
+                replicasInfo.getMasterEpoch(), request.getMasterEpoch());
+            result.setResponseCode(ResponseCode.CONTROLLER_FENCED_MASTER_EPOCH);
+            return result;
+        }
+
+        // ... and the current syncStateSet epoch.
+        if (request.getSyncStateSetEpoch() != replicasInfo.getSyncStateSetEpoch()) {
+            log.info("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{}, not {}",
+                replicasInfo.getSyncStateSetEpoch(), request.getSyncStateSetEpoch());
+            result.setResponseCode(ResponseCode.CONTROLLER_FENCED_SYNC_STATE_SET_EPOCH);
+            return result;
+        }
+
+        // Every proposed member must be a registered replica and must be alive.
+        for (final String member : proposedSet) {
+            if (!knownReplicas.containsKey(member)) {
+                log.info("Rejecting alter syncStateSet request because the replicas {} don't exist", member);
+                result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REPLICAS);
+                return result;
+            }
+            if (!brokerAlivePredicate.test(brokerInfo.getClusterName(), member)) {
+                log.info("Rejecting alter syncStateSet request because the replicas {} don't alive", member);
+                result.setResponseCode(ResponseCode.CONTROLLER_BROKER_NOT_ALIVE);
+                return result;
+            }
+        }
+
+        // The master itself may not be dropped from the set.
+        if (!proposedSet.contains(replicasInfo.getMasterAddress())) {
+            log.info("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {}", replicasInfo.getMasterAddress());
+            result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+            return result;
+        }
+
+        // Accepted: fill the response and emit the event.
+        final AlterSyncStateSetResponseHeader response = result.getResponse();
+        response.setNewSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch() + 1);
+        response.setNewSyncStateSet(proposedSet);
+        result.addEvent(new AlterSyncStateSetEvent(brokerName, proposedSet));
+        return result;
+    }
+
+    /**
+     * Tries to elect a new master for the broker: first among the syncStateSet, then (if unclean
+     * election is enabled) among all registered replicas. The current master is never a candidate,
+     * and a candidate must pass the alive predicate.
+     *
+     * @param request              the elect request
+     * @param brokerAlivePredicate tested with (clusterName, address) to decide whether a replica is alive
+     * @return the result; when no master could be elected, an event recording that is still added
+     * and the response code is CONTROLLER_MASTER_NOT_AVAILABLE
+     */
+    public ControllerResult<ElectMasterResponseHeader> electMaster(
+        final ElectMasterRequestHeader request, final BiPredicate<String, String> brokerAlivePredicate) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<ElectMasterResponseHeader> result = new ControllerResult<>(new ElectMasterResponseHeader());
+        if (!isContainsBroker(brokerName)) {
+            result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+            return result;
+        }
+
+        final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+        final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+        // Shared by both rounds: not the current master, and alive.
+        final Predicate<String> eligible = (candidate) ->
+            !candidate.equals(replicasInfo.getMasterAddress()) && brokerAlivePredicate.test(brokerInfo.getClusterName(), candidate);
+
+        // Round one: in-sync replicas (only worth trying when the set holds more than one member).
+        final Set<String> syncStateSet = replicasInfo.getSyncStateSet();
+        if (syncStateSet.size() > 1 && tryElectMaster(result, brokerName, syncStateSet, eligible)) {
+            return result;
+        }
+
+        // Round two: any registered replica, only if enableElectUncleanMaster = true.
+        if (this.enableElectUncleanMaster
+            && tryElectMaster(result, brokerName, brokerInfo.getBrokerIdTable().keySet(), eligible)) {
+            return result;
+        }
+
+        // Election failed: still emit an event so the statemachine learns that the master
+        // is gone and no new master was elected.
+        result.addEvent(new ElectMasterEvent(false, brokerName));
+        result.setResponseCode(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE);
+        return result;
+    }
+
+    /**
+     * Picks the first candidate accepted by the filter as the new master, filling the response
+     * and adding an ElectMasterEvent to the result.
+     *
+     * @param result     result to fill on success; left untouched on failure
+     * @param brokerName broker whose master is being elected
+     * @param candidates addresses to consider, in iteration order
+     * @param filter     return true if the candidate is available
+     * @return true if elect success
+     */
+    private boolean tryElectMaster(final ControllerResult<ElectMasterResponseHeader> result, final String brokerName,
+        final Set<String> candidates, final Predicate<String> filter) {
+        final int currentEpoch = this.inSyncReplicasInfoTable.get(brokerName).getMasterEpoch();
+        for (final String address : candidates) {
+            if (!filter.test(address)) {
+                continue;
+            }
+            final ElectMasterResponseHeader response = result.getResponse();
+            response.setNewMasterAddress(address);
+            response.setMasterEpoch(currentEpoch + 1);
+            result.addEvent(new ElectMasterEvent(brokerName, address));
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Handles the registration of a broker replica.
+     * <ul>
+     * <li>Known broker set with a master: returns the master info, applying for a new id if the address is new.</li>
+     * <li>Known broker set without a master: the replica becomes master if it is in the syncStateSet
+     * or unclean election is enabled; otherwise the request is rejected.</li>
+     * <li>Unknown broker set: the replica becomes master directly.</li>
+     * </ul>
+     *
+     * @param request the register request
+     * @return the result with the response and the events to be appended
+     */
+    public ControllerResult<RegisterBrokerResponseHeader> registerBroker(final RegisterBrokerRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final String brokerAddress = request.getBrokerAddress();
+        final ControllerResult<RegisterBrokerResponseHeader> result = new ControllerResult<>(new RegisterBrokerResponseHeader());
+        final RegisterBrokerResponseHeader response = result.getResponse();
+
+        // With no metadata for the broker set, the replica can be elected as master directly.
+        boolean electable = true;
+        if (isContainsBroker(brokerName)) {
+            final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+            final BrokerIdInfo brokerInfo = this.replicaInfoTable.get(brokerName);
+            final HashMap<String, Long> brokerIdTable = brokerInfo.getBrokerIdTable();
+
+            // Resolve the broker id, applying for a fresh one when this address is seen for the first time.
+            final long brokerId;
+            if (brokerIdTable.containsKey(brokerAddress)) {
+                brokerId = brokerIdTable.get(brokerAddress);
+            } else {
+                brokerId = brokerInfo.newBrokerId();
+                result.addEvent(new ApplyBrokerIdEvent(brokerName, brokerAddress, brokerId));
+            }
+            response.setBrokerId(brokerId);
+            response.setMasterEpoch(replicasInfo.getMasterEpoch());
+
+            if (replicasInfo.isMasterExist()) {
+                // A master is recorded: just report it.
+                response.setMasterAddress(replicasInfo.getMasterAddress());
+                return result;
+            }
+            // No master recorded: electable if the replica was in sync, or unclean election is allowed.
+            electable = replicasInfo.getSyncStateSet().contains(brokerAddress) || this.enableElectUncleanMaster;
+        }
+
+        if (!electable) {
+            response.setMasterAddress("");
+            result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+            return result;
+        }
+
+        final int masterEpoch = this.inSyncReplicasInfoTable.containsKey(brokerName) ?
+            this.inSyncReplicasInfoTable.get(brokerName).getMasterEpoch() + 1 : 1;
+        response.setMasterAddress(brokerAddress);
+        response.setMasterEpoch(masterEpoch);
+        response.setBrokerId(0);
+        result.addEvent(new ElectMasterEvent(true, brokerName, brokerAddress, request.getClusterName()));
+        return result;
+    }
+
+    /**
+     * Reports master and syncStateSet information of a broker.
+     *
+     * @param request the query, identifying the broker by name
+     * @return the result; the response code is CONTROLLER_INVALID_REQUEST when the broker is unknown
+     */
+    public ControllerResult<GetReplicaInfoResponseHeader> getReplicaInfo(final GetReplicaInfoRequestHeader request) {
+        final String brokerName = request.getBrokerName();
+        final ControllerResult<GetReplicaInfoResponseHeader> result = new ControllerResult<>(new GetReplicaInfoResponseHeader());
+        if (!isContainsBroker(brokerName)) {
+            result.setResponseCode(ResponseCode.CONTROLLER_INVALID_REQUEST);
+            return result;
+        }
+        // Metadata exists: copy it into the response.
+        final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+        final GetReplicaInfoResponseHeader response = result.getResponse();
+        response.setMasterAddress(replicasInfo.getMasterAddress());
+        response.setMasterEpoch(replicasInfo.getMasterEpoch());
+        response.setSyncStateSet(replicasInfo.getSyncStateSet());
+        response.setSyncStateSetEpoch(replicasInfo.getSyncStateSetEpoch());
+        return result;
+    }
+
+    /**
+     * Applies one event to the in-memory statemachine by dispatching on its type.
+     * Event types without a handler are ignored.
+     *
+     * @param event event message
+     */
+    public void applyEvent(final EventMessage event) {
+        switch (event.getEventType()) {
+            case ALTER_SYNC_STATE_SET_EVENT: {
+                final AlterSyncStateSetEvent alterEvent = (AlterSyncStateSetEvent) event;
+                handleAlterSyncStateSet(alterEvent);
+                break;
+            }
+            case APPLY_BROKER_ID_EVENT: {
+                final ApplyBrokerIdEvent applyIdEvent = (ApplyBrokerIdEvent) event;
+                handleApplyBrokerId(applyIdEvent);
+                break;
+            }
+            case ELECT_MASTER_EVENT: {
+                final ElectMasterEvent electEvent = (ElectMasterEvent) event;
+                handleElectMaster(electEvent);
+                break;
+            }
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Applies a syncStateSet change; events for unknown brokers are ignored.
+     */
+    private void handleAlterSyncStateSet(final AlterSyncStateSetEvent event) {
+        final String brokerName = event.getBrokerName();
+        if (!isContainsBroker(brokerName)) {
+            return;
+        }
+        this.inSyncReplicasInfoTable.get(brokerName).updateSyncStateSetInfo(event.getNewSyncStateSet());
+    }
+
+    /**
+     * Records the id assigned to a broker address. Events for unknown brokers are ignored,
+     * and an address that already has an id keeps it.
+     */
+    private void handleApplyBrokerId(final ApplyBrokerIdEvent event) {
+        final String brokerName = event.getBrokerName();
+        if (!isContainsBroker(brokerName)) {
+            return;
+        }
+        final HashMap<String, Long> brokerIdTable = this.replicaInfoTable.get(brokerName).getBrokerIdTable();
+        if (brokerIdTable.containsKey(event.getBrokerAddress())) {
+            return;
+        }
+        brokerIdTable.put(event.getBrokerAddress(), event.getNewBrokerId());
+    }
+
+    /**
+     * Applies the outcome of a master election.
+     * For an unknown broker, the metadata is created with the event's address as master (id 1).
+     * For a known broker, either the new master is recorded and the syncStateSet is reset to it,
+     * or - when no master was elected - the master is cleared while the syncStateSet is retained.
+     */
+    private void handleElectMaster(final ElectMasterEvent event) {
+        final String brokerName = event.getBrokerName();
+        final String newMaster = event.getNewMasterAddress();
+
+        if (!isContainsBroker(brokerName)) {
+            // First replica of this broker comes online: create the memory metadata
+            // and regard the replica as master.
+            final String clusterName = event.getClusterName();
+            final BrokerIdInfo brokerInfo = new BrokerIdInfo(clusterName, brokerName);
+            brokerInfo.getBrokerIdTable().put(newMaster, 1L);
+            this.inSyncReplicasInfoTable.put(brokerName, new InSyncReplicasInfo(clusterName, brokerName, newMaster));
+            this.replicaInfoTable.put(brokerName, brokerInfo);
+            return;
+        }
+
+        final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName);
+        if (!event.getNewMasterElected()) {
+            // No new master: the old master is removed, the syncStateSet stays as it is.
+            replicasInfo.updateMasterInfo("");
+            return;
+        }
+
+        // Record the new master, then restart the syncStateSet with only that master in it.
+        replicasInfo.updateMasterInfo(newMaster);
+        final HashSet<String> masterOnly = new HashSet<>();
+        masterOnly.add(newMaster);
+        replicasInfo.updateSyncStateSetInfo(masterOnly);
+    }
+
+    /**
+     * Tells whether memory metadata exists for the broker.
+     *
+     * @return true only if the broker is present in both replicaInfoTable and inSyncReplicasInfoTable
+     */
+    private boolean isContainsBroker(final String brokerName) {
+        if (!this.replicaInfoTable.containsKey(brokerName)) {
+            return false;
+        }
+        return this.inSyncReplicasInfoTable.containsKey(brokerName);
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java
new file mode 100644
index 000000000..4da71e32b
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Event recording that the syncStateSet of a broker is replaced by a new set of addresses.
+ * Triggered by the AlterSyncStateSetApi.
+ */
+public class AlterSyncStateSetEvent implements EventMessage {
+
+    private final String brokerName;
+    private final Set<String/*Address*/> newSyncStateSet;
+
+    /** Stores a private copy of {@code newSyncStateSet}, so later changes by the caller are not seen. */
+    public AlterSyncStateSetEvent(String brokerName, Set<String> newSyncStateSet) {
+        this.newSyncStateSet = new HashSet<>(newSyncStateSet);
+        this.brokerName = brokerName;
+    }
+
+    @Override
+    public EventType getEventType() {
+        return EventType.ALTER_SYNC_STATE_SET_EVENT;
+    }
+
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+    /** Returns a fresh copy on every call; mutating it does not affect this event. */
+    public Set<String> getNewSyncStateSet() {
+        return new HashSet<>(this.newSyncStateSet);
+    }
+
+    @Override
+    public String toString() {
+        return "AlterSyncStateSetEvent{brokerName='" + brokerName + "', newSyncStateSet=" + newSyncStateSet + '}';
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java
new file mode 100644
index 000000000..fc9860dbe
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+/**
+ * The event tries to apply a new id for a new broker.
+ * Triggered by the RegisterBrokerApi.
+ * Instances are immutable: every field is final and assigned once in the constructor.
+ */
+public class ApplyBrokerIdEvent implements EventMessage {
+    private final String brokerName;
+    private final String brokerAddress;
+    private final long newBrokerId;
+
+    /** Creates an event carrying the broker name, the broker address and the id applied for it. */
+    public ApplyBrokerIdEvent(String brokerName, String brokerAddress, long newBrokerId) {
+        this.newBrokerId = newBrokerId;
+        this.brokerAddress = brokerAddress;
+        this.brokerName = brokerName;
+    }
+
+    @Override
+    public EventType getEventType() {
+        return EventType.APPLY_BROKER_ID_EVENT;
+    }
+
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+    public String getBrokerAddress() {
+        return this.brokerAddress;
+    }
+
+    public long getNewBrokerId() {
+        return this.newBrokerId;
+    }
+
+    @Override
+    public String toString() {
+        return "ApplyBrokerIdEvent{brokerName='" + brokerName + "', brokerAddress='" + brokerAddress
+            + "', newBrokerId=" + newBrokerId + '}';
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java
new file mode 100644
index 000000000..9d5915260
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+
+/** Bundles a response of type T with a response code (SUCCESS by default) and a list of events. */
+public class ControllerResult<T> {
+    private final List<EventMessage> events;
+    private final T response;
+    private int responseCode = ResponseCode.SUCCESS;
+
+    public ControllerResult(T response) {
+        this(null, response);
+    }
+
+    /** Copies {@code events} into a private list; a null list is treated as "no events". */
+    public ControllerResult(List<EventMessage> events, T response) {
+        this.events = events == null ? new ArrayList<>() : new ArrayList<>(events);
+        this.response = response;
+    }
+
+    public List<EventMessage> getEvents() {
+        return new ArrayList<>(events);
+    }
+
+    public T getResponse() {
+        return response;
+    }
+
+    public void setResponseCode(int responseCode) {
+        this.responseCode = responseCode;
+    }
+
+    public int getResponseCode() {
+        return responseCode;
+    }
+
+    public static <T> ControllerResult<T> of(List<EventMessage> events, T response) {
+        return new ControllerResult<>(events, response);
+    }
+
+    public void addEvent(EventMessage event) {
+        this.events.add(event);
+    }
+
+    @Override
+    public String toString() {
+        return "ControllerResult{events=" + events + ", response=" + response
+            + ", responseCode=" + responseCode + '}';
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java
new file mode 100644
index 000000000..69d71a47b
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+/**
+ * The event tries to elect a new master for target broker.
+ * Triggered by the ElectMasterApi.
+ */
+public class ElectMasterEvent implements EventMessage {
+    // Mark whether a new master was elected.
+    private final boolean newMasterElected;
+    private final String brokerName;
+    private final String newMasterAddress;
+    private final String clusterName;
+
+    /** Builds an event with the given election flag, an empty master address and an empty cluster name. */
+    public ElectMasterEvent(boolean newMasterElected, String brokerName) {
+        this(newMasterElected, brokerName, "", "");
+    }
+
+    /** Builds an event flagged as elected for {@code newMasterAddress}, with an empty cluster name. */
+    public ElectMasterEvent(String brokerName, String newMasterAddress) {
+        this(true, brokerName, newMasterAddress, "");
+    }
+
+    /** Builds an event from all four values; the other two constructors delegate here. */
+    public ElectMasterEvent(boolean newMasterElected, String brokerName, String newMasterAddress, String clusterName) {
+        this.clusterName = clusterName;
+        this.newMasterAddress = newMasterAddress;
+        this.brokerName = brokerName;
+        this.newMasterElected = newMasterElected;
+    }
+
+    @Override
+    public EventType getEventType() {
+        return EventType.ELECT_MASTER_EVENT;
+    }
+
+    public boolean getNewMasterElected() {
+        return this.newMasterElected;
+    }
+
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+    public String getNewMasterAddress() {
+        return this.newMasterAddress;
+    }
+
+    public String getClusterName() {
+        return this.clusterName;
+    }
+
+    @Override
+    public String toString() {
+        return "ElectMasterEvent{isNewMasterElected=" + newMasterElected
+            + ", brokerName='" + brokerName + "', newMasterAddress='" + newMasterAddress
+            + "', clusterName='" + clusterName + "'}";
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java
new file mode 100644
index 000000000..768fc0dd6
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+/**
+ * The parent interface of all events; each implementation needs to indicate its eventType.
+ */
+public interface EventMessage {
+
+    /**
+     * Returns the event type of this message.
+     */
+    EventType getEventType();
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java
new file mode 100644
index 000000000..66b956a6a
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.rocketmq.common.utils.FastJsonSerializer;
+
+/**
+ * Serializer for EventMessage: a 2-byte event type id header followed by the serialized event body.
+ */
+public class EventSerializer {
+    private final FastJsonSerializer serializer = new FastJsonSerializer();
+
+    /** Writes the low 16 bits of {@code value} at {@code index}, high byte first. */
+    private void putShort(byte[] memory, int index, int value) {
+        memory[index] = (byte) (value >>> 8);
+        memory[index + 1] = (byte) value;
+    }
+
+    /** Reads the two bytes at {@code index} as a short, high byte first. */
+    private short getShort(byte[] memory, int index) {
+        return (short) ((memory[index] << 8) | (memory[index + 1] & 0xFF));
+    }
+
+    /** Returns the event type id (2 bytes) followed by the serialized body, or null if the body is null or empty. */
+    public byte[] serialize(EventMessage message) throws SerializationException {
+        final short eventType = message.getEventType().getId();
+        final byte[] data = this.serializer.serialize(message);
+        if (data == null || data.length == 0) {
+            return null;
+        }
+        final byte[] result = new byte[2 + data.length];
+        putShort(result, 0, eventType);
+        System.arraycopy(data, 0, result, 2, data.length);
+        return result;
+    }
+
+    /** Returns the decoded event, or null if {@code bytes} is null, too short, or has no decodable event type. */
+    public EventMessage deserialize(byte[] bytes) throws SerializationException {
+        // A valid record carries at least the 2-byte event type id; null input is rejected like a short one.
+        if (bytes == null || bytes.length < 2) {
+            return null;
+        }
+        final short eventId = getShort(bytes, 0);
+        final EventType eventType = eventId > 0 ? EventType.from(eventId) : null;
+        if (eventType == null) {
+            return null;
+        }
+        final byte[] data = new byte[bytes.length - 2];
+        System.arraycopy(bytes, 2, data, 0, data.length);
+        switch (eventType) {
+            case ALTER_SYNC_STATE_SET_EVENT:
+                return this.serializer.deserialize(data, AlterSyncStateSetEvent.class);
+            case APPLY_BROKER_ID_EVENT:
+                return this.serializer.deserialize(data, ApplyBrokerIdEvent.class);
+            case ELECT_MASTER_EVENT:
+                return this.serializer.deserialize(data, ElectMasterEvent.class);
+            default:
+                // Any other type (e.g. READ_EVENT) has no event class mapped here.
+                return null;
+        }
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java
new file mode 100644
index 000000000..9496d0a3e
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager.event;
+
+/**
+ * Event type (name, id);
+ * the id is the value {@link #from(short)} resolves back to a constant.
+ */
+public enum EventType {
+    ALTER_SYNC_STATE_SET_EVENT("AlterSyncStateSetEvent", (short) 1),
+    APPLY_BROKER_ID_EVENT("ApplyBrokerIdEvent", (short) 2),
+    ELECT_MASTER_EVENT("ElectMasterEvent", (short) 3),
+    READ_EVENT("ReadEvent", (short) 4);
+
+    // Both values are fixed per constant at declaration time.
+    private final String name;
+    private final short id;
+
+    EventType(String name, short id) {
+        this.name = name;
+        this.id = id;
+    }
+
+    /** Looks up the constant whose id equals {@code id}; returns null when none matches. */
+    public static EventType from(short id) {
+        for (final EventType candidate : values()) {
+            if (candidate.id == id) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /** Returns the name given to this constant in its declaration. */
+    public String getName() {
+        return name;
+    }
+
+    /** Returns the numeric id given to this constant in its declaration. */
+    public short getId() {
+        return id;
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
new file mode 100644
index 000000000..d704eb286
--- /dev/null
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO;
+import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTER_BROKER;
+
+/**
+ * Processor for controller request
+ */
+public class ControllerRequestProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final int WAIT_TIMEOUT_OUT = 5;
+    private final Controller controller;
+
+    public ControllerRequestProcessor(final Controller controller) {
+        this.controller = controller;
+    }
+
+    /**
+     * Dispatches on the request code: decodes the matching request header where one exists
+     * and hands it to the controller. Codes other than the five CONTROLLER_* codes handled
+     * below are answered with REQUEST_CODE_NOT_SUPPORTED.
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        if (ctx != null) {
+            log.debug("Receive request, {} {} {}",
+                request.getCode(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                request);
+        }
+        switch (request.getCode()) {
+            case CONTROLLER_ALTER_SYNC_STATE_SET: {
+                final AlterSyncStateSetRequestHeader header = request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
+                return awaitResponse(this.controller.alterSyncStateSet(header));
+            }
+            case CONTROLLER_ELECT_MASTER: {
+                final ElectMasterRequestHeader header = request.decodeCommandCustomHeader(ElectMasterRequestHeader.class);
+                return awaitResponse(this.controller.electMaster(header));
+            }
+            case CONTROLLER_REGISTER_BROKER: {
+                final RegisterBrokerRequestHeader header = request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+                return awaitResponse(this.controller.registerBroker(header));
+            }
+            case CONTROLLER_GET_REPLICA_INFO: {
+                final GetReplicaInfoRequestHeader header = request.decodeCommandCustomHeader(GetReplicaInfoRequestHeader.class);
+                return awaitResponse(this.controller.getReplicaInfo(header));
+            }
+            case CONTROLLER_GET_METADATA_INFO: {
+                return this.controller.getControllerMetadata();
+            }
+            default: {
+                final String error = " request type " + request.getCode() + " not supported";
+                return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
+            }
+        }
+    }
+
+    /**
+     * Waits up to WAIT_TIMEOUT_OUT seconds for the controller's answer.
+     *
+     * @return the completed command, or RemotingCommand.createResponseCommand(null) when {@code future} is null
+     */
+    private RemotingCommand awaitResponse(final CompletableFuture<RemotingCommand> future) throws Exception {
+        if (future == null) {
+            return RemotingCommand.createResponseCommand(null);
+        }
+        return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index f64640a26..dd206bbb4 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -34,27 +34,29 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
-import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
-import org.apache.rocketmq.common.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.namesrv.controller.Controller;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -80,6 +82,12 @@ public class RouteInfoManager {
 
     private final NamesrvController namesrvController;
     private final NamesrvConfig namesrvConfig;
+    private Controller controller;
+
+    public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController, final Controller controller) {
+        this(namesrvConfig, namesrvController); // reuse the common initialisation of the two-argument constructor
+        this.controller = controller;
+    }
 
     public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController) {
         this.topicQueueTable = new ConcurrentHashMap<String, Map<String, QueueData>>(1024);
@@ -663,6 +671,15 @@ public class RouteInfoManager {
                     } else {
                         reducedBroker.add(brokerName);
                     }
+
+                    // Check whether we need to elect a new master
+                    if (this.namesrvController != null && this.namesrvController.getControllerConfig().isStartupController() && this.controller != null) {
+                        if (unRegisterRequest.getBrokerId() == 0) {
+                            this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
+                            // Todo: Inform the master
+                            // However, because now the broker does not have the related api, so I will complete the process in the future.
+                        }
+                    }
                 }
 
                 Set<String> changedTopics = cleanTopicByUnRegisterRequests(removedBroker, reducedBroker);
@@ -675,6 +692,7 @@ public class RouteInfoManager {
                 this.lock.writeLock().unlock();
             }
         } catch (Exception e) {
+            e.printStackTrace();
             log.error("unregisterBroker Exception", e);
         }
     }
@@ -1144,6 +1162,19 @@ public class RouteInfoManager {
 
         return topicList;
     }
+
+    /**
+     * Reports whether the broker at {@code brokerAddress} in {@code clusterName} is considered alive.
+     * @return true if brokerLiveTable has an entry whose last update plus heartbeat timeout is not yet in the past
+     */
+    public boolean isBrokerAlive(final String clusterName, final String brokerAddress) {
+        final BrokerLiveInfo liveInfo = this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, brokerAddress));
+        if (liveInfo == null) {
+            return false;
+        }
+        final long expireAt = liveInfo.getLastUpdateTimestamp() + liveInfo.getHeartbeatTimeoutMillis();
+        return expireAt >= System.currentTimeMillis();
+    }
 }
 
 /**
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java
new file mode 100644
index 000000000..627e982b6
--- /dev/null
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.impl;
+
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author hzh
+ * @email 642256541@qq.com
+ * @date 2022/4/20 11:05
+ */
+public class DledgerControllerTest {
+    private List<String> baseDirs;
+    private List<DledgerController> controllers;
+
+    /**
+     * Launches one controller node and returns it after {@code startup()} has been called.
+     * The store directory is recorded in {@code baseDirs} so that it can be cleaned up after the test.
+     *
+     * @param group DLedger group name, also used as a directory component of the store path
+     * @param peers peer list handed to the controller config
+     * @param selfId id of this node, also used as a directory component of the store path
+     * @param storeType NOTE(review): accepted but never applied to the config - confirm whether it should be
+     * @param isEnableElectUncleanMaster value of the enableElectUncleanMaster option
+     * @return the started controller
+     */
+    public DledgerController launchController(final String group, final String peers, final String selfId, String storeType, final boolean isEnableElectUncleanMaster) {
+        // Use the JVM temp directory instead of a hard-coded "/tmp" so the test is not tied to a Unix layout.
+        final File storeDir = new File(new File(System.getProperty("java.io.tmpdir"), group), selfId);
+        final String path = storeDir.getPath();
+        baseDirs.add(path);
+
+        final ControllerConfig config = new ControllerConfig();
+        config.setControllerDLegerGroup(group);
+        config.setControllerDLegerPeers(peers);
+        config.setControllerDLegerSelfId(selfId);
+        config.setControllerStorePath(path);
+        config.setMappedFileSize(10 * 1024 * 1024);
+        config.setEnableElectUncleanMaster(isEnableElectUncleanMaster);
+
+        final DledgerController controller = new DledgerController(config, null);
+
+        controller.startup();
+        return controller;
+    }
+
+    /** Resets the per-test bookkeeping of launched controllers and their store directories. */
+    @Before
+    public void startup() {
+        controllers = new ArrayList<>();
+        baseDirs = new ArrayList<>();
+    }
+
+    /**
+     * Shuts down every controller that is still tracked and removes the store directories
+     * recorded in {@code baseDirs}.
+     */
+    @After
+    public void tearDown() {
+        for (Controller controller : this.controllers) {
+            controller.shutdown();
+        }
+        for (String dir : this.baseDirs) {
+            System.out.println("Delete file " + dir);
+            // File.delete() cannot remove a non-empty directory, so the tree is removed bottom-up.
+            deleteRecursively(new File(dir));
+        }
+    }
+
+    /** Best-effort removal of {@code file} and, if it is a directory, of everything below it. */
+    private static void deleteRecursively(final File file) {
+        // listFiles() returns null when the path is not a directory or cannot be listed.
+        final File[] children = file.listFiles();
+        if (children != null) {
+            for (File child : children) {
+                deleteRecursively(child);
+            }
+        }
+        // The result is ignored on purpose: a leftover temp file must not fail the test.
+        file.delete();
+    }
+
+    /**
+     * Registers a broker through {@code leader}, waiting at most 10 seconds for the response.
+     * For a broker that is not flagged as the first registered one, the returned broker id must be positive.
+     *
+     * @return always {@code true}; a failed check raises an assertion error instead
+     */
+    public boolean registerNewBroker(Controller leader, String clusterName, String brokerName, String brokerAddress,
+        boolean isFirstRegisteredBroker) throws Exception {
+        final RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader(clusterName, brokerName, brokerAddress);
+        final RemotingCommand reply = leader.registerBroker(header).get(10, TimeUnit.SECONDS);
+        final RegisterBrokerResponseHeader result = (RegisterBrokerResponseHeader) reply.readCustomHeader();
+        System.out.println("------------- Register broker done, the result is :" + result);
+
+        if (isFirstRegisteredBroker) {
+            // No broker-id check is made for the first broker of a group.
+            return true;
+        }
+        assertTrue(result.getBrokerId() > 0);
+        return true;
+    }
+
+    /**
+     * Sends an alter-sync-state-set request to {@code leader}, then reads the replica info back and checks
+     * that it holds exactly {@code newSyncStateSet} and that the sync state set epoch is
+     * {@code syncStateSetEpoch + 1}.
+     *
+     * @return always {@code true}; a failed check raises an assertion error instead
+     */
+    private boolean alterNewInSyncSet(Controller leader, String brokerName, String masterAddress, int masterEpoch,
+        Set<String> newSyncStateSet, int syncStateSetEpoch) throws Exception {
+        final AlterSyncStateSetRequestHeader alterRequest =
+            new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch, newSyncStateSet, syncStateSetEpoch);
+        // Only completion is awaited here; the outcome is verified through the replica info below.
+        leader.alterSyncStateSet(alterRequest).get(10, TimeUnit.SECONDS);
+
+        final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)).get(10, TimeUnit.SECONDS);
+        final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader();
+        // Sets have no defined iteration order, so both sides are sorted before the element-wise comparison.
+        assertArrayEquals(newSyncStateSet.stream().sorted().toArray(), replicaInfo.getSyncStateSet().stream().sorted().toArray());
+        assertEquals(syncStateSetEpoch + 1, replicaInfo.getSyncStateSetEpoch());
+        return true;
+    }
+
+    /**
+     * Polls the first controller's member state until it names a leader that is part of
+     * {@code controllers}, giving up after 30 seconds.
+     *
+     * @param controllers the controllers among which the leader is searched
+     * @return the leading controller, or {@code null} if the list is empty or no listed controller
+     * was reported as leader in time
+     */
+    public DledgerController waitLeader(final List<DledgerController> controllers) throws Exception {
+        if (controllers.isEmpty()) {
+            return null;
+        }
+        final DledgerController observer = controllers.get(0);
+        // Bound the wait: an election that never completes must fail the test instead of hanging the build.
+        final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30);
+        do {
+            final String leaderId = observer.getMemberState().getLeaderId();
+            if (leaderId != null) {
+                for (DledgerController controller : controllers) {
+                    if (controller.getMemberState().getSelfId().equals(leaderId)) {
+                        System.out.println("New leader " + leaderId);
+                        return controller;
+                    }
+                }
+                // The reported leader is not in the list (e.g. it was just shut down); keep polling.
+            }
+            Thread.sleep(100);
+        } while (System.currentTimeMillis() < deadline);
+        return null;
+    }
+
+    /**
+     * Starts a three-node controller group, waits for its leader and seeds it with broker group "broker1":
+     * 127.0.0.1:9000 as master plus 127.0.0.1:9001 and 127.0.0.1:9002, all three in the sync state set.
+     *
+     * @param enableElectUncleanMaster forwarded to every launched controller
+     * @return the controller that was found to be the leader
+     */
+    public DledgerController mockMetaData(boolean enableElectUncleanMaster) throws Exception {
+        String group = UUID.randomUUID().toString();
+        // NOTE(review): the ports are fixed, so parallel runs on one host may collide - confirm this is acceptable.
+        String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002);
+        DledgerController c0 = launchController(group, peers, "n0", DLedgerConfig.MEMORY, enableElectUncleanMaster);
+        DledgerController c1 = launchController(group, peers, "n1", DLedgerConfig.MEMORY, enableElectUncleanMaster);
+        DledgerController c2 = launchController(group, peers, "n2", DLedgerConfig.MEMORY, enableElectUncleanMaster);
+        controllers.add(c0);
+        controllers.add(c1);
+        controllers.add(c2);
+
+        DledgerController leader = waitLeader(controllers);
+        // Fail with an assertion error rather than a NullPointerException when no leader was found.
+        assertNotNull(leader);
+
+        // Only the first broker is registered as "first"; the followers go through the
+        // broker-id check inside registerNewBroker.
+        assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9000", true));
+        assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9001", false));
+        assertTrue(registerNewBroker(leader, "cluster1", "broker1", "127.0.0.1:9002", false));
+        final RemotingCommand getInfoResponse = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+        final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) getInfoResponse.readCustomHeader();
+        assertEquals(1, replicaInfo.getMasterEpoch());
+        assertEquals("127.0.0.1:9000", replicaInfo.getMasterAddress());
+
+        // Try alter sync state set
+        final HashSet<String> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add("127.0.0.1:9000");
+        newSyncStateSet.add("127.0.0.1:9001");
+        newSyncStateSet.add("127.0.0.1:9002");
+        assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 1));
+        return leader;
+    }
+
+    /**
+     * After the metadata is seeded, electing a master for "broker1" must yield master epoch 2 and a
+     * non-empty master address different from the previous master "127.0.0.1:9000".
+     */
+    @Test
+    public void testElectMaster() throws Exception {
+        final DledgerController leader = mockMetaData(false);
+        final RemotingCommand electReply = leader.electMaster(new ElectMasterRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+        final ElectMasterResponseHeader elected = (ElectMasterResponseHeader) electReply.readCustomHeader();
+        assertEquals(elected.getMasterEpoch(), 2);
+        assertFalse(elected.getNewMasterAddress().isEmpty());
+        assertNotEquals(elected.getNewMasterAddress(), "127.0.0.1:9000");
+    }
+
+    /**
+     * With unclean master election disabled, an election with only the old master in the sync state set
+     * leaves the group without a master; it gets one again only when that replica re-registers.
+     */
+    @Test
+    public void testAllReplicasShutdownAndRestartWithUnEnableElectUnCleanMaster() throws Exception {
+        final DledgerController leader = mockMetaData(false);
+        final HashSet<String> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add("127.0.0.1:9000");
+
+        assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2));
+
+        // Calling electMaster simulates the old master going down and a new master being requested.
+        // The sync state set in the state machine is only {"127.0.0.1:9000"}, so no other replica is
+        // eligible and the election cannot produce a new master.
+        leader.electMaster(new ElectMasterRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+
+        final RemotingCommand infoReply = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+        final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) infoReply.readCustomHeader();
+        assertEquals(replicaInfo.getSyncStateSet(), newSyncStateSet);
+        assertEquals(replicaInfo.getMasterAddress(), "");
+        assertEquals(replicaInfo.getMasterEpoch(), 2);
+
+        // 127.0.0.1:9001 comes back first, but it is not in the sync state set, so it is not elected.
+        final RemotingCommand followerReply =
+            leader.registerBroker(new RegisterBrokerRequestHeader("cluster1", "broker1", "127.0.0.1:9001")).get(10, TimeUnit.SECONDS);
+        final RegisterBrokerResponseHeader r1 = (RegisterBrokerResponseHeader) followerReply.readCustomHeader();
+        assertEquals(r1.getBrokerId(), 2);
+        assertEquals(r1.getMasterAddress(), "");
+        assertEquals(r1.getMasterEpoch(), 2);
+
+        // 127.0.0.1:9000 comes back next and is elected as master.
+        final RemotingCommand masterReply =
+            leader.registerBroker(new RegisterBrokerRequestHeader("cluster1", "broker1", "127.0.0.1:9000")).get(10, TimeUnit.SECONDS);
+        final RegisterBrokerResponseHeader r2 = (RegisterBrokerResponseHeader) masterReply.readCustomHeader();
+        assertEquals(r2.getBrokerId(), 0);
+        assertEquals(r2.getMasterAddress(), "127.0.0.1:9000");
+        assertEquals(r2.getMasterEpoch(), 3);
+    }
+
+    /**
+     * With unclean master election enabled, a new master is elected even though the sync state set
+     * only contains the old master.
+     */
+    @Test
+    public void testEnableElectUnCleanMaster() throws Exception {
+        final DledgerController leader = mockMetaData(true);
+        final HashSet<String> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add("127.0.0.1:9000");
+
+        assertTrue(alterNewInSyncSet(leader, "broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2));
+
+        // Calling electMaster simulates the old master going down and a new master being requested.
+        // Even though the sync state set in the state machine is only {"127.0.0.1:9000"},
+        // enableElectUncleanMaster = true still lets the controller elect a new master.
+        final CompletableFuture<RemotingCommand> election = leader.electMaster(new ElectMasterRequestHeader("broker1"));
+        election.get(10, TimeUnit.SECONDS);
+
+        final RemotingCommand infoReply = leader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+        final GetReplicaInfoResponseHeader replicaInfo = (GetReplicaInfoResponseHeader) infoReply.readCustomHeader();
+        // The sync state set is expected to contain exactly the newly elected master.
+        final HashSet<String> newSyncStateSet2 = new HashSet<>();
+        newSyncStateSet2.add(replicaInfo.getMasterAddress());
+        assertEquals(replicaInfo.getSyncStateSet(), newSyncStateSet2);
+        assertNotEquals(replicaInfo.getMasterAddress(), "");
+        assertNotEquals(replicaInfo.getMasterAddress(), "127.0.0.1:9000");
+        assertEquals(replicaInfo.getMasterEpoch(), 2);
+    }
+
+    /**
+     * After the current controller leader is shut down, the newly found leader must still report the
+     * replica info that was seeded through the old leader.
+     */
+    @Test
+    public void testChangeControllerLeader() throws Exception {
+        final DledgerController oldLeader = mockMetaData(false);
+        oldLeader.shutdown();
+        Thread.sleep(2000);
+        // Stop tracking the old leader so it is neither considered for leadership nor shut down twice.
+        this.controllers.remove(oldLeader);
+
+        final DledgerController newLeader = waitLeader(this.controllers);
+        assertNotNull(newLeader);
+
+        final RemotingCommand infoReply = newLeader.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).get(10, TimeUnit.SECONDS);
+        final GetReplicaInfoResponseHeader response = (GetReplicaInfoResponseHeader) infoReply.readCustomHeader();
+        assertEquals(response.getMasterAddress(), "127.0.0.1:9000");
+        assertEquals(response.getMasterEpoch(), 1);
+
+        final HashSet<String> syncStateSet = new HashSet<>();
+        syncStateSet.add("127.0.0.1:9000");
+        syncStateSet.add("127.0.0.1:9001");
+        syncStateSet.add("127.0.0.1:9002");
+        assertEquals(response.getSyncStateSet(), syncStateSet);
+    }
+}
\ No newline at end of file
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java
new file mode 100644
index 000000000..1a2058842
--- /dev/null
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.controller.manager;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
+import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
+import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ReplicasInfoManagerTest {
+    private ReplicasInfoManager replicasInfoManager;
+
+    /** Creates a fresh ReplicasInfoManager with enableElectUncleanMaster set to false before each test. */
+    @Before
+    public void init() {
+        final ControllerConfig controllerConfig = new ControllerConfig();
+        controllerConfig.setEnableElectUncleanMaster(false);
+        replicasInfoManager = new ReplicasInfoManager(controllerConfig);
+    }
+
+    /**
+     * Registers a broker with the manager and applies the resulting events.
+     * For the first broker of a group the replica info must show it as master with master epoch 1 and a
+     * sync state set of size 1; for any other broker the returned broker id must be positive.
+     *
+     * @return always {@code true}; a failed check raises an assertion error instead
+     */
+    public boolean registerNewBroker(String clusterName, String brokerName, String brokerAddress, boolean isFirstRegisteredBroker) {
+        final ControllerResult<RegisterBrokerResponseHeader> registerResult =
+            this.replicasInfoManager.registerBroker(new RegisterBrokerRequestHeader(clusterName, brokerName, brokerAddress));
+        apply(registerResult.getEvents());
+
+        if (!isFirstRegisteredBroker) {
+            assertTrue(registerResult.getResponse().getBrokerId() > 0);
+            return true;
+        }
+
+        final GetReplicaInfoResponseHeader replicaInfo =
+            this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)).getResponse();
+        assertEquals(replicaInfo.getMasterAddress(), brokerAddress);
+        assertEquals(replicaInfo.getMasterEpoch(), 1);
+        assertEquals(replicaInfo.getSyncStateSet().size(), 1);
+        return true;
+    }
+
+    /**
+     * Requests a new sync state set, applies the resulting events, then checks that the replica info holds
+     * exactly {@code newSyncStateSet} and that the sync state set epoch is {@code syncStateSetEpoch + 1}.
+     *
+     * @return always {@code true}; a failed check raises an assertion error instead
+     */
+    private boolean alterNewInSyncSet(String brokerName, String masterAddress, int masterEpoch, Set<String> newSyncStateSet, int syncStateSetEpoch) {
+        final AlterSyncStateSetRequestHeader alterRequest =
+            new AlterSyncStateSetRequestHeader(brokerName, masterAddress, masterEpoch, newSyncStateSet, syncStateSetEpoch);
+        // The second argument is a predicate that always answers true in this test.
+        final ControllerResult<AlterSyncStateSetResponseHeader> result = this.replicasInfoManager.alterSyncStateSet(alterRequest, (va1, va2) -> true);
+        apply(result.getEvents());
+
+        final GetReplicaInfoResponseHeader replicaInfo = this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader(brokerName)).getResponse();
+        // Sets have no defined iteration order, so both sides are sorted before the element-wise comparison.
+        assertArrayEquals(newSyncStateSet.stream().sorted().toArray(), replicaInfo.getSyncStateSet().stream().sorted().toArray());
+        assertEquals(syncStateSetEpoch + 1, replicaInfo.getSyncStateSetEpoch());
+        return true;
+    }
+
+    /** Feeds every event, in list order, into the manager under test. */
+    private void apply(final List<EventMessage> events) {
+        events.forEach(this.replicasInfoManager::applyEvent);
+    }
+
+
+    /**
+     * Seeds broker group "broker1" with 127.0.0.1:9000 as first broker plus 127.0.0.1:9001 and
+     * 127.0.0.1:9002, then puts all three addresses into the sync state set.
+     */
+    public void mockMetaData() {
+        final String[] addresses = {"127.0.0.1:9000", "127.0.0.1:9001", "127.0.0.1:9002"};
+        final HashSet<String> newSyncStateSet = new HashSet<>();
+        for (int i = 0; i < addresses.length; i++) {
+            // Only the first address is registered as the first broker of the group.
+            registerNewBroker("cluster1", "broker1", addresses[i], i == 0);
+            newSyncStateSet.add(addresses[i]);
+        }
+        assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, newSyncStateSet, 1));
+    }
+
+    /**
+     * Electing a master for the seeded group must yield master epoch 2 and a non-empty master
+     * address different from the previous master "127.0.0.1:9000".
+     */
+    @Test
+    public void testElectMaster() {
+        mockMetaData();
+        final ControllerResult<ElectMasterResponseHeader> electResult =
+            this.replicasInfoManager.electMaster(new ElectMasterRequestHeader("broker1"), (va1, va2) -> true);
+        final ElectMasterResponseHeader elected = electResult.getResponse();
+        assertEquals(elected.getMasterEpoch(), 2);
+        assertFalse(elected.getNewMasterAddress().isEmpty());
+        assertNotEquals(elected.getNewMasterAddress(), "127.0.0.1:9000");
+    }
+
+    /**
+     * When the sync state set only contains the old master, an election must produce a single event
+     * reporting that no new master was elected, and the group ends up with an empty master address.
+     */
+    @Test
+    public void testAllReplicasShutdownAndRestart() {
+        mockMetaData();
+        final HashSet<String> newSyncStateSet = new HashSet<>();
+        newSyncStateSet.add("127.0.0.1:9000");
+        assertTrue(alterNewInSyncSet("broker1", "127.0.0.1:9000", 1, newSyncStateSet, 2));
+
+        // Calling electMaster simulates the old master going down and a new master being requested.
+        // The sync state set in the state machine is only {"127.0.0.1:9000"}, so no other replica is
+        // eligible and the election cannot produce a new master.
+        final ControllerResult<ElectMasterResponseHeader> electResult =
+            this.replicasInfoManager.electMaster(new ElectMasterRequestHeader("broker1"), (va1, va2) -> true);
+        final List<EventMessage> electEvents = electResult.getEvents();
+        assertEquals(electEvents.size(), 1);
+        final ElectMasterEvent electEvent = (ElectMasterEvent) electEvents.get(0);
+        assertFalse(electEvent.getNewMasterElected());
+
+        apply(electResult.getEvents());
+
+        final GetReplicaInfoResponseHeader replicaInfo =
+            this.replicasInfoManager.getReplicaInfo(new GetReplicaInfoRequestHeader("broker1")).getResponse();
+        assertEquals(replicaInfo.getMasterAddress(), "");
+        assertEquals(replicaInfo.getMasterEpoch(), 2);
+    }
+
+
+}
\ No newline at end of file