You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/05/06 02:21:21 UTC

[rocketmq] branch 5.0.0-beta updated: [ISSUE#4233] Move the capability of slaveActingMaster from container module to broker module

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new a4cccea9d [ISSUE#4233] Move the capability of slaveActingMaster from container module to broker module
a4cccea9d is described below

commit a4cccea9db284e92812b2c9f17043a8b4ce589d8
Author: rongtong <ji...@163.com>
AuthorDate: Fri May 6 10:21:14 2022 +0800

    [ISSUE#4233] Move the capability of slaveActingMaster from container module to broker module
---
 .../apache/rocketmq/broker/BrokerController.java   | 384 +++++++++++++++++----
 .../rocketmq/broker}/BrokerPreOnlineService.java   |   8 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  10 +
 .../rocketmq/container/BrokerContainerConfig.java  |   9 -
 .../rocketmq/container/InnerBrokerController.java  | 204 +----------
 .../container/InnerSalveBrokerController.java      | 114 ------
 .../rocketmq/container/BrokerContainerTest.java    |  18 +-
 .../rocketmq/container/BrokerPreOnlineTest.java    |   4 +-
 .../container/ContainerIntegrationTestBase.java    |  10 +-
 9 files changed, 368 insertions(+), 393 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index bab68c481..7814c4feb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -33,8 +34,11 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -97,8 +101,10 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
 import org.apache.rocketmq.broker.util.HookUtils;
+import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.BrokerSyncInfo;
 import org.apache.rocketmq.common.Configuration;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
@@ -182,6 +188,8 @@ public class BrokerController {
     private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
     protected BrokerOuterAPI brokerOuterAPI;
     protected ScheduledExecutorService scheduledExecutorService;
+    protected ScheduledExecutorService syncBrokerMemberGroupExecutorService;
+    protected ScheduledExecutorService brokerHeartbeatExecutorService;
     protected final SlaveSynchronize slaveSynchronize;
     protected final BlockingQueue<Runnable> sendThreadPoolQueue;
     protected final BlockingQueue<Runnable> putThreadPoolQueue;
@@ -238,6 +246,12 @@ public class BrokerController {
     protected EscapeBridge escapeBridge;
     protected List<BrokerAttachedPlugin> brokerAttachedPlugins = new ArrayList<>();
     protected volatile long shouldStartTime;
+    private BrokerPreOnlineService brokerPreOnlineService;
+    protected volatile boolean isIsolated = false;
+    protected volatile long minBrokerIdInGroup = 0;
+    protected volatile String minBrokerAddrInGroup = null;
+    private final Lock lock = new ReentrantLock();
+    protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
 
     public BrokerController(
         final BrokerConfig brokerConfig,
@@ -361,6 +375,10 @@ public class BrokerController {
         this.brokerMemberGroup.getBrokerAddrs().put(this.brokerConfig.getBrokerId(), this.getBrokerAddr());
 
         this.escapeBridge = new EscapeBridge(this);
+
+        if (!this.brokerConfig.isSkipPreOnline()) {
+            this.brokerPreOnlineService = new BrokerPreOnlineService(this);
+        }
     }
 
     public BrokerConfig getBrokerConfig() {
@@ -501,6 +519,11 @@ public class BrokerController {
             this.loadBalanceThreadPoolQueue,
             new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
 
+        this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
+            new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
+        this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
+            new ThreadFactoryImpl("rokerControllerHeartbeatScheduledThread", getBrokerIdentity()));
+
         this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
     }
 
@@ -1273,6 +1296,13 @@ public class BrokerController {
             escapeBridge.shutdown();
         }
 
+        if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) {
+            this.brokerPreOnlineService.shutdown();
+        }
+
+        shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
+        shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
+
         this.topicConfigManager.persist();
         this.subscriptionGroupManager.persist();
 
@@ -1287,6 +1317,10 @@ public class BrokerController {
 
         shutdownBasicService();
 
+        for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            scheduledFuture.cancel(true);
+        }
+
         if (this.brokerOuterAPI != null) {
             this.brokerOuterAPI.shutdown();
         }
@@ -1385,6 +1419,10 @@ public class BrokerController {
             this.escapeBridge.start();
         }
 
+        if (this.brokerPreOnlineService != null) {
+            this.brokerPreOnlineService.start();
+        }
+
         //Init state version after messageStore initialized.
         this.topicConfigManager.initStateVersion();
     }
@@ -1393,33 +1431,70 @@ public class BrokerController {
 
         this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
 
+        if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
+            isIsolated = true;
+        }
+
         if (this.brokerOuterAPI != null) {
             this.brokerOuterAPI.start();
         }
 
         startBasicService();
 
-        if (!this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
+        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
             changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
             this.registerBrokerAll(true, false, true);
         }
 
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
+        scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
             @Override
-            public void run() {
+            public void run2() {
                 try {
                     if (System.currentTimeMillis() < shouldStartTime) {
-                        LOG.info("Register to namesrv after {}", shouldStartTime);
+                        BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
+                        return;
+                    }
+                    if (isIsolated) {
+                        BrokerController.LOG.info("Skip register for broker is isolated");
                         return;
                     }
                     BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                 } catch (Throwable e) {
-                    LOG.error("BrokerController#registerBrokerAll: unexpected error occurs.", e);
+                    BrokerController.LOG.error("registerBrokerAll Exception", e);
                 }
             }
-        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));
+
+        if (this.brokerConfig.isEnableSlaveActingMaster()) {
+            scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
+                @Override
+                public void run2() {
+                    if (isIsolated) {
+                        return;
+                    }
+                    try {
+                        BrokerController.this.sendHeartbeat();
+                    } catch (Exception e) {
+                        BrokerController.LOG.error("sendHeartbeat Exception", e);
+                    }
+
+                }
+            }, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS));
 
+            scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
+                @Override public void run2() {
+                    try {
+                        BrokerController.this.syncBrokerMemberGroup();
+                    } catch (Throwable e) {
+                        BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
+                    }
+                }
+            }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
+        }
+
+        if (brokerConfig.isSkipPreOnline()) {
+            startServiceWithoutCondition();
+        }
     }
 
     public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
@@ -1503,10 +1578,10 @@ public class BrokerController {
         TopicConfigSerializeWrapper topicConfigWrapper) {
 
         if (shutdown) {
-            LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
+            BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
             return;
         }
-        List<RegisterBrokerResult> registerBrokerResultList = this.getBrokerOuterAPI().registerBrokerAll(
+        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
             this.brokerConfig.getBrokerClusterName(),
             this.getBrokerAddr(),
             this.brokerConfig.getBrokerName(),
@@ -1518,11 +1593,61 @@ public class BrokerController {
             this.brokerConfig.getRegisterBrokerTimeoutMills(),
             this.brokerConfig.isEnableSlaveActingMaster(),
             this.brokerConfig.isCompressedRegister(),
+            this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
             this.getBrokerIdentity());
 
         handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
     }
 
+    protected void sendHeartbeat() {
+        if (this.brokerConfig.isCompatibleWithOldNameSrv()) {
+            this.brokerOuterAPI.sendHeartbeatViaDataVersion(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                this.getTopicConfigManager().getDataVersion(),
+                this.brokerConfig.isInBrokerContainer());
+        } else {
+            this.brokerOuterAPI.sendHeartbeat(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                this.brokerConfig.isInBrokerContainer());
+        }
+    }
+
+    protected void syncBrokerMemberGroup() {
+        try {
+            brokerMemberGroup = this.getBrokerOuterAPI()
+                .syncBrokerMemberGroup(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.brokerConfig.isCompatibleWithOldNameSrv());
+        } catch (Exception e) {
+            BrokerController.LOG.error("syncBrokerMemberGroup from namesrv failed, ", e);
+            return;
+        }
+        if (brokerMemberGroup == null || brokerMemberGroup.getBrokerAddrs().size() == 0) {
+            BrokerController.LOG.warn("Couldn't find any broker member from namesrv in {}/{}", this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
+            return;
+        }
+        this.messageStore.setAliveReplicaNumInGroup(calcAliveBrokerNumInGroup(brokerMemberGroup.getBrokerAddrs()));
+
+        if (!this.isIsolated) {
+            long minBrokerId = brokerMemberGroup.minimumBrokerId();
+            this.updateMinBroker(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+        }
+    }
+
+    private int calcAliveBrokerNumInGroup(Map<Long, String> brokerAddrTable) {
+        if (brokerAddrTable.containsKey(this.brokerConfig.getBrokerId())) {
+            return brokerAddrTable.size();
+        } else {
+            return brokerAddrTable.size() + 1;
+        }
+    }
+
     protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList,
         boolean checkOrderConfig) {
         for (RegisterBrokerResult registerBrokerResult : registerBrokerResultList) {
@@ -1570,6 +1695,189 @@ public class BrokerController {
         return null;
     }
 
+    public void startService(long minBrokerId, String minBrokerAddr) {
+        BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
+            this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
+        this.minBrokerIdInGroup = minBrokerId;
+        this.minBrokerAddrInGroup = minBrokerAddr;
+
+        this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == minBrokerId);
+        this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+
+        isIsolated = false;
+    }
+
+    public void startServiceWithoutCondition() {
+        BrokerController.LOG.info("{} start service", this.brokerConfig.getCanonicalName());
+
+        this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
+        this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+
+        isIsolated = false;
+    }
+
+    public void stopService() {
+        BrokerController.LOG.info("{} stop service", this.getBrokerConfig().getCanonicalName());
+        isIsolated = true;
+        this.changeSpecialServiceStatus(false);
+    }
+
+    public boolean isSpecialServiceRunning() {
+        if (isScheduleServiceStart() && isTransactionCheckServiceStart()) {
+            return true;
+        }
+
+        return this.ackMessageProcessor != null && this.ackMessageProcessor.isPopReviveServiceRunning();
+    }
+
+    private void onMasterOffline() {
+        // close channels with master broker
+        String masterAddr = this.slaveSynchronize.getMasterAddr();
+        if (masterAddr != null) {
+            this.brokerOuterAPI.getRemotingClient().closeChannels(
+                Arrays.asList(masterAddr, MixAll.brokerVIPChannel(true, masterAddr)));
+        }
+        // master not available, stop sync
+        this.slaveSynchronize.setMasterAddr(null);
+        this.messageStore.updateHaMasterAddress(null);
+    }
+
+    private void onMasterOnline(String masterAddr, String masterHaAddr) {
+        boolean needSyncMasterFlushOffset = this.messageStore.getMasterFlushedOffset() == 0
+            && this.messageStoreConfig.isSyncMasterFlushOffsetWhenStartup();
+        if (masterHaAddr == null || needSyncMasterFlushOffset) {
+            try {
+                BrokerSyncInfo brokerSyncInfo = this.brokerOuterAPI.retrieveBrokerHaInfo(masterAddr);
+
+                if (needSyncMasterFlushOffset) {
+                    LOG.info("Set master flush offset in slave to {}", brokerSyncInfo.getMasterFlushOffset());
+                    this.messageStore.setMasterFlushedOffset(brokerSyncInfo.getMasterFlushOffset());
+                }
+
+                if (masterHaAddr == null) {
+                    this.messageStore.updateHaMasterAddress(brokerSyncInfo.getMasterHaAddress());
+                    this.messageStore.updateMasterAddress(brokerSyncInfo.getMasterAddress());
+                }
+            } catch (Exception e) {
+                LOG.error("retrieve master ha info exception, {}", e);
+            }
+        }
+
+        // set master HA address.
+        if (masterHaAddr != null) {
+            this.messageStore.updateHaMasterAddress(masterHaAddr);
+        }
+
+        // wakeup HAClient
+        this.messageStore.wakeupHAClient();
+    }
+
+    private void onMinBrokerChange(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
+        String masterHaAddr) {
+        LOG.info("Min broker changed, old: {}-{}, new {}-{}",
+            this.minBrokerIdInGroup, this.minBrokerAddrInGroup, minBrokerId, minBrokerAddr);
+
+        this.minBrokerIdInGroup = minBrokerId;
+        this.minBrokerAddrInGroup = minBrokerAddr;
+
+        this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == this.minBrokerIdInGroup);
+
+        if (offlineBrokerAddr != null && offlineBrokerAddr.equals(this.slaveSynchronize.getMasterAddr())) {
+            // master offline
+            onMasterOffline();
+        }
+
+        if (minBrokerId == MixAll.MASTER_ID && minBrokerAddr != null) {
+            // master online
+            onMasterOnline(minBrokerAddr, masterHaAddr);
+        }
+
+        // notify PullRequest on hold to pull from master.
+        if (this.minBrokerIdInGroup == MixAll.MASTER_ID) {
+            this.pullRequestHoldService.notifyMasterOnline();
+        }
+    }
+
+    public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
+        if (brokerConfig.isEnableSlaveActingMaster() && brokerConfig.getBrokerId() != MixAll.MASTER_ID) {
+            if (lock.tryLock()) {
+                try {
+                    if (minBrokerId != this.minBrokerIdInGroup) {
+                        String offlineBrokerAddr = null;
+                        if (minBrokerId > this.minBrokerIdInGroup) {
+                            offlineBrokerAddr = this.minBrokerAddrInGroup;
+                        }
+                        onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, null);
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+    }
+
+    public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
+        String masterHaAddr) {
+        if (brokerConfig.isEnableSlaveActingMaster() && brokerConfig.getBrokerId() != MixAll.MASTER_ID) {
+            try {
+                if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+                    try {
+                        if (minBrokerId != this.minBrokerIdInGroup) {
+                            onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, masterHaAddr);
+                        }
+                    } finally {
+                        lock.unlock();
+                    }
+
+                }
+            } catch (InterruptedException e) {
+                LOG.error("Update min broker error, {}", e);
+            }
+        }
+    }
+
+    public void changeSpecialServiceStatus(boolean shouldStart) {
+
+        for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
+            if (brokerAttachedPlugin != null) {
+                brokerAttachedPlugin.statusChanged(shouldStart);
+            }
+        }
+
+        changeScheduleServiceStatus(shouldStart);
+
+        changeTransactionCheckServiceStatus(shouldStart);
+
+        if (this.ackMessageProcessor != null) {
+            LOG.info("Set PopReviveService Status to {}", shouldStart);
+            this.ackMessageProcessor.setPopReviveServiceStatus(shouldStart);
+        }
+    }
+
+    private synchronized void changeTransactionCheckServiceStatus(boolean shouldStart) {
+        if (isTransactionCheckServiceStart != shouldStart) {
+            LOG.info("TransactionCheckService status changed to {}", shouldStart);
+            if (shouldStart) {
+                this.transactionalMessageCheckService.start();
+            } else {
+                this.transactionalMessageCheckService.shutdown(true);
+            }
+            isTransactionCheckServiceStart = shouldStart;
+        }
+    }
+
+    public synchronized void changeScheduleServiceStatus(boolean shouldStart) {
+        if (isScheduleServiceStart != shouldStart) {
+            LOG.info("ScheduleServiceStatus changed to {}", shouldStart);
+            if (shouldStart) {
+                this.scheduleMessageService.start();
+            } else {
+                this.scheduleMessageService.stop();
+            }
+            isScheduleServiceStart = shouldStart;
+        }
+    }
+
     public TopicConfigManager getTopicConfigManager() {
         return topicConfigManager;
     }
@@ -1772,18 +2080,6 @@ public class BrokerController {
         return this.brokerMemberGroup;
     }
 
-    public boolean isSpecialServiceRunning() {
-        return this.brokerConfig.getBrokerId() == MixAll.MASTER_ID;
-    }
-
-    public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
-        //do nothing
-    }
-
-    public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr, String masterHaAddr) {
-        //do nothing
-    }
-
     public int getListenPort() {
         return this.nettyServerConfig.getListenPort();
     }
@@ -1800,46 +2096,8 @@ public class BrokerController {
         return shouldStartTime;
     }
 
-    public void changeSpecialServiceStatus(boolean shouldStart) {
-
-        for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
-            if (brokerAttachedPlugin != null) {
-                brokerAttachedPlugin.statusChanged(shouldStart);
-            }
-        }
-
-        changeScheduleServiceStatus(shouldStart);
-
-        changeTransactionCheckServiceStatus(shouldStart);
-
-        if (this.ackMessageProcessor != null) {
-            LOG.info("Set PopReviveService Status to {}", shouldStart);
-            this.ackMessageProcessor.setPopReviveServiceStatus(shouldStart);
-        }
-    }
-
-    private synchronized void changeTransactionCheckServiceStatus(boolean shouldStart) {
-        if (isTransactionCheckServiceStart != shouldStart) {
-            LOG.info("TransactionCheckService status changed to {}", shouldStart);
-            if (shouldStart) {
-                this.transactionalMessageCheckService.start();
-            } else {
-                this.transactionalMessageCheckService.shutdown(true);
-            }
-            isTransactionCheckServiceStart = shouldStart;
-        }
-    }
-
-    public synchronized void changeScheduleServiceStatus(boolean shouldStart) {
-        if (isScheduleServiceStart != shouldStart) {
-            LOG.info("ScheduleServiceStatus changed to {}", shouldStart);
-            if (shouldStart) {
-                this.scheduleMessageService.start();
-            } else {
-                this.scheduleMessageService.stop();
-            }
-            isScheduleServiceStart = shouldStart;
-        }
+    public BrokerPreOnlineService getBrokerPreOnlineService() {
+        return brokerPreOnlineService;
     }
 
     public boolean isScheduleServiceStart() {
@@ -1872,4 +2130,8 @@ public class BrokerController {
                 brokerConfig.getBrokerId(), brokerConfig.isInBrokerContainer());
         }
     }
+
+    public boolean isIsolated() {
+        return this.isIsolated;
+    }
 }
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
similarity index 97%
rename from container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java
rename to broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
index c85eb654e..10757c0db 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPreOnlineService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.container;
+package org.apache.rocketmq.broker;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,11 +42,11 @@ import org.apache.rocketmq.store.ha.HAConnectionStateNotificationRequest;
 
 public class BrokerPreOnlineService extends ServiceThread {
     private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final InnerBrokerController brokerController;
+    private final BrokerController brokerController;
 
     private int waitBrokerIndex = 0;
 
-    public BrokerPreOnlineService(InnerBrokerController brokerController) {
+    public BrokerPreOnlineService(BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
@@ -241,7 +241,7 @@ public class BrokerPreOnlineService extends ServiceThread {
             brokerMemberGroup = this.brokerController.getBrokerOuterAPI().syncBrokerMemberGroup(
                 this.brokerController.getBrokerConfig().getBrokerClusterName(),
                 this.brokerController.getBrokerConfig().getBrokerName(),
-                this.brokerController.getBrokerContainer().getBrokerContainerConfig().isCompatibleWithOldNameSrv());
+                this.brokerController.getBrokerConfig().isCompatibleWithOldNameSrv());
         } catch (Exception e) {
             LOGGER.error("syncBrokerMemberGroup from namesrv error, start service failed, will try later, ", e);
             return false;
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index efd4ec90a..71280ea0b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -295,6 +295,8 @@ public class BrokerConfig extends BrokerIdentity {
 
     private String metaDataHosts = "";
 
+    private boolean compatibleWithOldNameSrv = true;
+
     public long getMaxPopPollingSize() {
         return maxPopPollingSize;
     }
@@ -1270,4 +1272,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setIsolateLogEnable(boolean isolateLogEnable) {
         this.isolateLogEnable = isolateLogEnable;
     }
+
+    public boolean isCompatibleWithOldNameSrv() {
+        return compatibleWithOldNameSrv;
+    }
+
+    public void setCompatibleWithOldNameSrv(boolean compatibleWithOldNameSrv) {
+        this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
+    }
 }
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
index 28a524254..564cd37ab 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
@@ -36,8 +36,6 @@ public class BrokerContainerConfig {
 
     private String brokerConfigPaths = null;
 
-    private boolean compatibleWithOldNameSrv = true;
-
     public String getRocketmqHome() {
         return rocketmqHome;
     }
@@ -74,11 +72,4 @@ public class BrokerContainerConfig {
         this.brokerConfigPaths = brokerConfigPaths;
     }
 
-    public boolean isCompatibleWithOldNameSrv() {
-        return compatibleWithOldNameSrv;
-    }
-
-    public void setCompatibleWithOldNameSrv(boolean compatibleWithOldNameSrv) {
-        this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
-    }
 }
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
index 1f0d0fe91..2efa939fa 100644
--- a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -16,36 +16,20 @@
  */
 package org.apache.rocketmq.container;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
-import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 
 public class InnerBrokerController extends BrokerController {
-    private ScheduledExecutorService syncBrokerMemberGroupExecutorService;
-    private ScheduledExecutorService brokerHeartbeatExecutorService;
-    protected volatile long minBrokerIdInGroup = 0;
-    protected volatile String minBrokerAddrInGroup = null;
-    protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
-    private BrokerPreOnlineService brokerPreOnlineService;
     protected BrokerContainer brokerContainer;
-    protected volatile boolean isIsolated = false;
 
     public InnerBrokerController(
         final BrokerContainer brokerContainer,
@@ -55,10 +39,6 @@ public class InnerBrokerController extends BrokerController {
         super(brokerConfig, messageStoreConfig);
         this.brokerContainer = brokerContainer;
         this.brokerOuterAPI = this.brokerContainer.getBrokerOuterAPI();
-
-        if (!this.brokerConfig.isSkipPreOnline()) {
-            this.brokerPreOnlineService = new BrokerPreOnlineService(this);
-        }
     }
 
     @Override
@@ -67,70 +47,23 @@ public class InnerBrokerController extends BrokerController {
         this.fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2);
     }
 
-    /**
-     * Initialize resources for master which will be re-used by slave.
-     */
-    @Override
-    protected void initializeResources() {
-        super.initializeResources();
-        this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
-            new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", super.getBrokerIdentity()));
-        this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
-            new ThreadFactoryImpl("rokerControllerHeartbeatScheduledThread", super.getBrokerIdentity()));
-    }
-
     @Override
     protected void initializeScheduledTasks() {
         initializeBrokerScheduledTasks();
     }
 
-    @Override
-    public void shutdown() {
-
-        shutdownBasicService();
-
-        if (this.remotingServer != null) {
-            this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
-        }
-
-        if (this.fastRemotingServer != null) {
-            this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2);
-        }
-
-        for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
-            scheduledFuture.cancel(true);
-        }
-
-        if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) {
-            this.brokerPreOnlineService.shutdown();
-        }
-
-        shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
-        shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
-
-    }
-
-    @Override
-    public String getBrokerAddr() {
-        return this.brokerConfig.getBrokerIP1() + ":" + this.brokerConfig.getListenPort();
-    }
-
     @Override
     public void start() throws Exception {
-
         this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();
 
-        if (messageStoreConfig.getTotalReplicas() > 1) {
+        if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster()) {
             isIsolated = true;
         }
 
         startBasicService();
 
-        if (this.brokerPreOnlineService != null) {
-            this.brokerPreOnlineService.start();
-        }
-
-        if (!isIsolated) {
+        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
+            changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
             this.registerBrokerAll(true, false, true);
         }
 
@@ -178,12 +111,6 @@ public class InnerBrokerController extends BrokerController {
                     }
                 }
             }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
-
-        }
-
-        if (!isIsolated && !messageStoreConfig.isEnableDLegerCommitLog()
-            && !messageStoreConfig.isDuplicationEnable() && !this.brokerConfig.isEnableSlaveActingMaster()) {
-            changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
         }
 
         if (brokerConfig.isSkipPreOnline()) {
@@ -191,79 +118,27 @@ public class InnerBrokerController extends BrokerController {
         }
     }
 
-    private void sendHeartbeat() {
-        if (this.brokerContainer.getBrokerContainerConfig().isCompatibleWithOldNameSrv()) {
-            this.brokerOuterAPI.sendHeartbeatViaDataVersion(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.brokerConfig.getSendHeartbeatTimeoutMillis(),
-                this.getTopicConfigManager().getDataVersion(),
-                this.brokerConfig.isInBrokerContainer());
-        } else {
-            this.brokerOuterAPI.sendHeartbeat(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.brokerConfig.getSendHeartbeatTimeoutMillis(),
-                this.brokerConfig.isInBrokerContainer());
-        }
-    }
+    @Override
+    public void shutdown() {
 
-    public void syncBrokerMemberGroup() {
-        try {
-            brokerMemberGroup = this.brokerContainer.getBrokerOuterAPI()
-                .syncBrokerMemberGroup(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.brokerContainer.getBrokerContainerConfig().isCompatibleWithOldNameSrv());
-        } catch (Exception e) {
-            BrokerController.LOG.error("syncBrokerMemberGroup from namesrv failed, ", e);
-            return;
-        }
-        if (brokerMemberGroup == null || brokerMemberGroup.getBrokerAddrs().size() == 0) {
-            BrokerController.LOG.warn("Couldn't find any broker member from namesrv in {}/{}", this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
-            return;
+        shutdownBasicService();
+
+        for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            scheduledFuture.cancel(true);
         }
-        this.messageStore.setAliveReplicaNumInGroup(calcAliveBrokerNumInGroup(brokerMemberGroup.getBrokerAddrs()));
 
-        if (!this.isIsolated) {
-            long minBrokerId = brokerMemberGroup.minimumBrokerId();
-            this.updateMinBroker(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+        if (this.remotingServer != null) {
+            this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort());
         }
-    }
 
-    private int calcAliveBrokerNumInGroup(Map<Long, String> brokerAddrTable) {
-        if (brokerAddrTable.containsKey(this.brokerConfig.getBrokerId())) {
-            return brokerAddrTable.size();
-        } else {
-            return brokerAddrTable.size() + 1;
+        if (this.fastRemotingServer != null) {
+            this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2);
         }
     }
 
     @Override
-    protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
-        TopicConfigSerializeWrapper topicConfigWrapper) {
-
-        if (shutdown) {
-            BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
-            return;
-        }
-        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
-            this.brokerConfig.getBrokerClusterName(),
-            this.getBrokerAddr(),
-            this.brokerConfig.getBrokerName(),
-            this.brokerConfig.getBrokerId(),
-            this.getHAServerAddr(),
-            topicConfigWrapper,
-            this.filterServerManager.buildNewFilterServerList(),
-            oneway,
-            this.brokerConfig.getRegisterBrokerTimeoutMills(),
-            this.brokerConfig.isEnableSlaveActingMaster(),
-            this.brokerConfig.isCompressedRegister(),
-            this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
-            this.getBrokerIdentity());
-
-        handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
+    public String getBrokerAddr() {
+        return this.brokerConfig.getBrokerIP1() + ":" + this.brokerConfig.getListenPort();
     }
 
     @Override
@@ -287,15 +162,6 @@ public class InnerBrokerController extends BrokerController {
         return this.minBrokerIdInGroup;
     }
 
-    @Override
-    public boolean isSpecialServiceRunning() {
-        if (isScheduleServiceStart() && isTransactionCheckServiceStart()) {
-            return true;
-        }
-
-        return this.ackMessageProcessor != null && this.ackMessageProcessor.isPopReviveServiceRunning();
-    }
-
     @Override
     public int getListenPort() {
         return this.brokerConfig.getListenPort();
@@ -305,46 +171,10 @@ public class InnerBrokerController extends BrokerController {
         return brokerContainer.getBrokerOuterAPI();
     }
 
-    public void startService(long minBrokerId, String minBrokerAddr) {
-        BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
-            this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
-        this.minBrokerIdInGroup = minBrokerId;
-        this.minBrokerAddrInGroup = minBrokerAddr;
-
-        this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == minBrokerId);
-        this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
-
-        isIsolated = false;
-    }
-
-    public void startServiceWithoutCondition() {
-        BrokerController.LOG.info("{} start service", this.brokerConfig.getCanonicalName());
-
-        this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
-        this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
-
-        isIsolated = false;
-    }
-
-    public void stopService() {
-        BrokerController.LOG.info("{} stop service", this.getBrokerConfig().getCanonicalName());
-        isIsolated = true;
-        this.changeSpecialServiceStatus(false);
-        this.closeChannels();
-    }
-
-    public synchronized void closeChannels() {
-        this.brokerContainer.getBrokerOuterAPI().getRemotingClient().closeChannels();
-    }
-
     public BrokerContainer getBrokerContainer() {
         return this.brokerContainer;
     }
 
-    public boolean isIsolated() {
-        return this.isIsolated;
-    }
-
     public NettyServerConfig getNettyServerConfig() {
         return brokerContainer.getNettyServerConfig();
     }
@@ -371,8 +201,4 @@ public class InnerBrokerController extends BrokerController {
         }
         return this.brokerContainer.peekMasterBroker();
     }
-
-    public BrokerPreOnlineService getBrokerPreOnlineService() {
-        return brokerPreOnlineService;
-    }
 }
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
index e00242a93..a7901bc7d 100644
--- a/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
@@ -19,14 +19,10 @@ package org.apache.rocketmq.container;
 
 import com.google.common.base.Preconditions;
 
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.BrokerSyncInfo;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 
@@ -47,114 +43,4 @@ public class InnerSalveBrokerController extends InnerBrokerController {
         Preconditions.checkNotNull(brokerConfig.getBrokerName());
         Preconditions.checkArgument(brokerConfig.getBrokerId() != MixAll.MASTER_ID);
     }
-
-    private void onMasterOffline() {
-        // close channels with master broker
-        String masterAddr = this.slaveSynchronize.getMasterAddr();
-        if (masterAddr != null) {
-            this.brokerOuterAPI.getRemotingClient().closeChannels(
-                Arrays.asList(masterAddr, MixAll.brokerVIPChannel(true, masterAddr)));
-        }
-        // master not available, stop sync
-        this.slaveSynchronize.setMasterAddr(null);
-        this.messageStore.updateHaMasterAddress(null);
-    }
-
-    private void onMasterOnline(String masterAddr, String masterHaAddr) {
-        boolean needSyncMasterFlushOffset = this.messageStore.getMasterFlushedOffset() == 0
-            && this.messageStoreConfig.isSyncMasterFlushOffsetWhenStartup();
-        if (masterHaAddr == null || needSyncMasterFlushOffset) {
-            try {
-                BrokerSyncInfo brokerSyncInfo = this.brokerOuterAPI.retrieveBrokerHaInfo(masterAddr);
-
-                if (needSyncMasterFlushOffset) {
-                    LOG.info("Set master flush offset in slave to {}", brokerSyncInfo.getMasterFlushOffset());
-                    this.messageStore.setMasterFlushedOffset(brokerSyncInfo.getMasterFlushOffset());
-                }
-
-                if (masterHaAddr == null) {
-                    this.messageStore.updateHaMasterAddress(brokerSyncInfo.getMasterHaAddress());
-                    this.messageStore.updateMasterAddress(brokerSyncInfo.getMasterAddress());
-                }
-            } catch (Exception e) {
-                LOG.error("retrieve master ha info exception, {}", e);
-            }
-        }
-
-        // set master HA address.
-        if (masterHaAddr != null) {
-            this.messageStore.updateHaMasterAddress(masterHaAddr);
-        }
-
-        // wakeup HAClient
-        this.messageStore.wakeupHAClient();
-    }
-
-    private void onMinBrokerChange(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
-        String masterHaAddr) {
-        LOG.info("Min broker changed, old: {}-{}, new {}-{}",
-            this.minBrokerIdInGroup, this.minBrokerAddrInGroup, minBrokerId, minBrokerAddr);
-
-        this.minBrokerIdInGroup = minBrokerId;
-        this.minBrokerAddrInGroup = minBrokerAddr;
-
-        this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == this.minBrokerIdInGroup);
-
-        if (offlineBrokerAddr != null && offlineBrokerAddr.equals(this.slaveSynchronize.getMasterAddr())) {
-            // master offline
-            onMasterOffline();
-        }
-
-        if (minBrokerId == MixAll.MASTER_ID && minBrokerAddr != null) {
-            // master online
-            onMasterOnline(minBrokerAddr, masterHaAddr);
-        }
-
-        // notify PullRequest on hold to pull from master.
-        if (this.minBrokerIdInGroup == MixAll.MASTER_ID) {
-            this.pullRequestHoldService.notifyMasterOnline();
-        }
-    }
-
-    @Override
-    public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
-        if (lock.tryLock()) {
-            try {
-                if (minBrokerId != this.minBrokerIdInGroup) {
-                    String offlineBrokerAddr = null;
-                    if (minBrokerId > this.minBrokerIdInGroup) {
-                        offlineBrokerAddr = this.minBrokerAddrInGroup;
-                    }
-                    onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, null);
-                }
-            } finally {
-                lock.unlock();
-            }
-
-        }
-    }
-
-    @Override
-    public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
-        String masterHaAddr) {
-        try {
-            if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
-                try {
-                    if (minBrokerId != this.minBrokerIdInGroup) {
-                        onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, masterHaAddr);
-                    }
-                } finally {
-                    lock.unlock();
-                }
-
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Update min broker error, {}", e);
-        }
-    }
-
-    @Override
-    public BrokerController peekMasterBroker() {
-        return this.brokerContainer.peekMasterBroker();
-    }
 }
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
index 351d88def..73354af88 100644
--- a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
@@ -76,13 +76,13 @@ public class BrokerContainerTest {
             new NettyClientConfig(),
             new MessageStoreConfig());
 
+        brokerController.getBrokerConfig().setEnableSlaveActingMaster(true);
+
         BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
         Field field = BrokerController.class.getDeclaredField("brokerOuterAPI");
         field.setAccessible(true);
         field.set(brokerController, brokerOuterAPI);
 
-        // topic-0 doesn't have queueGroupConfig.
-        // topic-1 has queueGroupConfig.
         List<TopicConfig> topicConfigList = new ArrayList<>(2);
         for (int i = 0; i < 2; i++) {
             topicConfigList.add(new TopicConfig("topic-" + i));
@@ -108,13 +108,13 @@ public class BrokerContainerTest {
             new NettyClientConfig(),
             new MessageStoreConfig());
 
+        brokerController.getBrokerConfig().setEnableSlaveActingMaster(true);
+
         BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
         Field field = BrokerController.class.getDeclaredField("brokerOuterAPI");
         field.setAccessible(true);
         field.set(brokerController, brokerOuterAPI);
 
-        // topic-0 doesn't have queueGroupConfig.
-        // topic-1 has queueGroupConfig.
         List<TopicConfig> topicConfigList = new ArrayList<>(2);
         for (int i = 0; i < 2; i++) {
             topicConfigList.add(new TopicConfig("topic-" + i));
@@ -128,7 +128,7 @@ public class BrokerContainerTest {
         ArgumentCaptor<TopicConfigSerializeWrapper> captor = ArgumentCaptor.forClass(TopicConfigSerializeWrapper.class);
         ArgumentCaptor<BrokerIdentity> brokerIdentityCaptor = ArgumentCaptor.forClass(BrokerIdentity.class);
         verify(brokerOuterAPI).registerBrokerAll(anyString(), anyString(), anyString(), anyLong(), anyString(),
-            captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), brokerIdentityCaptor.capture());
+            captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), anyLong(), brokerIdentityCaptor.capture());
         TopicConfigSerializeWrapper wrapper = captor.getValue();
         for (Map.Entry<String, TopicConfig> entry : wrapper.getTopicConfigTable().entrySet()) {
             assertThat(entry.getValue().getPerm()).isEqualTo(brokerController.getBrokerConfig().getBrokerPermission());
@@ -245,9 +245,9 @@ public class BrokerContainerTest {
     @Test
     public void testAddAndRemoveDLedgerBroker() throws Exception {
         BrokerContainer brokerContainer = new BrokerContainer(
-                new BrokerContainerConfig(),
-                new NettyServerConfig(),
-                new NettyClientConfig());
+            new BrokerContainerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig());
         assertThat(brokerContainer.initialize()).isTrue();
         brokerContainer.start();
 
@@ -354,7 +354,7 @@ public class BrokerContainerTest {
         ArgumentCaptor<TopicConfigSerializeWrapper> captor = ArgumentCaptor.forClass(TopicConfigSerializeWrapper.class);
         ArgumentCaptor<BrokerIdentity> brokerIdentityCaptor = ArgumentCaptor.forClass(BrokerIdentity.class);
         verify(brokerOuterAPI, times(times)).registerBrokerAll(anyString(), anyString(), anyString(), anyLong(),
-            anyString(), captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(),  brokerIdentityCaptor.capture());
+            anyString(), captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), anyLong(), brokerIdentityCaptor.capture());
         TopicConfigSerializeWrapper wrapper = captor.getValue();
 
         for (TopicConfig topicConfig : topicConfigList) {
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
index 6943d28a9..d9258bbb1 100644
--- a/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPreOnlineService;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -50,7 +51,6 @@ public class BrokerPreOnlineTest {
 
     public void init() throws Exception {
         when(brokerContainer.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
-        when(brokerContainer.getBrokerContainerConfig()).thenReturn(new BrokerContainerConfig());
 
         BrokerMemberGroup brokerMemberGroup1 = new BrokerMemberGroup();
         Map<Long, String> brokerAddrMap = new HashMap<>();
@@ -81,7 +81,7 @@ public class BrokerPreOnlineTest {
 
         innerBrokerController.setTransactionalMessageCheckService(new TransactionalMessageCheckService(innerBrokerController));
 
-        Field field = InnerBrokerController.class.getDeclaredField("isIsolated");
+        Field field = BrokerController.class.getDeclaredField("isIsolated");
         field.setAccessible(true);
         field.set(innerBrokerController, true);
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
index fab4c46b8..4a2e06642 100644
--- a/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
@@ -314,7 +314,6 @@ public class ContainerIntegrationTestBase {
         NettyServerConfig nettyServerConfig = new NettyServerConfig();
         NettyClientConfig nettyClientConfig = new NettyClientConfig();
         brokerContainerConfig.setNamesrvAddr(nsAddr);
-        brokerContainerConfig.setCompatibleWithOldNameSrv(false);
 
         nettyServerConfig.setListenPort(generatePort(20000, 10000));
         BrokerContainer brokerContainer = new BrokerContainer(brokerContainerConfig, nettyServerConfig, nettyClientConfig);
@@ -356,6 +355,7 @@ public class ContainerIntegrationTestBase {
         brokerConfig.setLockInStrictMode(true);
         brokerConfig.setConsumerOffsetUpdateVersionStep(10);
         brokerConfig.setDelayOffsetUpdateVersionStep(10);
+        brokerConfig.setCompatibleWithOldNameSrv(false);
         brokerConfig.setListenPort(generatePort(brokerContainer.getRemotingServer().localListenPort(), 10000));
 
         String baseDir = createBaseDir(brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId()).getAbsolutePath();
@@ -426,7 +426,7 @@ public class ContainerIntegrationTestBase {
         slaveBrokerConfig.setBrokerName(master.getBrokerConfig().getBrokerName());
         slaveBrokerConfig.setBrokerId(slaveBrokerId);
         slaveBrokerConfig.setBrokerClusterName(master.getBrokerConfig().getBrokerClusterName());
-
+        slaveBrokerConfig.setCompatibleWithOldNameSrv(false);
         slaveBrokerConfig.setBrokerIP1("127.0.0.1");
         slaveBrokerConfig.setBrokerIP2("127.0.0.1");
         slaveBrokerConfig.setEnablePropertyFilter(true);
@@ -564,9 +564,9 @@ public class ContainerIntegrationTestBase {
     }
 
     protected static void changeCompatibleMode(boolean compatibleMode) {
-        brokerContainer1.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
-        brokerContainer2.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
-        brokerContainer3.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
+        brokerContainer1.getBrokerControllers().forEach(brokerController -> brokerController.getBrokerConfig().setCompatibleWithOldNameSrv(compatibleMode));
+        brokerContainer2.getBrokerControllers().forEach(brokerController -> brokerController.getBrokerConfig().setCompatibleWithOldNameSrv(compatibleMode));
+        brokerContainer3.getBrokerControllers().forEach(brokerController -> brokerController.getBrokerConfig().setCompatibleWithOldNameSrv(compatibleMode));
     }
 
     protected static Set<MessageQueue> filterMessageQueue(Set<MessageQueue> mqSet, String topic) {