You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/18 12:17:03 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Stand alone a new controller module (#4333)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 4e9e4e3ef [Summer of code] Stand alone a new controller module (#4333)
4e9e4e3ef is described below

commit 4e9e4e3efb3e75fd82ef37782d3714115533a880
Author: hzh0425 <64...@qq.com>
AuthorDate: Wed May 18 20:16:43 2022 +0800

    [Summer of code] Stand alone a new controller module (#4333)
    
    * feature: add new module controller
    
    * feature: add heartbeat manager
    
    * feature: link requestProcessor and heartbeatmanager
    
    * feature: add controllerManager and startup
    
    * feature: remove namesrv's duplicate controller code
    
    * review code
---
 .../org/apache/rocketmq/common/BrokerAddrInfo.java |  78 +++++++++
 .../rocketmq/common/namesrv/ControllerConfig.java  |  33 ++++
 .../controller/BrokerRegisterRequestHeader.java    |  12 ++
 {namesrv => controller}/pom.xml                    |  21 +--
 .../controller/BrokerHeartbeatManager.java         |  65 +++++++
 .../controller/BrokerHousekeepingService.java      |  51 ++++++
 .../apache/rocketmq}/controller/Controller.java    |   3 +-
 .../rocketmq/controller/ControllerManager.java     | 162 +++++++++++++++++
 .../rocketmq/controller/ControllerStartup.java     | 164 ++++++++++++++++++
 .../impl/DefaultBrokerHeartbeatManager.java        | 174 +++++++++++++++++++
 .../controller/impl/DledgerController.java         |  24 +--
 .../impl/DledgerControllerStateMachine.java        |   8 +-
 .../impl}/event/AlterSyncStateSetEvent.java        |   2 +-
 .../controller/impl}/event/ApplyBrokerIdEvent.java |   2 +-
 .../controller/impl}/event/ControllerResult.java   |   2 +-
 .../controller/impl}/event/ElectMasterEvent.java   |   2 +-
 .../controller/impl}/event/EventMessage.java       |   2 +-
 .../controller/impl}/event/EventSerializer.java    |   2 +-
 .../rocketmq/controller/impl}/event/EventType.java |   2 +-
 .../controller/impl}/manager/BrokerInfo.java       |   2 +-
 .../impl}/manager/InSyncReplicasInfo.java          |   2 +-
 .../impl}/manager/ReplicasInfoManager.java         |  14 +-
 .../processor/ControllerRequestProcessor.java      |  29 +++-
 .../impl/controller/ControllerManagerTest.java     | 191 +++++++++++++++++++++
 .../impl/DefaultBrokerHeartbeatManagerTest.java    |  52 ++++++
 .../controller/impl/DledgerControllerTest.java     |  16 +-
 .../impl}/manager/ReplicasInfoManagerTest.java     |  13 +-
 controller/src/test/resources/logback-test.xml     |  33 ++++
 namesrv/pom.xml                                    |   4 +
 .../apache/rocketmq/namesrv/NamesrvController.java |  17 +-
 .../processor/ControllerRequestProcessor.java      |   4 +-
 .../namesrv/routeinfo/RouteInfoManager.java        |  76 +-------
 pom.xml                                            |   6 +
 33 files changed, 1117 insertions(+), 151 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java b/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java
new file mode 100644
index 000000000..cd122c83a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerAddrInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+/**
+ * Immutable pair of a cluster name and a broker address.
+ * <p>
+ * Two instances are equal when both fields are equal. Neither field is null-checked, so
+ * {@link #isEmpty()}, {@link #equals(Object)} and {@link #hashCode()} may throw a
+ * {@link NullPointerException} when an instance was built with a {@code null} field.
+ */
+public class BrokerAddrInfo {
+    private final String clusterName;
+    private final String brokerAddr;
+
+    /** Lazily computed hash code; {@code 0} means no value has been cached. */
+    private int hash;
+
+    /**
+     * @param clusterName name of the cluster the broker belongs to
+     * @param brokerAddr address of the broker
+     */
+    public BrokerAddrInfo(String clusterName, String brokerAddr) {
+        this.clusterName = clusterName;
+        this.brokerAddr = brokerAddr;
+    }
+
+    /** @return the cluster name given at construction */
+    public String getClusterName() {
+        return this.clusterName;
+    }
+
+    /** @return the broker address given at construction */
+    public String getBrokerAddr() {
+        return this.brokerAddr;
+    }
+
+    /** @return {@code true} only when both the cluster name and the broker address are empty strings */
+    public boolean isEmpty() {
+        if (!clusterName.isEmpty()) {
+            return false;
+        }
+        return brokerAddr.isEmpty();
+    }
+
+    /**
+     * Compares by cluster name and broker address.
+     *
+     * @param obj the object to compare with, may be {@code null}
+     * @return {@code true} when {@code obj} is a {@code BrokerAddrInfo} with equal fields
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // instanceof is false for null, so this also rejects a null argument.
+        if (!(obj instanceof BrokerAddrInfo)) {
+            return false;
+        }
+        final BrokerAddrInfo other = (BrokerAddrInfo) obj;
+        if (!clusterName.equals(other.clusterName)) {
+            return false;
+        }
+        return brokerAddr.equals(other.brokerAddr);
+    }
+
+    /**
+     * Hashes the characters of the cluster name, a {@code '_'} separator and the broker address
+     * with the usual {@code 31 * h + c} recurrence, caching the result after the first call.
+     *
+     * @return the hash code; {@code 0} when both fields are empty
+     */
+    @Override
+    public int hashCode() {
+        final int cached = hash;
+        if (cached != 0 || clusterName.length() + brokerAddr.length() <= 0) {
+            return cached;
+        }
+        int result = 0;
+        for (char c : clusterName.toCharArray()) {
+            result = 31 * result + c;
+        }
+        result = 31 * result + '_';
+        for (char c : brokerAddr.toCharArray()) {
+            result = 31 * result + c;
+        }
+        hash = result;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("BrokerAddrInfo [clusterName=")
+            .append(clusterName)
+            .append(", brokerAddr=")
+            .append(brokerAddr)
+            .append("]")
+            .toString();
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
index 4d4525aef..1d73c9d02 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
@@ -17,14 +17,23 @@
 package org.apache.rocketmq.common.namesrv;
 
 import java.io.File;
+import org.apache.rocketmq.common.MixAll;
 
 public class ControllerConfig {
 
+    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+    private String configStorePath = System.getProperty("user.home") + File.separator + "controller" + File.separator + "controller.properties";
+
     /**
      * Is startup the controller in this name-srv
      */
     private boolean isStartupController = false;
 
+    /**
+     * Interval between periodic scans for inactive brokers.
+     */
+    private long scanNotActiveBrokerInterval = 5 * 1000;
+
     /**
      * Indicates the nums of thread to handle broker or operation requests, like REGISTER_BROKER.
      */
@@ -51,6 +60,22 @@ public class ControllerConfig {
      */
     private boolean isProcessReadEvent = false;
 
+    /**
+     * @return the RocketMQ home directory; initialised from the {@code MixAll.ROCKETMQ_HOME_PROPERTY}
+     * system property, falling back to the {@code MixAll.ROCKETMQ_HOME_ENV} environment variable
+     */
+    public String getRocketmqHome() {
+        return rocketmqHome;
+    }
+
+    /**
+     * @param rocketmqHome the RocketMQ home directory to use instead of the default
+     */
+    public void setRocketmqHome(String rocketmqHome) {
+        this.rocketmqHome = rocketmqHome;
+    }
+
+    /**
+     * @return the config store path; by default {@code controller/controller.properties}
+     * under the {@code user.home} system property
+     */
+    public String getConfigStorePath() {
+        return configStorePath;
+    }
+
+    /**
+     * @param configStorePath the config store path to use instead of the default
+     */
+    public void setConfigStorePath(String configStorePath) {
+        this.configStorePath = configStorePath;
+    }
+
     public boolean isStartupController() {
         return isStartupController;
     }
@@ -59,6 +84,14 @@ public class ControllerConfig {
         isStartupController = startupController;
     }
 
+    /**
+     * @return the interval between scans for inactive brokers; defaults to {@code 5 * 1000}
+     * (presumably milliseconds - confirm against the scheduler that consumes it)
+     */
+    public long getScanNotActiveBrokerInterval() {
+        return scanNotActiveBrokerInterval;
+    }
+
+    /**
+     * @param scanNotActiveBrokerInterval the interval between scans for inactive brokers
+     */
+    public void setScanNotActiveBrokerInterval(long scanNotActiveBrokerInterval) {
+        this.scanNotActiveBrokerInterval = scanNotActiveBrokerInterval;
+    }
+
     public int getControllerThreadPoolNums() {
         return controllerThreadPoolNums;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java
index 55f84efc2..a29e01e49 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/controller/BrokerRegisterRequestHeader.java
@@ -17,12 +17,16 @@
 package org.apache.rocketmq.common.protocol.header.namesrv.controller;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
 public class BrokerRegisterRequestHeader implements CommandCustomHeader {
     private String clusterName;
     private String brokerName;
     private String brokerAddress;
+    @CFNullable
+    private Long heartbeatTimeoutMillis;
+
 
     public BrokerRegisterRequestHeader() {
     }
@@ -57,6 +61,14 @@ public class BrokerRegisterRequestHeader implements CommandCustomHeader {
         this.brokerAddress = brokerAddress;
     }
 
+    /**
+     * @return the heartbeat timeout carried by this request; the field is annotated
+     * {@code @CFNullable}, so this may be {@code null}
+     */
+    public Long getHeartbeatTimeoutMillis() {
+        return heartbeatTimeoutMillis;
+    }
+
+    /**
+     * @param heartbeatTimeoutMillis the heartbeat timeout to carry in this request, may be {@code null}
+     */
+    public void setHeartbeatTimeoutMillis(Long heartbeatTimeoutMillis) {
+        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+    }
+
     @Override public String toString() {
         return "RegisterBrokerRequestHeader{" +
             "clusterName='" + clusterName + '\'' +
diff --git a/namesrv/pom.xml b/controller/pom.xml
similarity index 82%
copy from namesrv/pom.xml
copy to controller/pom.xml
index 83b11d1c5..920ea0de2 100644
--- a/namesrv/pom.xml
+++ b/controller/pom.xml
@@ -17,15 +17,14 @@
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
         <version>5.0.0-BETA-SNAPSHOT</version>
     </parent>
-
     <modelVersion>4.0.0</modelVersion>
     <packaging>jar</packaging>
-    <artifactId>rocketmq-namesrv</artifactId>
-    <name>rocketmq-namesrv ${project.version}</name>
+    <artifactId>rocketmq-controller</artifactId>
+    <name>rocketmq-controller ${project.version}</name>
 
     <dependencies>
         <dependency>
@@ -57,21 +56,9 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.openjdk.jmh</groupId>
-            <artifactId>jmh-core</artifactId>
-            <version>1.19</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.openjdk.jmh</groupId>
-            <artifactId>jmh-generator-annprocess</artifactId>
-            <version>1.19</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.bouncycastle</groupId>
             <artifactId>bcpkix-jdk15on</artifactId>
         </dependency>
     </dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
new file mode 100644
index 000000000..f86dbf558
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+import io.netty.channel.Channel;
+
+/**
+ * Tracks the liveness of brokers known to the controller.
+ * <p>
+ * The interface exposes broker registration, heartbeat reporting, channel-close notification,
+ * a liveness query and a listener hook that is triggered when a broker becomes inactive.
+ * How inactivity is detected is left to the implementation.
+ */
+public interface BrokerHeartbeatManager {
+
+    /**
+     * Reports a new heartbeat from a broker.
+     *
+     * @param clusterName name of the cluster the broker belongs to
+     * @param brokerAddr address of the broker that sent the heartbeat
+     */
+    void onBrokerHeartbeat(final String clusterName, final String brokerAddr);
+
+    /**
+     * Starts the heartbeat manager.
+     */
+    void start();
+
+    /**
+     * Shuts down the heartbeat manager.
+     */
+    void shutdown();
+
+    /**
+     * Adds a listener that is notified about broker lifecycle changes.
+     *
+     * @param listener the listener to add
+     */
+    void addBrokerLifecycleListener(final BrokerLifecycleListener listener);
+
+    /**
+     * Registers a new broker with the heartbeat manager.
+     *
+     * @param clusterName name of the cluster the broker belongs to
+     * @param brokerName name of the broker
+     * @param brokerAddr address of the broker
+     * @param brokerId id of the broker
+     * @param timeoutMillis heartbeat timeout for this broker; it is a boxed {@code Long}, so
+     * presumably {@code null} selects an implementation default - confirm against the implementation
+     * @param channel the channel the broker is connected through
+     */
+    void registerBroker(final String clusterName, final String brokerName, final String brokerAddr, final long brokerId,
+        final Long timeoutMillis, final Channel channel);
+
+    /**
+     * Notifies the manager that a broker channel has been closed.
+     *
+     * @param channel the channel that was closed
+     */
+    void onBrokerChannelClose(final Channel channel);
+
+    /**
+     * Checks whether a broker is currently considered active.
+     *
+     * @param clusterName name of the cluster the broker belongs to
+     * @param brokerAddr address of the broker
+     * @return {@code true} if the broker is active
+     */
+    boolean isBrokerActive(final String clusterName, final String brokerAddr);
+
+    /**
+     * Callback for broker lifecycle changes observed by the heartbeat manager.
+     */
+    interface BrokerLifecycleListener {
+        /**
+         * Triggered when a broker becomes inactive.
+         *
+         * @param brokerName name of the inactive broker
+         * @param brokerAddress address of the inactive broker
+         * @param brokerId id of the inactive broker
+         */
+        void onBrokerInactive(final String brokerName, final String brokerAddress, final long brokerId);
+    }
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
new file mode 100644
index 000000000..50c96cfd3
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHousekeepingService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+
+/**
+ * Channel event listener that forwards the loss of a broker connection to the controller's
+ * {@link BrokerHeartbeatManager}.
+ * <p>
+ * Close, exception and idle events are all handled the same way: the channel is reported as
+ * closed to the heartbeat manager. Connect events are ignored.
+ */
+public class BrokerHousekeepingService implements ChannelEventListener {
+    // Use the controller logger, as ControllerManager does, not the name server logger.
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private final ControllerManager controllerManager;
+
+    /**
+     * @param controllerManager the manager whose heartbeat manager receives the channel events
+     */
+    public BrokerHousekeepingService(ControllerManager controllerManager) {
+        this.controllerManager = controllerManager;
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void onChannelConnect(String remoteAddr, Channel channel) {
+    }
+
+    /**
+     * Reports the closed channel to the heartbeat manager.
+     */
+    @Override
+    public void onChannelClose(String remoteAddr, Channel channel) {
+        this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
+    }
+
+    /**
+     * Treats a channel exception like a close and reports it to the heartbeat manager.
+     */
+    @Override
+    public void onChannelException(String remoteAddr, Channel channel) {
+        this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
+    }
+
+    /**
+     * Treats an idle channel like a close and reports it to the heartbeat manager.
+     */
+    @Override
+    public void onChannelIdle(String remoteAddr, Channel channel) {
+        this.controllerManager.getHeartbeatManager().onBrokerChannelClose(channel);
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
similarity index 98%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java
rename to controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index 44f2cdf2c..ec45a8009 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -14,7 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller;
+
+package org.apache.rocketmq.controller;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
new file mode 100644
index 000000000..7b835c5c0
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+import java.util.Collections;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
+import org.apache.rocketmq.controller.impl.DledgerController;
+import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Wires together the parts of a standalone controller: the remoting server and client, the
+ * request-processing thread pool, the broker heartbeat manager and the {@link Controller}.
+ * <p>
+ * Expected lifecycle: construct, {@link #initialize()}, {@link #start()}, then {@link #shutdown()}.
+ */
+public class ControllerManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+
+    private final ControllerConfig controllerConfig;
+    private final NettyServerConfig nettyServerConfig;
+    private final NettyClientConfig nettyClientConfig;
+    private final BrokerHousekeepingService brokerHousekeepingService;
+    private final Configuration configuration;
+    private Controller controller;
+    private BrokerHeartbeatManager heartbeatManager;
+    private RemotingClient remotingClient;
+    private RemotingServer remotingServer;
+
+    private ExecutorService controllerRequestExecutor;
+    private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
+
+    /**
+     * Stores the configs and builds the {@link Configuration} holder. No network or thread
+     * resources are created here; that happens in {@link #initialize()}.
+     *
+     * @param controllerConfig controller settings
+     * @param nettyServerConfig settings for the remoting server
+     * @param nettyClientConfig settings for the remoting client
+     */
+    public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
+        NettyClientConfig nettyClientConfig) {
+        this.controllerConfig = controllerConfig;
+        this.nettyServerConfig = nettyServerConfig;
+        this.nettyClientConfig = nettyClientConfig;
+        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
+        this.configuration = new Configuration(
+            log,
+            this.controllerConfig, this.nettyServerConfig
+        );
+        this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
+    }
+
+    /**
+     * Creates the request thread pool, the remoting server and client, the heartbeat manager and
+     * the controller, registers the broker-inactive listener and the request processor.
+     *
+     * @return always {@code true}
+     */
+    public boolean initialize() {
+        this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
+        // Fixed-size pool: core and maximum size are both controllerThreadPoolNums.
+        this.controllerRequestExecutor = new ThreadPoolExecutor(
+            this.controllerConfig.getControllerThreadPoolNums(),
+            this.controllerConfig.getControllerThreadPoolNums(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            this.controllerRequestThreadPoolQueue,
+            new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
+            @Override
+            protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
+                return new FutureTaskExt<T>(runnable, value);
+            }
+        };
+        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
+        this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
+        this.remotingClient.updateNameServerAddressList(Collections.singletonList(RemotingUtil.getLocalAddress() + ":" + this.nettyServerConfig.getListenPort()));
+
+        this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
+        this.controller = new DledgerController(this.controllerConfig, (cluster, brokerAddr) -> this.heartbeatManager.isBrokerActive(cluster, brokerAddr));
+
+        // Register broker inactive listener
+        this.heartbeatManager.addBrokerLifecycleListener(new BrokerHeartbeatManager.BrokerLifecycleListener() {
+            @Override
+            public void onBrokerInactive(String brokerName, String brokerAddress, long brokerId) {
+                // Only the loss of a master requires a new election.
+                if (brokerId == MixAll.MASTER_ID) {
+                    final CompletableFuture<RemotingCommand> future = controller.electMaster(new ElectMasterRequestHeader(brokerName));
+                    try {
+                        final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
+                        final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
+                        if (responseHeader != null) {
+                            log.info("Broker{}'s master shutdown, elect a new master:{}", brokerName, responseHeader);
+                        }
+                    } catch (InterruptedException e) {
+                        // Keep the interrupt visible to the calling thread instead of clearing it silently.
+                        Thread.currentThread().interrupt();
+                        log.error("Interrupted while electing a new master for broker " + brokerName, e);
+                    } catch (Exception e) {
+                        // Best effort: a failed election must not break the caller, but it has to be visible.
+                        log.error("Failed to elect a new master for broker " + brokerName, e);
+                    }
+                }
+            }
+        });
+        this.registerProcessor();
+        return true;
+    }
+
+    /**
+     * Registers a {@link ControllerRequestProcessor} as the default processor of the remoting
+     * server, executed on the controller request thread pool.
+     */
+    public void registerProcessor() {
+        final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
+        this.remotingServer.registerDefaultProcessor(controllerRequestProcessor, this.controllerRequestExecutor);
+    }
+
+    /**
+     * Starts the remoting server, the remoting client, the heartbeat manager and the controller,
+     * in that order. {@link #initialize()} must have been called first.
+     */
+    public void start() {
+        this.remotingServer.start();
+        this.remotingClient.start();
+        this.heartbeatManager.start();
+        this.controller.startup();
+    }
+
+    /**
+     * Shuts down everything that {@link #start()} started, plus the request thread pool.
+     */
+    public void shutdown() {
+        this.remotingServer.shutdown();
+        // The client is started in start(), so it has to be released here as well.
+        this.remotingClient.shutdown();
+        this.heartbeatManager.shutdown();
+        this.controller.shutdown();
+        this.controllerRequestExecutor.shutdown();
+    }
+
+    public BrokerHeartbeatManager getHeartbeatManager() {
+        return heartbeatManager;
+    }
+
+    public ControllerConfig getControllerConfig() {
+        return controllerConfig;
+    }
+
+    public Controller getController() {
+        return controller;
+    }
+
+    public RemotingServer getRemotingServer() {
+        return remotingServer;
+    }
+
+    public NettyServerConfig getNettyServerConfig() {
+        return nettyServerConfig;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
new file mode 100644
index 000000000..3820e64e6
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.core.joran.spi.JoranException;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.srvutil.ShutdownHookThread;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command line entry point of the standalone controller.
+ * <p>
+ * Parses the command line, loads the optional properties file, configures logback, builds a
+ * {@link ControllerManager} and starts it.
+ */
+public class ControllerStartup {
+
+    private static InternalLogger log;
+    private static Properties properties = null;
+    private static CommandLine commandLine = null;
+
+    public static void main(String[] args) {
+        main0(args);
+    }
+
+    /**
+     * Creates and starts a controller manager. On any failure the stack trace is printed and
+     * the JVM exits with status {@code -1}.
+     *
+     * @param args command line arguments
+     * @return the started controller manager
+     */
+    public static ControllerManager main0(String[] args) {
+
+        try {
+            ControllerManager controller = createControllerManager(args);
+            start(controller);
+            String tip = "The Controller Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+            log.info(tip);
+            System.out.printf("%s%n", tip);
+            return controller;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        return null;
+    }
+
+    /**
+     * Builds a {@link ControllerManager} from the command line.
+     * <p>
+     * Option {@code -c} names a properties file applied to the controller, netty server and
+     * netty client configs; option {@code -p} prints all config items and exits with status
+     * {@code 0}. The JVM exits with {@code -1} when the command line cannot be parsed and with
+     * {@code -2} when no RocketMQ home directory is configured.
+     *
+     * @param args command line arguments
+     * @return the created, not yet initialized controller manager
+     * @throws IOException if the properties file cannot be read
+     * @throws JoranException if the logback configuration cannot be applied
+     */
+    public static ControllerManager createControllerManager(String[] args) throws IOException, JoranException {
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        commandLine = ServerUtil.parseCmdLine("mqcontroller", args, buildCommandlineOptions(options), new PosixParser());
+        if (null == commandLine) {
+            System.exit(-1);
+            return null;
+        }
+
+        final ControllerConfig controllerConfig = new ControllerConfig();
+        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        nettyServerConfig.setListenPort(19876);
+
+        if (commandLine.hasOption('c')) {
+            String file = commandLine.getOptionValue('c');
+            if (file != null) {
+                // try-with-resources closes the stream even when load() throws.
+                try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
+                    properties = new Properties();
+                    properties.load(in);
+                }
+                MixAll.properties2Object(properties, controllerConfig);
+                MixAll.properties2Object(properties, nettyServerConfig);
+                MixAll.properties2Object(properties, nettyClientConfig);
+
+                System.out.printf("load config properties file OK, %s%n", file);
+            }
+        }
+
+        if (commandLine.hasOption('p')) {
+            MixAll.printObjectProperties(null, controllerConfig);
+            MixAll.printObjectProperties(null, nettyServerConfig);
+            MixAll.printObjectProperties(null, nettyClientConfig);
+            System.exit(0);
+        }
+
+        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), controllerConfig);
+
+        // Without a home directory the logback path below would start with "null".
+        if (null == controllerConfig.getRocketmqHome()) {
+            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
+            System.exit(-2);
+        }
+
+        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+        JoranConfigurator configurator = new JoranConfigurator();
+        configurator.setContext(lc);
+        lc.reset();
+        configurator.doConfigure(controllerConfig.getRocketmqHome() + "/conf/logback_controller.xml");
+
+        // Use the controller logger, as ControllerManager does, not the name server logger.
+        log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+
+        MixAll.printObjectProperties(log, controllerConfig);
+        MixAll.printObjectProperties(log, nettyServerConfig);
+
+        final ControllerManager controllerManager = new ControllerManager(controllerConfig, nettyServerConfig, nettyClientConfig);
+        // remember all configs to prevent discard
+        controllerManager.getConfiguration().registerConfig(properties);
+
+        return controllerManager;
+    }
+
+    /**
+     * Initializes and starts the given controller manager and registers a JVM shutdown hook
+     * that shuts it down. The JVM exits with {@code -3} when initialization fails.
+     *
+     * @param controller the controller manager to start
+     * @return the same controller manager, started
+     * @throws IllegalArgumentException if {@code controller} is {@code null}
+     * @throws Exception declared by the signature for callers to handle
+     */
+    public static ControllerManager start(final ControllerManager controller) throws Exception {
+
+        if (null == controller) {
+            throw new IllegalArgumentException("ControllerManager is null");
+        }
+
+        boolean initResult = controller.initialize();
+        if (!initResult) {
+            controller.shutdown();
+            System.exit(-3);
+        }
+
+        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
+            controller.shutdown();
+            return null;
+        }));
+
+        controller.start();
+
+        return controller;
+    }
+
+    /**
+     * Shuts down the given controller manager.
+     *
+     * @param controller the controller manager to shut down
+     */
+    public static void shutdown(final ControllerManager controller) {
+        controller.shutdown();
+    }
+
+    /**
+     * Adds the controller specific options {@code -c} (config file) and {@code -p} (print
+     * config items) to the given options.
+     *
+     * @param options the options to extend
+     * @return the same {@code options} instance
+     */
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("c", "configFile", true, "Controller config properties file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("p", "printConfigItem", false, "Print all config items");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
new file mode 100644
index 000000000..7c4b08149
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.BrokerAddrInfo;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+    private static final long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 10;
+    private final ScheduledExecutorService scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
+    private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
+
+    private final ControllerConfig controllerConfig;
+    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
+    private final List<BrokerLifecycleListener> brokerLifecycleListeners;
+
+    public DefaultBrokerHeartbeatManager(final ControllerConfig controllerConfig) {
+        this.controllerConfig = controllerConfig;
+        this.brokerLiveTable = new ConcurrentHashMap<>(256);
+        this.brokerLifecycleListeners = new ArrayList<>();
+    }
+
+    @Override
+    public void start() {
+        this.scheduledService.scheduleAtFixedRate(this::scanNotActiveBroker, 2000, this.controllerConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void shutdown() {
+        this.scheduledService.shutdown();
+        this.executor.shutdown();
+    }
+
+    public void scanNotActiveBroker() {
+        try {
+            log.info("start scanNotActiveBroker");
+            final Iterator<Map.Entry<BrokerAddrInfo, BrokerLiveInfo>> iterator = this.brokerLiveTable.entrySet().iterator();
+            while (iterator.hasNext()) {
+                final Map.Entry<BrokerAddrInfo, BrokerLiveInfo> next = iterator.next();
+                long last = next.getValue().lastUpdateTimestamp;
+                long timeoutMillis = next.getValue().heartbeatTimeoutMillis;
+                if ((last + timeoutMillis) < System.currentTimeMillis()) {
+                    final Channel channel = next.getValue().channel;
+                    if (channel != null) {
+                        RemotingUtil.closeChannel(channel);
+                    }
+                    iterator.remove();
+                    this.executor.submit(() ->
+                        notifyBrokerInActive(next.getValue().brokerName, next.getKey().getBrokerAddr(), next.getValue().brokerId));
+                    log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
+                }
+            }
+        } catch (Exception e) {
+            log.error("scanNotActiveBroker exception", e);
+        }
+    }
+
+    private void notifyBrokerInActive(String brokerName, String brokerAddr, Long brokerId) {
+        for (BrokerLifecycleListener listener : this.brokerLifecycleListeners) {
+            listener.onBrokerInactive(brokerName, brokerAddr, brokerId);
+        }
+    }
+
+    @Override
+    public void addBrokerLifecycleListener(BrokerLifecycleListener listener) {
+        this.brokerLifecycleListeners.add(listener);
+    }
+
+    @Override
+    public void registerBroker(String clusterName, String brokerName, String brokerAddr,
+        long brokerId, Long timeoutMillis, Channel channel) {
+        final BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
+        final BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(addrInfo,
+            new BrokerLiveInfo(brokerName,
+                brokerId,
+                System.currentTimeMillis(),
+                timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+                channel));
+        if (prevBrokerLiveInfo == null) {
+            log.info("new broker registered, {}, brokerId:{}", addrInfo, brokerId);
+        }
+    }
+
+    @Override
+    public void onBrokerHeartbeat(String clusterName, String brokerAddr) {
+        BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
+        BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
+        if (prev != null) {
+            prev.lastUpdateTimestamp = System.currentTimeMillis();
+        }
+    }
+
+    @Override
+    public void onBrokerChannelClose(Channel channel) {
+        synchronized (this) {
+            for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
+                if (entry.getValue().channel == channel) {
+                    this.executor.submit(() ->
+                        notifyBrokerInActive(entry.getValue().brokerName, entry.getKey().getBrokerAddr(), entry.getValue().brokerId));
+                    break;
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean isBrokerActive(String clusterName, String brokerAddr) {
+        final BrokerLiveInfo info = this.brokerLiveTable.get(new BrokerAddrInfo(clusterName, brokerAddr));
+        if (info != null) {
+            long last = info.lastUpdateTimestamp;
+            long timeoutMillis = info.heartbeatTimeoutMillis;
+            return (last + timeoutMillis) >= System.currentTimeMillis();
+        }
+        return false;
+    }
+
+    static class BrokerLiveInfo {
+        private final String brokerName;
+        private final long brokerId;
+        private final long heartbeatTimeoutMillis;
+        private final Channel channel;
+        private long lastUpdateTimestamp;
+
+        public BrokerLiveInfo(String brokerName, long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
+            Channel channel) {
+            this.brokerName = brokerName;
+            this.brokerId = brokerId;
+            this.lastUpdateTimestamp = lastUpdateTimestamp;
+            this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+            this.channel = channel;
+        }
+
+        @Override public String toString() {
+            return "BrokerLiveInfo{" +
+                "brokerName='" + brokerName + '\'' +
+                ", brokerId=" + brokerId +
+                ", lastUpdateTimestamp=" + lastUpdateTimestamp +
+                ", heartbeatTimeoutMillis=" + heartbeatTimeoutMillis +
+                ", channel=" + channel +
+                '}';
+        }
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
similarity index 95%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
index a7ac9d592..d057e705d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.impl;
+package org.apache.rocketmq.controller.impl;
 
 import io.openmessaging.storage.dledger.AppendFuture;
 import io.openmessaging.storage.dledger.DLedgerConfig;
@@ -45,14 +45,13 @@ import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegis
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.impl.event.ControllerResult;
+import org.apache.rocketmq.controller.impl.event.EventMessage;
+import org.apache.rocketmq.controller.impl.event.EventSerializer;
+import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.namesrv.controller.Controller;
-import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
-import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
@@ -65,10 +64,8 @@ public class DledgerController implements Controller {
     private final DLedgerServer dLedgerServer;
     private final ControllerConfig controllerConfig;
     private final DLedgerConfig dLedgerConfig;
-    private final NamesrvController namesrvController;
     // Usr for checking whether the broker is alive
     private final BiPredicate<String, String> brokerAlivePredicate;
-
     private final ReplicasInfoManager replicasInfoManager;
     private final EventScheduler scheduler;
     private final EventSerializer eventSerializer;
@@ -76,16 +73,11 @@ public class DledgerController implements Controller {
     private final DledgerControllerStateMachine statemachine;
     private volatile boolean isScheduling = false;
 
-    public DledgerController(final ControllerConfig config, final NamesrvController namesrvController) {
+    public DledgerController(final ControllerConfig config, final BiPredicate<String, String> brokerAlivePredicate) {
         this.controllerConfig = config;
         this.eventSerializer = new EventSerializer();
         this.scheduler = new EventScheduler();
-        this.namesrvController = namesrvController;
-        if (namesrvController == null) {
-            this.brokerAlivePredicate = (cluster, address) -> true;
-        } else {
-            this.brokerAlivePredicate = (cluster, address) -> namesrvController.getRouteInfoManager().isBrokerAlive(cluster, address);
-        }
+        this.brokerAlivePredicate = brokerAlivePredicate;
 
         this.dLedgerConfig = new DLedgerConfig();
         this.dLedgerConfig.setGroup(config.getControllerDLegerGroup());
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerControllerStateMachine.java
similarity index 91%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerControllerStateMachine.java
index 3e21d29dd..54e93c5f2 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerStateMachine.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerControllerStateMachine.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.impl;
+package org.apache.rocketmq.controller.impl;
 
 import io.openmessaging.storage.dledger.entry.DLedgerEntry;
 import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
@@ -23,11 +23,11 @@ import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator;
 import io.openmessaging.storage.dledger.statemachine.StateMachine;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.controller.impl.event.EventMessage;
+import org.apache.rocketmq.controller.impl.event.EventSerializer;
+import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.manager.ReplicasInfoManager;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventSerializer;
 
 /**
  * The state machine implementation of the dledger controller
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
similarity index 96%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
index 4da71e32b..5156b5b52 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/AlterSyncStateSetEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/AlterSyncStateSetEvent.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
 
 import java.util.HashSet;
 import java.util.Set;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
similarity index 96%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
index fc9860dbe..06108b57c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ApplyBrokerIdEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ApplyBrokerIdEvent.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
 
 /**
  * The event trys to apply a new id for a new broker.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java
index 5d61f92af..41af2af04 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ControllerResult.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
index 34aaeca86..f91c869e8 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/ElectMasterEvent.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ElectMasterEvent.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
 
 /**
  * The event trys to elect a new master for target broker.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventMessage.java
similarity index 94%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventMessage.java
index 768fc0dd6..8a31393e1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventMessage.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventMessage.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
 
 /**
  * The parent class of Event, the subclass needs to indicate eventType.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
index 38d12a93a..f78f399a5 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventSerializer.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventSerializer.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.rocketmq.common.utils.FastJsonSerializer;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
similarity index 96%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
index 9496d0a3e..f6971cbdf 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/event/EventType.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/EventType.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager.event;
+package org.apache.rocketmq.controller.impl.event;
 
 /**
  * Event type (name, id);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerInfo.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
index ef2efeaba..1b0242e0d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/BrokerInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/BrokerInfo.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager;
+package org.apache.rocketmq.controller.impl.manager;
 
 import java.util.HashMap;
 import java.util.HashSet;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/InSyncReplicasInfo.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/manager/InSyncReplicasInfo.java
index 8b8c5afd4..b0b7b0c3c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/InSyncReplicasInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/InSyncReplicasInfo.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager;
+package org.apache.rocketmq.controller.impl.manager;
 
 import java.util.HashSet;
 import java.util.Set;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
similarity index 97%
rename from namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java
rename to controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index c81b0a122..85ac2734f 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager;
+package org.apache.rocketmq.controller.impl.manager;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -35,14 +35,14 @@ import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMaster
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.controller.impl.event.AlterSyncStateSetEvent;
+import org.apache.rocketmq.controller.impl.event.ApplyBrokerIdEvent;
+import org.apache.rocketmq.controller.impl.event.ControllerResult;
+import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
+import org.apache.rocketmq.controller.impl.event.EventMessage;
+import org.apache.rocketmq.controller.impl.event.EventType;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.manager.event.AlterSyncStateSetEvent;
-import org.apache.rocketmq.namesrv.controller.manager.event.ApplyBrokerIdEvent;
-import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
-import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventType;
 
 /**
  * The manager that manages the replicas info for all brokers.
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
similarity index 75%
copy from namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
copy to controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index aed048047..bf185c826 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.processor;
+package org.apache.rocketmq.controller.processor;
 
 import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
@@ -22,18 +22,23 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.ControllerManager;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.Controller;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
+import static org.apache.rocketmq.common.protocol.RequestCode.BROKER_HEARTBEAT;
 import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET;
 import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER;
 import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO;
@@ -47,9 +52,11 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
     private static final int WAIT_TIMEOUT_OUT = 5;
     private final Controller controller;
+    private final BrokerHeartbeatManager heartbeatManager;
 
-    public ControllerRequestProcessor(final Controller controller) {
-        this.controller = controller;
+    public ControllerRequestProcessor(final ControllerManager controllerManager) {
+        this.controller = controllerManager.getController();
+        this.heartbeatManager = controllerManager.getHeartbeatManager();
     }
 
     @Override
@@ -82,7 +89,13 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                 final BrokerRegisterRequestHeader controllerRequest = request.decodeCommandCustomHeader(BrokerRegisterRequestHeader.class);
                 final CompletableFuture<RemotingCommand> future = this.controller.registerBroker(controllerRequest);
                 if (future != null) {
-                    return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    final RemotingCommand response = future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
+                    final BrokerRegisterResponseHeader responseHeader = (BrokerRegisterResponseHeader) response.readCustomHeader();
+                    if (responseHeader != null && responseHeader.getBrokerId() >= 0) {
+                        this.heartbeatManager.registerBroker(controllerRequest.getClusterName(), controllerRequest.getBrokerName(), controllerRequest.getBrokerAddress(),
+                            responseHeader.getBrokerId(), controllerRequest.getHeartbeatTimeoutMillis(), ctx.channel());
+                    }
+                    return response;
                 }
                 break;
             }
@@ -97,6 +110,10 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
             case CONTROLLER_GET_METADATA_INFO: {
                 return this.controller.getControllerMetadata();
             }
+            case BROKER_HEARTBEAT: {
+                final BrokerHeartbeatRequestHeader requestHeader = request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
+                this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerAddr());
+                // Reply explicitly: without a return this case falls through to the default branch
+                // and every heartbeat is answered with REQUEST_CODE_NOT_SUPPORTED.
+                return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
+            }
             default: {
                 final String error = " request type " + request.getCode() + " not supported";
                 return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
new file mode 100644
index 000000000..7df1633a9
--- /dev/null
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.controller;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import org.apache.rocketmq.controller.ControllerManager;
+import org.apache.rocketmq.controller.impl.DledgerController;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.rocketmq.common.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
+import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Starts a three-node controller group on localhost and exercises it through a remoting client.
+ */
+public class ControllerManagerTest {
+    private List<String> baseDirs;
+    private List<ControllerManager> controllers;
+    private NettyRemotingClient remotingClient;
+
+    /**
+     * Builds, initializes and starts one controller node; the node and its store path are recorded for tear-down.
+     */
+    public ControllerManager launchManager(final String group, final String peers, final String selfId, final int listenPort) {
+        final String path = "/tmp" + File.separator + group + File.separator + selfId;
+        baseDirs.add(path);
+
+        final ControllerConfig config = new ControllerConfig();
+        config.setControllerDLegerGroup(group);
+        config.setControllerDLegerPeers(peers);
+        config.setControllerDLegerSelfId(selfId);
+        config.setControllerStorePath(path);
+        config.setMappedFileSize(10 * 1024 * 1024);
+        config.setEnableElectUncleanMaster(true);
+        config.setScanNotActiveBrokerInterval(2000L);
+
+        final NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(listenPort);
+
+        final ControllerManager manager = new ControllerManager(config, serverConfig, new NettyClientConfig());
+        manager.initialize();
+        manager.start();
+        this.controllers.add(manager);
+        return manager;
+    }
+
+    @Before
+    public void startup() {
+        this.baseDirs = new ArrayList<>();
+        this.controllers = new ArrayList<>();
+        this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
+        this.remotingClient.start();
+    }
+
+    /**
+     * Polls the first controller once per second until it knows a leader id, then returns the manager whose
+     * self id equals that leader id, or {@code null} when the list is empty or no manager matches.
+     */
+    public ControllerManager waitLeader(final List<ControllerManager> controllers) throws Exception {
+        if (controllers.isEmpty()) {
+            return null;
+        }
+        DledgerController c1 = (DledgerController) controllers.get(0).getController();
+        while (c1.getMemberState().getLeaderId() == null) {
+            Thread.sleep(1000);
+        }
+        String leaderId = c1.getMemberState().getLeaderId();
+        System.out.println("New leader " + leaderId);
+        for (ControllerManager controllerManager : controllers) {
+            final DledgerController controller = (DledgerController) controllerManager.getController();
+            if (controller.getMemberState().getSelfId().equals(leaderId)) {
+                return controllerManager;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Launches the controllers n0, n1 and n2 as one DLedger group with a random group name.
+     */
+    public void mockData() {
+        String group = UUID.randomUUID().toString();
+        String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002);
+        launchManager(group, peers, "n0", 31000);
+        launchManager(group, peers, "n1", 31001);
+        launchManager(group, peers, "n2", 31002);
+    }
+
+    /**
+     * Registers a broker with the given controller and returns the decoded response header.
+     *
+     * @throws MQBrokerException if the controller answers with any code other than SUCCESS
+     */
+    public BrokerRegisterResponseHeader registerBroker(
+        final String controllerAddress, final String clusterName,
+        final String brokerName, final String address) throws Exception {
+
+        final BrokerRegisterRequestHeader requestHeader = new BrokerRegisterRequestHeader(clusterName, brokerName, address);
+        // Heartbeat timeout is 4000 ms; the register call itself times out after 3000 ms.
+        requestHeader.setHeartbeatTimeoutMillis(4000L);
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
+        final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        assertNotNull(response);
+        switch (response.getCode()) {
+            case SUCCESS: {
+                return response.decodeCommandCustomHeader(BrokerRegisterResponseHeader.class);
+            }
+            case CONTROLLER_NOT_LEADER: {
+                throw new MQBrokerException(response.getCode(), "Controller leader was changed");
+            }
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    /**
+     * Registers two replicas of broker1, keeps only the second one alive with heartbeats, and expects the
+     * second replica to be reported as master after the first one's heartbeat timeout has passed.
+     */
+    @Test
+    public void testSomeApi() throws Exception {
+        mockData();
+        final ControllerManager leader = waitLeader(this.controllers);
+        assertNotNull(leader);
+        String leaderAddr = RemotingUtil.getLocalAddress() + ":" + leader.getNettyServerConfig().getListenPort();
+
+        // Register two broker, the first one is master.
+        final BrokerRegisterResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000");
+        assertNotNull(responseHeader1);
+        assertEquals(responseHeader1.getBrokerId(), MixAll.MASTER_ID);
+
+        final BrokerRegisterResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001");
+        assertNotNull(responseHeader2);
+        assertEquals(responseHeader2.getBrokerId(), 2);
+
+        // Send heartbeat for broker2
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        try {
+            executor.scheduleAtFixedRate(() -> {
+                final BrokerHeartbeatRequestHeader heartbeatRequestHeader = new BrokerHeartbeatRequestHeader();
+                heartbeatRequestHeader.setClusterName("cluster1");
+                heartbeatRequestHeader.setBrokerName("broker1");
+                heartbeatRequestHeader.setBrokerAddr("127.0.0.1:8001");
+                final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, heartbeatRequestHeader);
+                try {
+                    this.remotingClient.invokeOneway(leaderAddr, request, 3000);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }, 0, 2000L, TimeUnit.MILLISECONDS);
+
+            // Wait until the master is expired.
+            Thread.sleep(6000);
+
+            // The new master should be broker2.
+            final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader("broker1");
+            final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
+            final RemotingCommand response = this.remotingClient.invokeSync(leaderAddr, request, 3000);
+            final GetReplicaInfoResponseHeader responseHeader = response.decodeCommandCustomHeader(GetReplicaInfoResponseHeader.class);
+            assertEquals(responseHeader.getMasterAddress(), "127.0.0.1:8001");
+        } finally {
+            // Stop the heartbeat sender even when an assertion above fails.
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Stops the remoting client and every controller, then removes the store directories.
+     */
+    @After
+    public void tearDown() {
+        this.remotingClient.shutdown();
+        for (ControllerManager controller : this.controllers) {
+            controller.shutdown();
+        }
+        for (String dir : this.baseDirs) {
+            System.out.println("Delete file " + dir);
+            deleteRecursively(new File(dir));
+        }
+    }
+
+    /**
+     * Deletes a file or a directory tree; a plain {@code File.delete()} fails on a non-empty directory.
+     */
+    private static void deleteRecursively(final File file) {
+        final File[] children = file.listFiles();
+        if (children != null) {
+            for (File child : children) {
+                deleteRecursively(child);
+            }
+        }
+        file.delete();
+    }
+}
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
new file mode 100644
index 000000000..d4de7abe4
--- /dev/null
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl.controller.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.controller.BrokerHeartbeatManager;
+import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that {@link DefaultBrokerHeartbeatManager} calls its lifecycle listeners for a broker that sends no heartbeat.
+ */
+public class DefaultBrokerHeartbeatManagerTest {
+    private BrokerHeartbeatManager heartbeatManager;
+
+    /** Starts a heartbeat manager whose inactive-broker scan interval is set to 2000. */
+    @Before
+    public void init() {
+        final ControllerConfig config = new ControllerConfig();
+        config.setScanNotActiveBrokerInterval(2000);
+        this.heartbeatManager = new DefaultBrokerHeartbeatManager(config);
+        this.heartbeatManager.start();
+    }
+
+    /** Shuts the manager down after every test, including a failed one, so it is never left running. */
+    @After
+    public void tearDown() {
+        this.heartbeatManager.shutdown();
+    }
+
+    /**
+     * Registers a broker with a 3000 ms heartbeat timeout, never sends a heartbeat for it, and expects
+     * the lifecycle listener to be called within 5000 ms.
+     */
+    @Test
+    public void testDetectBrokerAlive() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        this.heartbeatManager.addBrokerLifecycleListener((brokerName, brokerAddress, brokerId) -> {
+            System.out.println("Broker shutdown:" + brokerAddress);
+            latch.countDown();
+        });
+        this.heartbeatManager.registerBroker("cluster1", "broker1", "127.0.0.1:7000", 1L, 3000L, null);
+        assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+    }
+}
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DledgerControllerTest.java
similarity index 98%
rename from namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DledgerControllerTest.java
index 77384aa3e..3bc9e360f 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/impl/DledgerControllerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DledgerControllerTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.impl;
+package org.apache.rocketmq.controller.impl.controller.impl;
 
 import io.openmessaging.storage.dledger.DLedgerConfig;
 import java.io.File;
@@ -28,13 +28,14 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.namesrv.ControllerConfig;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
-import org.apache.rocketmq.namesrv.controller.Controller;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.impl.DledgerController;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.junit.After;
@@ -48,11 +49,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-/**
- * @author hzh
- * @email 642256541@qq.com
- * @date 2022/4/20 11:05
- */
 public class DledgerControllerTest {
     private List<String> baseDirs;
     private List<DledgerController> controllers;
@@ -69,7 +65,7 @@ public class DledgerControllerTest {
         config.setMappedFileSize(10 * 1024 * 1024);
         config.setEnableElectUncleanMaster(isEnableElectUncleanMaster);
 
-        final DledgerController controller = new DledgerController(config, null);
+        final DledgerController controller = new DledgerController(config, (str1, str2) -> true);
 
         controller.startup();
         return controller;
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
similarity index 95%
rename from namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java
rename to controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index 656ffe471..fc7d420f5 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/controller/manager/ReplicasInfoManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.namesrv.controller.manager;
+package org.apache.rocketmq.controller.impl.controller.impl.manager;
 
 import java.util.HashSet;
 import java.util.List;
@@ -23,15 +23,16 @@ import org.apache.rocketmq.common.namesrv.ControllerConfig;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterResponseHeader;
-import org.apache.rocketmq.namesrv.controller.manager.event.ControllerResult;
-import org.apache.rocketmq.namesrv.controller.manager.event.ElectMasterEvent;
-import org.apache.rocketmq.namesrv.controller.manager.event.EventMessage;
+import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
+import org.apache.rocketmq.controller.impl.event.ControllerResult;
+import org.apache.rocketmq.controller.impl.event.ElectMasterEvent;
+import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/controller/src/test/resources/logback-test.xml b/controller/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..eb12a9a05
--- /dev/null
+++ b/controller/src/test/resources/logback-test.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+
+    <appender name="DefaultAppender" class="ch.qos.logback.core.ConsoleAppender">
+        <append>true</append>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH\:mm\:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <root>
+        <level value="OFF"/>
+        <appender-ref ref="DefaultAppender"/>
+    </root>
+</configuration>
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 83b11d1c5..1ce091429 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -33,6 +33,10 @@
             <artifactId>dledger</artifactId>
             <version>0.2.5</version>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-controller</artifactId>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-client</artifactId>
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index e10878e6b..0b5e39d64 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -33,10 +33,10 @@ import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.common.namesrv.ControllerConfig;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.impl.DledgerController;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.Controller;
-import org.apache.rocketmq.namesrv.controller.impl.DledgerController;
 import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
 import org.apache.rocketmq.namesrv.processor.ClientRequestProcessor;
 import org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
@@ -109,10 +109,11 @@ public class NamesrvController {
         this.controllerConfig = controllerConfig;
         this.kvConfigManager = new KVConfigManager(this);
         this.brokerHousekeepingService = new BrokerHousekeepingService(this);
+        this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
         if (controllerConfig.isStartupController()) {
-            this.controller = new DledgerController(controllerConfig, this);
+            this.controller = new DledgerController(controllerConfig, this.routeInfoManager::isBrokerAlive);
+            this.routeInfoManager.setController(this.controller);
         }
-        this.routeInfoManager = new RouteInfoManager(namesrvConfig, this, this.controller);
         this.configuration = new Configuration(
             LOGGER,
             this.namesrvConfig, this.nettyServerConfig
@@ -168,7 +169,6 @@ public class NamesrvController {
                     return new FutureTaskExt<T>(runnable, value);
                 }
             };
-            this.controller.startup();
         }
 
         this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
@@ -289,6 +289,10 @@ public class NamesrvController {
         }
 
         this.routeInfoManager.start();
+
+        if (this.controllerConfig.isStartupController()) {
+            this.controller.startup();
+        }
     }
 
     public void shutdown() {
@@ -296,6 +300,9 @@ public class NamesrvController {
         this.remotingServer.shutdown();
         this.defaultExecutor.shutdown();
         this.clientRequestExecutor.shutdown();
+        if (this.controllerRequestExecutor != null) {
+            this.controllerRequestExecutor.shutdown();
+        }
         this.scheduledExecutorService.shutdown();
         this.scanExecutorService.shutdown();
         this.routeInfoManager.shutdown();
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
index aed048047..01e75803d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
@@ -23,12 +23,12 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
+import org.apache.rocketmq.controller.Controller;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.namesrv.controller.Controller;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 516a94564..cb9c113ed 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.rocketmq.common.BrokerAddrInfo;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -54,10 +55,10 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.controller.Controller;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.namesrv.controller.Controller;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -82,11 +83,6 @@ public class RouteInfoManager {
     private final NamesrvConfig namesrvConfig;
     private Controller controller;
 
-    public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController, final Controller controller) {
-        this(namesrvConfig, namesrvController);
-        this.controller = controller;
-    }
-
     public RouteInfoManager(final NamesrvConfig namesrvConfig, NamesrvController namesrvController) {
         this.topicQueueTable = new ConcurrentHashMap<String, Map<String, QueueData>>(1024);
         this.brokerAddrTable = new ConcurrentHashMap<String, BrokerData>(128);
@@ -607,8 +603,6 @@ public class RouteInfoManager {
                     if (this.namesrvController != null && this.namesrvController.getControllerConfig().isStartupController() && this.controller != null) {
                         if (unRegisterRequest.getBrokerId() == 0) {
                             this.controller.electMaster(new ElectMasterRequestHeader(unRegisterRequest.getBrokerName()));
-                            // Todo: Inform the master
-                            // However, because now the broker does not have the related api, so I will complete the process in the future.
                         }
                     }
                 }
@@ -1134,69 +1128,13 @@ public class RouteInfoManager {
         }
         return false;
     }
-}
-
-/**
- * broker address information
- */
-class BrokerAddrInfo {
-    private String clusterName;
-    private String brokerAddr;
-
-    private int hash;
-
-    public BrokerAddrInfo(String clusterName, String brokerAddr) {
-        this.clusterName = clusterName;
-        this.brokerAddr = brokerAddr;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public String getBrokerAddr() {
-        return brokerAddr;
-    }
-
-    public boolean isEmpty() {
-        return clusterName.isEmpty() && brokerAddr.isEmpty();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-
-        if (obj instanceof BrokerAddrInfo) {
-            BrokerAddrInfo addr = (BrokerAddrInfo) obj;
-            return clusterName.equals(addr.clusterName) && brokerAddr.equals(addr.brokerAddr);
-        }
-        return false;
-    }
 
-    @Override
-    public int hashCode() {
-        int h = hash;
-        if (h == 0 && clusterName.length() + brokerAddr.length() > 0) {
-            for (int i = 0; i < clusterName.length(); i++) {
-                h = 31 * h + clusterName.charAt(i);
-            }
-            h = 31 * h + '_';
-            for (int i = 0; i < brokerAddr.length(); i++) {
-                h = 31 * h + brokerAddr.charAt(i);
-            }
-            hash = h;
-        }
-        return h;
+    public void setController(Controller controller) {
+        this.controller = controller;
     }
 
-    @Override
-    public String toString() {
-        return "BrokerAddrInfo [clusterName=" + clusterName + ", brokerAddr=" + brokerAddr + "]";
+    public Controller getController() {
+        return controller;
     }
 }
 
@@ -1257,6 +1195,8 @@ class BrokerLiveInfo {
         this.haServerAddr = haServerAddr;
     }
 
+
+
     @Override
     public String toString() {
         return "BrokerLiveInfo [lastUpdateTimestamp=" + lastUpdateTimestamp + ", dataVersion=" + dataVersion
diff --git a/pom.xml b/pom.xml
index 08e97dd68..8bfef0986 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
         <module>acl</module>
         <module>example</module>
         <module>container</module>
+        <module>controller</module>
     </modules>
 
     <build>
@@ -453,6 +454,11 @@
 
     <dependencyManagement>
         <dependencies>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>rocketmq-controller</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>${project.groupId}</groupId>
                 <artifactId>rocketmq-client</artifactId>