You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/05 07:23:46 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of code] Reuse dledger remotingServer in controller mode. (#4409)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 8515a8d09 [Summer of code] Reuse dledger remotingServer in controller mode. (#4409)
8515a8d09 is described below

commit 8515a8d0915d1d2e2e41a0fa3ea32ba6efc1595f
Author: hzh0425 <64...@qq.com>
AuthorDate: Sun Jun 5 15:23:25 2022 +0800

    [Summer of code] Reuse dledger remotingServer in controller mode. (#4409)
    
    * reuse dledger remotingServer in Controller
    
    * reuse dledger remotingServer in Namesrv when startup controller
    
    * fix some bugs in controller
    
    * trigger ci
    
    * code review
    
    * fix controllerManagerTest bug
---
 .../controller/BrokerHeartbeatManager.java         |  7 +++-
 .../org/apache/rocketmq/controller/Controller.java |  6 +++
 .../rocketmq/controller/ControllerManager.java     | 43 ++++++++++++++--------
 .../controller/impl/DLedgerController.java         | 31 ++++++++++++----
 .../impl/DefaultBrokerHeartbeatManager.java        | 42 ++++++++++++++-------
 .../impl/manager/ReplicasInfoManager.java          |  9 +++++
 .../processor/ControllerRequestProcessor.java      |  1 +
 .../impl/controller/ControllerManagerTest.java     | 31 +++++++++-------
 .../impl/DefaultBrokerHeartbeatManagerTest.java    |  2 +-
 .../apache/rocketmq/namesrv/NamesrvController.java | 23 ++++++++----
 .../processor/ControllerRequestProcessor.java      | 13 +------
 pom.xml                                            |  2 +-
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 10 -----
 .../test/autoswitchrole/AutoSwitchRoleBase.java    |  4 +-
 .../AutoSwitchRoleIntegrationTest.java             | 22 ++++++-----
 15 files changed, 150 insertions(+), 96 deletions(-)

diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index f86dbf558..e2ff58301 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -25,6 +25,11 @@ public interface BrokerHeartbeatManager {
      */
     void onBrokerHeartbeat(final String clusterName, final String brokerAddr);
 
+    /**
+     * Change the metadata (e.g. brokerId) of a broker.
+     */
+    void changeBrokerMetadata(final String clusterName, final String brokerAddr, final Long brokerId);
+
     /**
      * Start heartbeat manager.
      */
@@ -60,6 +65,6 @@ public interface BrokerHeartbeatManager {
         /**
          * Trigger when broker inactive.
          */
-        void onBrokerInactive(final String brokerName, final String brokerAddress, final long brokerId);
+        void onBrokerInactive(final String clusterName, final String brokerName, final String brokerAddress, final long brokerId);
     }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
index b4ed69a40..cbd11f591 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncSt
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 /**
@@ -100,4 +101,9 @@ public interface Controller {
      * @return RemotingCommand(GetControllerMetadataResponseHeader)
      */
     RemotingCommand getControllerMetadata();
+
+    /**
+     * Get the remotingServer used by the controller; the upper layer will reuse this remotingServer.
+     */
+    RemotingServer getRemotingServer();
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 2a3a6b578..51e3ccb87 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -23,12 +23,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.Configuration;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.controller.impl.DLedgerController;
@@ -38,7 +40,6 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
@@ -52,8 +53,6 @@ public class ControllerManager {
     private final Configuration configuration;
     private Controller controller;
     private BrokerHeartbeatManager heartbeatManager;
-    private RemotingServer remotingServer;
-
     private ExecutorService controllerRequestExecutor;
     private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
 
@@ -84,15 +83,14 @@ public class ControllerManager {
                 return new FutureTaskExt<T>(runnable, value);
             }
         };
-        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
-
         this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
-        this.controller = new DLedgerController(this.controllerConfig, (cluster, brokerAddr) -> this.heartbeatManager.isBrokerActive(cluster, brokerAddr));
+        this.controller = new DLedgerController(this.controllerConfig, (cluster, brokerAddr) -> this.heartbeatManager.isBrokerActive(cluster, brokerAddr),
+            this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService);
 
         // Register broker inactive listener
         this.heartbeatManager.addBrokerLifecycleListener(new BrokerHeartbeatManager.BrokerLifecycleListener() {
             @Override
-            public void onBrokerInactive(String brokerName, String brokerAddress, long brokerId) {
+            public void onBrokerInactive(String clusterName, String brokerName, String brokerAddress, long brokerId) {
                 if (brokerId == MixAll.MASTER_ID) {
                     if (controller.isLeaderState()) {
                         final CompletableFuture<RemotingCommand> future = controller.electMaster(new ElectMasterRequestHeader(brokerName));
@@ -100,7 +98,10 @@ public class ControllerManager {
                             final RemotingCommand response = future.get(5, TimeUnit.SECONDS);
                             final ElectMasterResponseHeader responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
                             if (responseHeader != null) {
-                                log.info("Broker{}'s master shutdown, elect a new master:{}", brokerName, responseHeader);
+                                log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", brokerName, brokerAddress, responseHeader);
+                                if (StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
+                                    heartbeatManager.changeBrokerMetadata(clusterName, responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
+                                }
                             }
                         } catch (Exception ignored) {
                         }
@@ -110,23 +111,29 @@ public class ControllerManager {
                 }
             }
         });
-        this.registerProcessor();
+        registerProcessor();
         return true;
     }
 
     public void registerProcessor() {
         final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
-        this.remotingServer.registerDefaultProcessor(controllerRequestProcessor, this.controllerRequestExecutor);
+        final RemotingServer controllerRemotingServer = this.controller.getRemotingServer();
+        assert controllerRemotingServer != null;
+        controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, controllerRequestProcessor, this.controllerRequestExecutor);
+        controllerRemotingServer.registerProcessor(RequestCode.BROKER_HEARTBEAT, controllerRequestProcessor, this.controllerRequestExecutor);
     }
 
     public void start() {
-        this.remotingServer.start();
         this.heartbeatManager.start();
         this.controller.startup();
     }
 
     public void shutdown() {
-        this.remotingServer.shutdown();
         this.heartbeatManager.shutdown();
         this.controller.shutdown();
         this.controllerRequestExecutor.shutdown();
@@ -144,14 +151,18 @@ public class ControllerManager {
         return controller;
     }
 
-    public RemotingServer getRemotingServer() {
-        return remotingServer;
-    }
-
     public NettyServerConfig getNettyServerConfig() {
         return nettyServerConfig;
     }
 
+    public NettyClientConfig getNettyClientConfig() {
+        return nettyClientConfig;
+    }
+
+    public BrokerHousekeepingService getBrokerHousekeepingService() {
+        return brokerHousekeepingService;
+    }
+
     public Configuration getConfiguration() {
         return configuration;
     }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 42c3d34f6..3823b9425 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -53,7 +53,11 @@ import org.apache.rocketmq.controller.impl.event.EventSerializer;
 import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 /**
@@ -75,24 +79,30 @@ public class DLedgerController implements Controller {
     private volatile boolean isScheduling = false;
 
     public DLedgerController(final ControllerConfig config, final BiPredicate<String, String> brokerAlivePredicate) {
-        this.controllerConfig = config;
+        this(config, brokerAlivePredicate, null, null, null);
+    }
+
+    public DLedgerController(final ControllerConfig controllerConfig,
+        final BiPredicate<String, String> brokerAlivePredicate, final NettyServerConfig nettyServerConfig,
+        final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener) {
+        this.controllerConfig = controllerConfig;
         this.eventSerializer = new EventSerializer();
         this.scheduler = new EventScheduler();
         this.brokerAlivePredicate = brokerAlivePredicate;
 
         this.dLedgerConfig = new DLedgerConfig();
-        this.dLedgerConfig.setGroup(config.getControllerDLegerGroup());
-        this.dLedgerConfig.setPeers(config.getControllerDLegerPeers());
-        this.dLedgerConfig.setSelfId(config.getControllerDLegerSelfId());
-        this.dLedgerConfig.setStoreBaseDir(config.getControllerStorePath());
-        this.dLedgerConfig.setMappedFileSizeForEntryData(config.getMappedFileSize());
+        this.dLedgerConfig.setGroup(controllerConfig.getControllerDLegerGroup());
+        this.dLedgerConfig.setPeers(controllerConfig.getControllerDLegerPeers());
+        this.dLedgerConfig.setSelfId(controllerConfig.getControllerDLegerSelfId());
+        this.dLedgerConfig.setStoreBaseDir(controllerConfig.getControllerStorePath());
+        this.dLedgerConfig.setMappedFileSizeForEntryData(controllerConfig.getMappedFileSize());
 
         this.roleHandler = new RoleChangeHandler(dLedgerConfig.getSelfId());
-        this.replicasInfoManager = new ReplicasInfoManager(config);
+        this.replicasInfoManager = new ReplicasInfoManager(controllerConfig);
         this.statemachine = new DLedgerControllerStateMachine(replicasInfoManager, this.eventSerializer, dLedgerConfig.getSelfId());
 
         // Register statemachine and role handler.
-        this.dLedgerServer = new DLedgerServer(dLedgerConfig);
+        this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
         this.dLedgerServer.registerStateMachine(this.statemachine);
         this.dLedgerServer.getdLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
     }
@@ -175,6 +185,11 @@ public class DLedgerController implements Controller {
             state.getGroup(), state.getLeaderId(), state.getLeaderAddr(), state.isLeader(), sb.toString()));
     }
 
+    @Override
+    public RemotingServer getRemotingServer() {
+        return this.dLedgerServer.getRemotingServer();
+    }
+
     /**
      * Append the request to dledger, wait the dledger to commit the request.
      */
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 7c4b08149..050f0aa6f 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -39,7 +39,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
     private static final long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 10;
     private final ScheduledExecutorService scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
-    private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
+    private final ExecutorService executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
 
     private final ControllerConfig controllerConfig;
     private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
@@ -72,13 +72,13 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
                 long timeoutMillis = next.getValue().heartbeatTimeoutMillis;
                 if ((last + timeoutMillis) < System.currentTimeMillis()) {
                     final Channel channel = next.getValue().channel;
+                    iterator.remove();
                     if (channel != null) {
                         RemotingUtil.closeChannel(channel);
                     }
-                    iterator.remove();
                     this.executor.submit(() ->
-                        notifyBrokerInActive(next.getValue().brokerName, next.getKey().getBrokerAddr(), next.getValue().brokerId));
-                    log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);
+                        notifyBrokerInActive(next.getKey().getClusterName(), next.getValue().brokerName, next.getKey().getBrokerAddr(), next.getValue().brokerId));
+                    log.warn("The broker channel {} expired, brokerInfo {}, expired {}ms", next.getValue().channel, next.getKey(), timeoutMillis);
                 }
             }
         } catch (Exception e) {
@@ -86,9 +86,9 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
         }
     }
 
-    private void notifyBrokerInActive(String brokerName, String brokerAddr, Long brokerId) {
+    private void notifyBrokerInActive(String clusterName, String brokerName, String brokerAddr, Long brokerId) {
         for (BrokerLifecycleListener listener : this.brokerLifecycleListeners) {
-            listener.onBrokerInactive(brokerName, brokerAddr, brokerId);
+            listener.onBrokerInactive(clusterName, brokerName, brokerAddr, brokerId);
         }
     }
 
@@ -112,6 +112,16 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
         }
     }
 
+    @Override
+    public void changeBrokerMetadata(String clusterName, String brokerAddr, Long brokerId) {
+        BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
+        BrokerLiveInfo prev = this.brokerLiveTable.get(addrInfo);
+        if (prev != null) {
+            prev.brokerId = brokerId;
+            log.info("Change broker {}'s brokerId to {}", brokerAddr, brokerId);
+        }
+    }
+
     @Override
     public void onBrokerHeartbeat(String clusterName, String brokerAddr) {
         BrokerAddrInfo addrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
@@ -123,15 +133,19 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
 
     @Override
     public void onBrokerChannelClose(Channel channel) {
-        synchronized (this) {
-            for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
-                if (entry.getValue().channel == channel) {
-                    this.executor.submit(() ->
-                        notifyBrokerInActive(entry.getValue().brokerName, entry.getKey().getBrokerAddr(), entry.getValue().brokerId));
-                    break;
-                }
+        BrokerAddrInfo addrInfo = null;
+        for (Map.Entry<BrokerAddrInfo, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
+            if (entry.getValue().channel == channel) {
+                log.info("Channel {} inactive, broker {}, addr:{}, id:{}", entry.getValue().channel, entry.getValue().brokerName, entry.getKey().getBrokerAddr(), entry.getValue().brokerId);
+                addrInfo = entry.getKey();
+                this.executor.submit(() ->
+                    notifyBrokerInActive(entry.getKey().getClusterName(), entry.getValue().brokerName, entry.getKey().getBrokerAddr(), entry.getValue().brokerId));
+                break;
             }
         }
+        if (addrInfo != null) {
+            this.brokerLiveTable.remove(addrInfo);
+        }
     }
 
     @Override
@@ -147,9 +161,9 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
 
     static class BrokerLiveInfo {
         private final String brokerName;
-        private final long brokerId;
         private final long heartbeatTimeoutMillis;
         private final Channel channel;
+        private long brokerId;
         private long lastUpdateTimestamp;
 
         public BrokerLiveInfo(String brokerName, long brokerId, long lastUpdateTimestamp, long heartbeatTimeoutMillis,
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index e4b042dae..f0b680f73 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -77,6 +77,15 @@ public class ReplicasInfoManager {
             final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
             final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName);
 
+            // Check whether the oldSyncStateSet is equal to the newSyncStateSet
+            final Set<String> oldSyncStateSet = syncStateInfo.getSyncStateSet();
+            if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) {
+                String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet";
+                log.warn("{}", err);
+                result.setCodeAndRemark(ResponseCode.CONTROLLER_INVALID_REQUEST, err);
+                return result;
+            }
+
             // Check master
             if (!syncStateInfo.getMasterAddress().equals(request.getMasterAddress())) {
                 String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}",
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 164e9e701..a5ebe37e0 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -115,6 +115,7 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
             case BROKER_HEARTBEAT: {
                 final BrokerHeartbeatRequestHeader requestHeader = request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
                 this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerAddr());
+                return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
             }
             case CONTROLLER_GET_SYNC_STATE_DATA: {
                 if (request.getBody() != null) {
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 2b8b5e976..80f3a409a 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaI
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
 import org.apache.rocketmq.controller.ControllerManager;
 import org.apache.rocketmq.controller.impl.DLedgerController;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
@@ -51,8 +51,9 @@ public class ControllerManagerTest {
     private List<String> baseDirs;
     private List<ControllerManager> controllers;
     private NettyRemotingClient remotingClient;
+    private NettyRemotingClient remotingClient1;
 
-    public ControllerManager launchManager(final String group, final String peers, final String selfId, final int listenPort) {
+    public ControllerManager launchManager(final String group, final String peers, final String selfId) {
         final String path = "/tmp" + File.separator + group + File.separator + selfId;
         baseDirs.add(path);
 
@@ -66,7 +67,6 @@ public class ControllerManagerTest {
         config.setScanNotActiveBrokerInterval(2000L);
 
         final NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(listenPort);
 
         final ControllerManager manager = new ControllerManager(config, serverConfig, new NettyClientConfig());
         manager.initialize();
@@ -81,6 +81,8 @@ public class ControllerManagerTest {
         this.controllers = new ArrayList<>();
         this.remotingClient = new NettyRemotingClient(new NettyClientConfig());
         this.remotingClient.start();
+        this.remotingClient1 = new NettyRemotingClient(new NettyClientConfig());
+        this.remotingClient1.start();
     }
 
     public ControllerManager waitLeader(final List<ControllerManager> controllers) throws Exception {
@@ -105,9 +107,9 @@ public class ControllerManagerTest {
     public void mockData() {
         String group = UUID.randomUUID().toString();
         String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", 30000, 30001, 30002);
-        launchManager(group, peers, "n0", 31000);
-        launchManager(group, peers, "n1", 31001);
-        launchManager(group, peers, "n2", 31002);
+        launchManager(group, peers, "n0");
+        launchManager(group, peers, "n1");
+        launchManager(group, peers, "n2");
     }
 
     /**
@@ -115,13 +117,13 @@ public class ControllerManagerTest {
      */
     public BrokerRegisterResponseHeader registerBroker(
         final String controllerAddress, final String clusterName,
-        final String brokerName, final String address) throws Exception {
+        final String brokerName, final String address, final RemotingClient client) throws Exception {
 
         final BrokerRegisterRequestHeader requestHeader = new BrokerRegisterRequestHeader(clusterName, brokerName, address);
         // Timeout = 3000
         requestHeader.setHeartbeatTimeoutMillis(4000L);
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_REGISTER_BROKER, requestHeader);
-        final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000);
+        final RemotingCommand response = client.invokeSync(controllerAddress, request, 3000);
         assert response != null;
         switch (response.getCode()) {
             case SUCCESS: {
@@ -138,14 +140,14 @@ public class ControllerManagerTest {
     public void testSomeApi() throws Exception {
         mockData();
         final ControllerManager leader = waitLeader(this.controllers);
-        String leaderAddr = RemotingUtil.getLocalAddress() + ":" + leader.getNettyServerConfig().getListenPort();
+        String leaderAddr = "localhost" + ":" + leader.getController().getRemotingServer().localListenPort();
 
         // Register two broker, the first one is master.
-        final BrokerRegisterResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000");
+        final BrokerRegisterResponseHeader responseHeader1 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8000", this.remotingClient);
         assert responseHeader1 != null;
         assertEquals(responseHeader1.getBrokerId(), MixAll.MASTER_ID);
 
-        final BrokerRegisterResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001");
+        final BrokerRegisterResponseHeader responseHeader2 = registerBroker(leaderAddr, "cluster1", "broker1", "127.0.0.1:8001", this.remotingClient1);
         assert responseHeader2 != null;
         assertEquals(responseHeader2.getBrokerId(), 2);
 
@@ -157,8 +159,9 @@ public class ControllerManagerTest {
             heartbeatRequestHeader.setBrokerName("broker1");
             heartbeatRequestHeader.setBrokerAddr("127.0.0.1:8001");
             final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.BROKER_HEARTBEAT, heartbeatRequestHeader);
+            System.out.println("send heartbeat success");
             try {
-                this.remotingClient.invokeOneway(leaderAddr, request, 3000);
+                final RemotingCommand remotingCommand = this.remotingClient1.invokeSync(leaderAddr, request, 3000);
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -171,7 +174,7 @@ public class ControllerManagerTest {
         // The new master should be broker2.
         final GetReplicaInfoRequestHeader requestHeader = new GetReplicaInfoRequestHeader("broker1");
         final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_REPLICA_INFO, requestHeader);
-        final RemotingCommand response = this.remotingClient.invokeSync(leaderAddr, request, 3000);
+        final RemotingCommand response = this.remotingClient1.invokeSync(leaderAddr, request, 3000);
         final GetReplicaInfoResponseHeader responseHeader = response.decodeCommandCustomHeader(GetReplicaInfoResponseHeader.class);
         assertEquals(responseHeader.getMasterAddress(), "127.0.0.1:8001");
 
@@ -187,5 +190,7 @@ public class ControllerManagerTest {
             System.out.println("Delete file " + dir);
             new File(dir).delete();
         }
+        this.remotingClient.shutdown();
+        this.remotingClient1.shutdown();
     }
 }
diff --git a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index d4de7abe4..8ec99a1e0 100644
--- a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++ b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -40,7 +40,7 @@ public class DefaultBrokerHeartbeatManagerTest {
     @Test
     public void testDetectBrokerAlive() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
-        this.heartbeatManager.addBrokerLifecycleListener((brokerName, brokerAddress, brokerId) -> {
+        this.heartbeatManager.addBrokerLifecycleListener((clusterName, brokerName, brokerAddress, brokerId) -> {
             System.out.println("Broker shutdown:" + brokerAddress);
             latch.countDown();
         });
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 00f9215e2..0659e5e2f 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -111,8 +111,13 @@ public class NamesrvController {
         this.brokerHousekeepingService = new BrokerHousekeepingService(this);
         this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
         if (controllerConfig.isEnableStartupController()) {
-            this.controller = new DLedgerController(controllerConfig, this.routeInfoManager::isBrokerAlive);
-            this.routeInfoManager.setController(this.controller);
+            try {
+                final NettyServerConfig controllerNettyServerConfig = (NettyServerConfig) nettyServerConfig.clone();
+                this.controller = new DLedgerController(controllerConfig, this.routeInfoManager::isBrokerAlive,
+                    controllerNettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService);
+                this.routeInfoManager.setController(this.controller);
+            } catch (final CloneNotSupportedException ignored) {
+            }
         }
         this.configuration = new Configuration(
             LOGGER,
@@ -270,13 +275,15 @@ public class NamesrvController {
             this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
 
             if (controllerConfig.isEnableStartupController()) {
+                final RemotingServer controllerRemotingServer = this.controller.getRemotingServer();
+                assert controllerRemotingServer != null;
                 final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
-                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
-                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
-                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor);
-                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
-                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
-                this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, controllerRequestProcessor, this.controllerRequestExecutor);
+                controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor);
+                controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor);
+                controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor);
+                controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+                controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor);
+                controllerRemotingServer.registerProcessor(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, controllerRequestProcessor, this.controllerRequestExecutor);
             }
         }
     }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
index 2651636e3..be00134b4 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java
@@ -20,14 +20,12 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.SyncStateSet;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
 import org.apache.rocketmq.controller.Controller;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -102,16 +100,7 @@ public class ControllerRequestProcessor implements NettyRequestProcessor {
                 break;
             }
             case CONTROLLER_GET_METADATA_INFO: {
-                final RemotingCommand response = this.controller.getControllerMetadata();
-                final GetMetaDataResponseHeader responseHeader = (GetMetaDataResponseHeader) response.readCustomHeader();
-                if (StringUtils.isNoneEmpty(responseHeader.getControllerLeaderAddress())) {
-                    final String leaderAddress = responseHeader.getControllerLeaderAddress();
-                    // Because the controller is proxy by namesrv, so we should replace the controllerAddress to namesrvAddress.
-                    final int splitIndex = StringUtils.lastIndexOf(leaderAddress, ":");
-                    final String namesrvAddress = leaderAddress.substring(0, splitIndex + 1) + this.namesrvController.getNettyServerConfig().getListenPort();
-                    responseHeader.setControllerLeaderAddress(namesrvAddress);
-                }
-                return response;
+                return this.controller.getControllerMetadata();
             }
             case CONTROLLER_GET_SYNC_STATE_DATA: {
                 if (request.getBody() != null) {
diff --git a/pom.xml b/pom.xml
index 9c72fffb0..7505ce295 100644
--- a/pom.xml
+++ b/pom.xml
@@ -537,7 +537,7 @@
             <dependency>
                 <groupId>io.openmessaging.storage</groupId>
                 <artifactId>dledger</artifactId>
-                <version>0.2.5</version>
+                <version>0.2.6</version>
             </dependency>
             <dependency>
                 <groupId>org.slf4j</groupId>
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 38758d663..de321e85a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -142,16 +142,6 @@ public class AutoSwitchHAService extends DefaultHAService {
     @Override public void updateMasterAddress(String newAddr) {
     }
 
-    @Override public void removeConnection(HAConnection conn) {
-        final Set<String> syncStateSet = getSyncStateSet();
-        String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress();
-        if (syncStateSet.contains(slave)) {
-            syncStateSet.remove(slave);
-            notifySyncStateSetChanged(syncStateSet);
-        }
-        super.removeConnection(conn);
-    }
-
     public void registerSyncStateSetChangedListener(final Consumer<Set<String>> listener) {
         this.syncStateSetChangedListeners.add(listener);
     }
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index 38172ca3d..ea1c18bcd 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -73,13 +73,13 @@ public class AutoSwitchRoleBase {
         return PORT_COUNTER.addAndGet(10 + random.nextInt(10));
     }
 
-    public BrokerController startBroker(String namesrvAddress, int brokerId, int haPort, int brokerListenPort,
+    public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort, int brokerListenPort,
         int nettyListenPort, BrokerRole expectedRole, int mappedFileSize) throws Exception {
         final MessageStoreConfig storeConfig = buildMessageStoreConfig("broker" + brokerId, haPort, mappedFileSize);
         final BrokerConfig brokerConfig = new BrokerConfig();
         brokerConfig.setListenPort(brokerListenPort);
         brokerConfig.setNamesrvAddr(namesrvAddress);
-        brokerConfig.setControllerAddr(namesrvAddress);
+        brokerConfig.setControllerAddr(controllerAddress);
         brokerConfig.setReplicasManagerSyncBrokerMetadataPeriod(2 * 1000);
         brokerConfig.setReplicasManagerCheckSyncStateSetPeriod(4 * 1000);
         brokerConfig.setEnableControllerMode(true);
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index bdf23ecbd..1d4026018 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -47,6 +47,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
     private ControllerConfig controllerConfig;
     private NamesrvController namesrvController;
     private String namesrvAddress;
+    private String controllerAddress;
     private BrokerController brokerController1;
     private BrokerController brokerController2;
 
@@ -54,20 +55,22 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         super.initialize();
 
         // Startup namesrv
-        final String peers = String.format("n0-localhost:%d", nextPort());
+        int controllerPort = nextPort();
+        final String peers = String.format("n0-localhost:%d", controllerPort);
+
         final NettyServerConfig serverConfig = new NettyServerConfig();
         int namesrvPort = nextPort();
         serverConfig.setListenPort(namesrvPort);
-        System.out.println(namesrvPort);
 
         this.controllerConfig = buildControllerConfig("n0", peers);
         this.namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig(), controllerConfig);
         assertTrue(namesrvController.initialize());
         namesrvController.start();
         this.namesrvAddress = "127.0.0.1:" + namesrvPort + ";";
+        this.controllerAddress = "127.0.0.1:" + controllerPort + ";";
 
-        this.brokerController1 = startBroker(this.namesrvAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
-        this.brokerController2 = startBroker(this.namesrvAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
+        this.brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
+        this.brokerController2 = startBroker(this.namesrvAddress, this.controllerAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
 
         // Wait slave connecting to master
         assertTrue(waitSlaveReady(this.brokerController2.getMessageStore()));
@@ -77,7 +80,6 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         System.out.println("Begin test");
         final MessageStore messageStore = brokerController1.getMessageStore();
         putMessage(messageStore);
-
         // Check slave message
         checkMessage(brokerController2.getMessageStore(), 10, 0);
     }
@@ -90,7 +92,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
                 return true;
             } else {
                 System.out.println("slave not ready");
-                Thread.sleep(1000);
+                Thread.sleep(2000);
                 tryTimes++;
             }
         }
@@ -123,7 +125,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
 
         // Restart old master, it should be slave
-        brokerController1 = startBroker(this.namesrvAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
+        brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
         waitSlaveReady(brokerController1.getMessageStore());
 
         assertFalse(brokerController1.getReplicasManager().isMasterState());
@@ -142,7 +144,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         init(defaultFileSize);
         mockData();
 
-        BrokerController broker3 = startBroker(this.namesrvAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
+        BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
         waitSlaveReady(broker3.getMessageStore());
         Thread.sleep(6000);
 
@@ -169,7 +171,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
 
         // Step3: add broker3
-        BrokerController broker3 = startBroker(this.namesrvAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
+        BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
         waitSlaveReady(broker3.getMessageStore());
         Thread.sleep(6000);
         checkMessage(broker3.getMessageStore(), 10, 0);
@@ -197,7 +199,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         checkMessage(broker2MessageStore, 10, 10);
 
         // Step6, start broker4, link to broker2, it should sync msg from epoch2(offset = 1700).
-        BrokerController broker4 = startBroker(this.namesrvAddress, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
+        BrokerController broker4 = startBroker(this.namesrvAddress, this.controllerAddress, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
         waitSlaveReady(broker4.getMessageStore());
         Thread.sleep(6000);
         checkMessage(broker4.getMessageStore(), 10, 10);