You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/03/14 06:06:01 UTC

[rocketmq] branch 5.0.0-beta-tmp updated: feature(container&remoting):[RIP-31][PART-A]Support RocketMQ BrokerContainer (#3976)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch 5.0.0-beta-tmp
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-tmp by this push:
     new a48d286  feature(container&remoting):[RIP-31][PART-A]Support RocketMQ BrokerContainer (#3976)
a48d286 is described below

commit a48d2869b89d86d6b9c267cf7a250b43f693047a
Author: rongtong <ji...@163.com>
AuthorDate: Mon Mar 14 14:05:53 2022 +0800

    feature(container&remoting):[RIP-31][PART-A]Support RocketMQ BrokerContainer (#3976)
    
    Co-authored-by: rongtong.jrt <ro...@alibaba-inc.com>
---
 container/pom.xml                                  |  36 ++
 .../apache/rocketmq/container/BrokerBootHook.java  |  48 ++
 .../apache/rocketmq/container/BrokerContainer.java | 435 +++++++++++++++++++
 .../rocketmq/container/BrokerContainerConfig.java  |  84 ++++
 .../container/BrokerContainerProcessor.java        | 273 ++++++++++++
 .../rocketmq/container/BrokerContainerStartup.java | 482 +++++++++++++++++++++
 .../rocketmq/container/BrokerPreOnlineService.java | 277 ++++++++++++
 .../ContainerClientHouseKeepingService.java        | 104 +++++
 .../rocketmq/container/IBrokerContainer.java       | 142 ++++++
 .../rocketmq/container/InnerBrokerController.java  | 378 ++++++++++++++++
 .../container/InnerSalveBrokerController.java      | 160 +++++++
 .../logback/BrokerLogbackConfigurator.java         | 187 ++++++++
 .../container/BrokerContainerStartupTest.java      | 151 +++++++
 .../rocketmq/container/BrokerContainerTest.java    | 339 +++++++++++++++
 .../rocketmq/container/BrokerPreOnlineTest.java    | 102 +++++
 pom.xml                                            |  20 +-
 .../java/org/apache/rocketmq/remoting/RPCHook.java |   4 +-
 .../apache/rocketmq/remoting/RemotingClient.java   |  13 +-
 .../apache/rocketmq/remoting/RemotingServer.java   |   6 +
 .../apache/rocketmq/remoting/RemotingService.java  |   5 +
 .../rocketmq/remoting/common/RemotingHelper.java   |  38 ++
 .../rocketmq/remoting/common/RemotingUtil.java     |  18 +-
 .../rocketmq/remoting/netty/NettyClientConfig.java |  38 ++
 .../remoting/netty/NettyRemotingAbstract.java      | 115 +++--
 .../remoting/netty/NettyRemotingClient.java        | 396 +++++++++++++----
 .../remoting/netty/NettyRemotingServer.java        | 158 ++++++-
 .../remoting/netty/NettyRequestProcessor.java      |   1 -
 .../rocketmq/remoting/netty/ResponseFuture.java    |  42 +-
 .../remoting/protocol/RemotingCommand.java         |   8 +-
 .../rocketmq/remoting/RemotingServerTest.java      |   5 +-
 .../rocketmq/remoting/SubRemotingServerTest.java   | 109 +++++
 .../remoting/netty/NettyRemotingAbstractTest.java  |   6 +-
 .../remoting/protocol/RemotingCommandTest.java     |   1 +
 33 files changed, 3995 insertions(+), 186 deletions(-)

diff --git a/container/pom.xml b/container/pom.xml
new file mode 100644
index 0000000..a7f13d1
--- /dev/null
+++ b/container/pom.xml
@@ -0,0 +1,36 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>5.0.0-BETA-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-container</artifactId>
+    <name>rocketmq-container ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerBootHook.java b/container/src/main/java/org/apache/rocketmq/container/BrokerBootHook.java
new file mode 100644
index 0000000..fe126af
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerBootHook.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.util.Properties;
+import org.apache.rocketmq.broker.BrokerController;
+
+/**
+ * Extension point invoked around the start of a broker.
+ *
+ * <p>Implementations are registered through
+ * {@code BrokerContainer#registerBrokerBootHook(BrokerBootHook)}, which logs
+ * {@link #hookName()} when the hook is added.
+ */
+public interface BrokerBootHook {
+    /**
+     * Name of the hook.
+     *
+     * @return name of the hook
+     */
+    String hookName();
+
+    /**
+     * Code to execute before broker start.
+     *
+     * @param brokerController broker to start
+     * @param properties broker properties
+     * @throws Exception when execute hook
+     */
+    void executeBeforeStart(BrokerController brokerController, Properties properties) throws Exception;
+
+    /**
+     * Code to execute after broker start.
+     *
+     * @param brokerController broker to start
+     * @param properties broker properties
+     * @throws Exception when execute hook
+     */
+    void executeAfterStart(BrokerController brokerController, Properties properties) throws Exception;
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
new file mode 100644
index 0000000..50b8bc4
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.container;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.container.logback.BrokerLogbackConfigurator;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.AbstractBrokerRunnable;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+public class BrokerContainer implements IBrokerContainer {
+    private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    // Single daemon thread that runs the periodic tasks scheduled in initialize().
+    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+        new BasicThreadFactory.Builder()
+            .namingPattern("BrokerContainerScheduledThread")
+            .daemon(true)
+            .build());
+    private final NettyServerConfig nettyServerConfig;
+    private final NettyClientConfig nettyClientConfig;
+    // Outbound API built from nettyClientConfig in the constructor.
+    private final BrokerOuterAPI brokerOuterAPI;
+    // Passed to the NettyRemotingServer created in initialize().
+    private final ContainerClientHouseKeepingService containerClientHouseKeepingService;
+
+    // Brokers hosted by this container, keyed by (cluster name, broker name, broker id).
+    private final ConcurrentMap<BrokerIdentity, InnerSalveBrokerController> slaveBrokerControllers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<BrokerIdentity, InnerBrokerController> masterBrokerControllers = new ConcurrentHashMap<>();
+    // The same list instance is handed to brokerContainerProcessor in the constructor.
+    private final List<BrokerBootHook> brokerBootHookList = new ArrayList<>();
+    // Default request processor registered on both remoting servers.
+    private final BrokerContainerProcessor brokerContainerProcessor;
+    private final Configuration configuration;
+    private final BrokerContainerConfig brokerContainerConfig;
+
+    // The three fields below are assigned in initialize() and are null before that call.
+    private RemotingServer remotingServer;
+    private RemotingServer fastRemotingServer;
+    private ExecutorService brokerContainerExecutor;
+
+    /**
+     * Builds a container from its three configuration objects and wires up the
+     * collaborators that do not need a running remoting server.
+     *
+     * <p>No network resources are opened here; the remoting servers and the
+     * request executor are created later by {@link #initialize()}.
+     *
+     * @param brokerContainerConfig container level configuration
+     * @param nettyServerConfig configuration of the container's remoting server
+     * @param nettyClientConfig configuration used to build the {@link BrokerOuterAPI}
+     */
+    public BrokerContainer(
+        final BrokerContainerConfig brokerContainerConfig,
+        final NettyServerConfig nettyServerConfig,
+        final NettyClientConfig nettyClientConfig
+    ) {
+        this.brokerContainerConfig = brokerContainerConfig;
+        this.nettyServerConfig = nettyServerConfig;
+        this.nettyClientConfig = nettyClientConfig;
+
+        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+
+        // "this" is handed out before construction completes; the configs above are already set.
+        this.brokerContainerProcessor = new BrokerContainerProcessor(this);
+        // The processor receives the live hook list, so hooks registered later are visible to it.
+        this.brokerContainerProcessor.registerBrokerBootHook(this.brokerBootHookList);
+        this.containerClientHouseKeepingService = new ContainerClientHouseKeepingService(this);
+
+        // Registers the three config objects with a Configuration bound to the broker config path.
+        this.configuration = new Configuration(
+            LOG,
+            BrokerPathConfigHelper.getBrokerConfigPath(),
+            this.brokerContainerConfig, this.nettyServerConfig, this.nettyClientConfig);
+    }
+
+    /**
+     * Returns the container address as {@code <brokerContainerIP>:<listenPort>}.
+     *
+     * @return address built from the container IP and the server listen port
+     */
+    @Override
+    public String getBrokerContainerAddr() {
+        final String containerIp = this.brokerContainerConfig.getBrokerContainerIP();
+        final int listenPort = this.nettyServerConfig.getListenPort();
+        return containerIp + ":" + listenPort;
+    }
+
+    /**
+     * @return the container configuration supplied at construction time
+     */
+    @Override
+    public BrokerContainerConfig getBrokerContainerConfig() {
+        return this.brokerContainerConfig;
+    }
+
+    /**
+     * @return the remoting server configuration supplied at construction time
+     */
+    @Override
+    public NettyServerConfig getNettyServerConfig() {
+        return this.nettyServerConfig;
+    }
+
+    /**
+     * @return the remoting client configuration supplied at construction time
+     */
+    public NettyClientConfig getNettyClientConfig() {
+        return this.nettyClientConfig;
+    }
+
+    /**
+     * @return the outbound API instance created in the constructor
+     */
+    @Override
+    public BrokerOuterAPI getBrokerOuterAPI() {
+        return this.brokerOuterAPI;
+    }
+
+    /**
+     * @return the main remoting server, or {@code null} if {@link #initialize()} has not run yet
+     */
+    @Override
+    public RemotingServer getRemotingServer() {
+        return this.remotingServer;
+    }
+
+    /**
+     * @return the {@link Configuration} that wraps the container and netty config objects
+     */
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * Creates the remoting servers and the request executor, registers the
+     * container processor and schedules the periodic background tasks.
+     *
+     * <p>Nothing is started here; call {@link #start()} afterwards.
+     *
+     * @return always {@code true} in the current implementation
+     */
+    public boolean initialize() {
+        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.containerClientHouseKeepingService);
+        // The fast server is derived from the main one and listens on (listenPort - 2).
+        this.fastRemotingServer = this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() - 2);
+
+        // Single worker thread with a bounded queue of 10000 pending requests.
+        this.brokerContainerExecutor = new ThreadPoolExecutor(
+            1,
+            1,
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(10000),
+            new ThreadFactoryImpl("SharedBrokerThread_"));
+
+        // Must come after the executor is created: the processor is registered together with it.
+        this.registerProcessor();
+
+        if (this.brokerContainerConfig.getNamesrvAddr() != null) {
+            this.brokerOuterAPI.updateNameServerAddressList(this.brokerContainerConfig.getNamesrvAddr());
+            LOG.info("Set user specified name server address: {}", this.brokerContainerConfig.getNamesrvAddr());
+            // also auto update namesrv if specify
+            // The address is re-read from the config on every run (first after 10s, then every 2 minutes).
+            this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+                @Override
+                public void run2() {
+                    try {
+                        BrokerContainer.this.brokerOuterAPI.updateNameServerAddressList(BrokerContainer.this.brokerContainerConfig.getNamesrvAddr());
+                    } catch (Throwable e) {
+                        LOG.error("ScheduledTask fetchNameServerAddr exception", e);
+                    }
+                }
+            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+        } else if (this.brokerContainerConfig.isFetchNamesrvAddrByAddressServer()) {
+            // No explicit address configured: fetch it periodically on the same 10s / 2min schedule.
+            this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+
+                @Override
+                public void run2() {
+                    try {
+                        BrokerContainer.this.brokerOuterAPI.fetchNameServerAddr();
+                    } catch (Throwable e) {
+                        LOG.error("ScheduledTask fetchNameServerAddr exception", e);
+                    }
+                }
+            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+        }
+
+        // Metadata refresh: first run after 1 second, then every 5 seconds.
+        this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(BrokerIdentity.BROKER_CONTAINER_IDENTITY) {
+            @Override
+            public void run2() {
+                try {
+                    BrokerContainer.this.brokerOuterAPI.refreshMetadata();
+                } catch (Exception e) {
+                    LOG.error("ScheduledTask refresh metadata exception", e);
+                }
+            }
+        }, 1, 5, TimeUnit.SECONDS);
+
+        return true;
+    }
+
+    private void registerProcessor() {
+        remotingServer.registerDefaultProcessor(brokerContainerProcessor, this.brokerContainerExecutor);
+        fastRemotingServer.registerDefaultProcessor(brokerContainerProcessor, this.brokerContainerExecutor);
+    }
+
+    /**
+     * Starts, in this order, the main remoting server, the fast remoting
+     * server and the outbound API. Components that are {@code null} are skipped.
+     *
+     * @throws Exception declared by the interface
+     */
+    @Override
+    public void start() throws Exception {
+        final RemotingServer mainServer = this.remotingServer;
+        if (mainServer != null) {
+            mainServer.start();
+        }
+
+        final RemotingServer fastServer = this.fastRemotingServer;
+        if (fastServer != null) {
+            fastServer.start();
+        }
+
+        final BrokerOuterAPI outerApi = this.brokerOuterAPI;
+        if (outerApi != null) {
+            outerApi.start();
+        }
+    }
+
+    /**
+     * Stops the container: periodic tasks first, then every hosted broker
+     * (slaves before masters), then the remoting servers, the request
+     * executor and finally the outbound API.
+     *
+     * <p>The scheduler is stopped up front so that no periodic task touches
+     * {@code brokerOuterAPI} after it has been shut down. Once this method has
+     * run, {@link #initialize()} can no longer schedule tasks on this instance.
+     */
+    @Override
+    public void shutdown() {
+        // Stop the periodic name server / metadata refresh tasks; they would
+        // otherwise keep running against components that are shut down below.
+        this.scheduledExecutorService.shutdown();
+
+        // Shutdown slave brokers
+        for (InnerSalveBrokerController slaveBrokerController : slaveBrokerControllers.values()) {
+            slaveBrokerController.shutdown();
+        }
+
+        slaveBrokerControllers.clear();
+
+        // Shutdown master brokers
+        for (BrokerController masterBrokerController : masterBrokerControllers.values()) {
+            masterBrokerController.shutdown();
+        }
+
+        masterBrokerControllers.clear();
+
+        // Shutdown the remoting servers before the request executor so no new
+        // requests are accepted while the executor drains
+        if (this.remotingServer != null) {
+            this.remotingServer.shutdown();
+        }
+
+        if (this.fastRemotingServer != null) {
+            this.fastRemotingServer.shutdown();
+        }
+
+        // Shutdown the request executors
+        ThreadUtils.shutdown(this.brokerContainerExecutor);
+
+        if (this.brokerOuterAPI != null) {
+            this.brokerOuterAPI.shutdown();
+        }
+    }
+
+    /**
+     * Registers an RPC hook on the container's outbound API.
+     *
+     * @param rpcHook hook to register
+     */
+    public void registerClientRPCHook(RPCHook rpcHook) {
+        final BrokerOuterAPI outerApi = this.getBrokerOuterAPI();
+        outerApi.registerRPCHook(rpcHook);
+    }
+
+    /**
+     * Clears the RPC hooks registered on the container's outbound API.
+     */
+    public void clearClientRPCHook() {
+        final BrokerOuterAPI outerApi = this.getBrokerOuterAPI();
+        outerApi.clearRPCHook();
+    }
+
+    /**
+     * @return the live (not copied) list of registered boot hooks
+     */
+    public List<BrokerBootHook> getBrokerBootHookList() {
+        return this.brokerBootHookList;
+    }
+
+    /**
+     * Appends a boot hook to the container's hook list and logs its name.
+     *
+     * @param brokerBootHook hook to register
+     */
+    public void registerBrokerBootHook(BrokerBootHook brokerBootHook) {
+        final List<BrokerBootHook> hooks = this.brokerBootHookList;
+        hooks.add(brokerBootHook);
+        LOG.info("register BrokerBootHook, {}", brokerBootHook.hookName());
+    }
+
+    /**
+     * Adds a broker to the container, dispatching on broker id and role.
+     *
+     * <p>A broker whose id is {@code MixAll.MASTER_ID} and whose role is not
+     * {@code SLAVE} is added as a master; a broker with any other id and role
+     * {@code SLAVE} is added as a slave. Any other combination is ignored.
+     *
+     * @param brokerConfig broker configuration
+     * @param storeConfig message store configuration
+     * @return the created controller, or {@code null} when id and role are inconsistent
+     * @throws Exception if the broker cannot be added
+     */
+    @Override
+    public InnerBrokerController addBroker(final BrokerConfig brokerConfig,
+        final MessageStoreConfig storeConfig) throws Exception {
+        final boolean hasMasterId = brokerConfig.getBrokerId() == MixAll.MASTER_ID;
+        final boolean hasSlaveRole = storeConfig.getBrokerRole() == BrokerRole.SLAVE;
+
+        if (hasMasterId == hasSlaveRole) {
+            // Master id with slave role, or non-master id with non-slave role: nothing to add.
+            return null;
+        }
+        if (hasMasterId) {
+            return this.addMasterBroker(brokerConfig, storeConfig);
+        }
+        return this.addSlaveBroker(brokerConfig, storeConfig);
+    }
+
+    /**
+     * Creates a master broker inside this container, registers it under its
+     * {@link BrokerIdentity} and initializes it.
+     *
+     * <p>After a successful initialization, every slave broker in the container
+     * that has no in-process master store yet is pointed at the new master's
+     * message store. On any failure the broker is shut down and unregistered
+     * exactly once before the exception is propagated.
+     *
+     * @param masterBrokerConfig configuration of the master broker; it is flagged as running in a container
+     * @param storeConfig message store configuration; {@code duplicationEnable} must be off
+     * @return the initialized master broker controller
+     * @throws Exception if duplication is enabled, the identity is already registered,
+     *                   or initialization fails
+     */
+    public InnerBrokerController addMasterBroker(final BrokerConfig masterBrokerConfig,
+        final MessageStoreConfig storeConfig) throws Exception {
+
+        masterBrokerConfig.setInBrokerContainer(true);
+        if (storeConfig.isDuplicationEnable()) {
+            LOG.error("Can not add broker to container when duplicationEnable is true currently");
+            throw new Exception("Can not add broker to container when duplicationEnable is true currently");
+        }
+        InnerBrokerController masterBroker = new InnerBrokerController(this, masterBrokerConfig, storeConfig);
+        BrokerIdentity brokerIdentity = new BrokerIdentity(masterBrokerConfig.getBrokerClusterName(),
+            masterBrokerConfig.getBrokerName(), masterBrokerConfig.getBrokerId());
+        final BrokerController previousBroker = masterBrokerControllers.putIfAbsent(brokerIdentity, masterBroker);
+        if (previousBroker != null) {
+            throw new Exception(masterBrokerConfig.getCanonicalName() + " has already been added to current broker");
+        }
+
+        // New master broker added, start it
+        try {
+            BrokerLogbackConfigurator.doConfigure(masterBrokerConfig);
+            if (!masterBroker.initialize()) {
+                // Cleanup is done once, in the catch block below; doing it here as
+                // well would shut the broker down twice.
+                throw new Exception("Failed to init master broker " + masterBrokerConfig.getCanonicalName());
+            }
+
+            for (InnerSalveBrokerController slaveBroker : this.getSlaveBrokers()) {
+                if (slaveBroker.getMessageStore().getMasterStoreInProcess() == null) {
+                    slaveBroker.getMessageStore().setMasterStoreInProcess(masterBroker.getMessageStore());
+                }
+            }
+        } catch (Exception e) {
+            // Remove the failed master broker and throw the exception
+            masterBroker.shutdown();
+            masterBrokerControllers.remove(brokerIdentity);
+            throw new Exception("Failed to initialize master broker " + masterBrokerConfig.getCanonicalName(), e);
+        }
+        return masterBroker;
+    }
+
+    /**
+     * This function will create a slave broker along with the main broker, and start it with a different port.
+     *
+     * <p>Side effect: {@code accessMessageInMemoryMaxRatio} of {@code storeConfig}
+     * is lowered by 10 (never below 0) before the broker is created. If a master
+     * broker is present in the container and the slave has no in-process master
+     * store yet, the slave is pointed at that master's message store. On any
+     * failure the broker is shut down and unregistered exactly once before the
+     * exception is propagated.
+     *
+     * @param slaveBrokerConfig the specific slave broker config
+     * @param storeConfig message store configuration; {@code duplicationEnable} must be off
+     * @return the initialized slave broker controller
+     * @throws Exception is thrown if an error occurs
+     */
+    public InnerSalveBrokerController addSlaveBroker(final BrokerConfig slaveBrokerConfig,
+        final MessageStoreConfig storeConfig) throws Exception {
+
+        slaveBrokerConfig.setInBrokerContainer(true);
+        if (storeConfig.isDuplicationEnable()) {
+            LOG.error("Can not add broker to container when duplicationEnable is true currently");
+            throw new Exception("Can not add broker to container when duplicationEnable is true currently");
+        }
+
+        int ratio = storeConfig.getAccessMessageInMemoryMaxRatio() - 10;
+        storeConfig.setAccessMessageInMemoryMaxRatio(Math.max(ratio, 0));
+        InnerSalveBrokerController slaveBroker = new InnerSalveBrokerController(this, slaveBrokerConfig, storeConfig);
+        BrokerIdentity brokerIdentity = new BrokerIdentity(slaveBrokerConfig.getBrokerClusterName(),
+            slaveBrokerConfig.getBrokerName(), slaveBrokerConfig.getBrokerId());
+        final InnerSalveBrokerController previousBroker = slaveBrokerControllers.putIfAbsent(brokerIdentity, slaveBroker);
+        if (previousBroker != null) {
+            throw new Exception(slaveBrokerConfig.getCanonicalName() + " has already been added to current broker");
+        }
+
+        // New slave broker added, start it
+        try {
+            BrokerLogbackConfigurator.doConfigure(slaveBrokerConfig);
+            if (!slaveBroker.initialize()) {
+                // Cleanup is done once, in the catch block below; doing it here as
+                // well would shut the broker down twice.
+                throw new Exception("Failed to init slave broker " + slaveBrokerConfig.getCanonicalName());
+            }
+            BrokerController masterBroker = this.peekMasterBroker();
+            if (slaveBroker.getMessageStore().getMasterStoreInProcess() == null && masterBroker != null) {
+                slaveBroker.getMessageStore().setMasterStoreInProcess(masterBroker.getMessageStore());
+            }
+        } catch (Exception e) {
+            // Remove the failed slave broker and throw the exception
+            slaveBroker.shutdown();
+            slaveBrokerControllers.remove(brokerIdentity);
+            throw new Exception("Failed to initialize slave broker " + slaveBrokerConfig.getCanonicalName(), e);
+        }
+        return slaveBroker;
+    }
+
+    /**
+     * Removes the broker registered under the given identity and shuts it down.
+     *
+     * <p>Slave brokers are looked up first. When a master broker is removed,
+     * every remaining slave is re-pointed at the message store of the next
+     * available master (or at {@code null} if none is left) before the removed
+     * master is shut down. When the identity matches no broker, nothing is
+     * changed.
+     *
+     * @param brokerIdentity identity of the broker to remove
+     * @return the removed controller, or {@code null} if no broker had that identity
+     * @throws Exception declared by the interface
+     */
+    @Override
+    public BrokerController removeBroker(final BrokerIdentity brokerIdentity) throws Exception {
+
+        InnerSalveBrokerController slaveBroker = slaveBrokerControllers.remove(brokerIdentity);
+        if (slaveBroker != null) {
+            slaveBroker.shutdown();
+            return slaveBroker;
+        }
+
+        BrokerController masterBroker = masterBrokerControllers.remove(brokerIdentity);
+        if (masterBroker == null) {
+            // Unknown identity: leave the slaves' in-process master store untouched.
+            return null;
+        }
+
+        // Re-wire the slaves before the removed master's store goes away.
+        BrokerController nextMasterBroker = this.peekMasterBroker();
+        for (InnerSalveBrokerController slave : this.getSlaveBrokers()) {
+            if (nextMasterBroker == null) {
+                slave.getMessageStore().setMasterStoreInProcess(null);
+            } else {
+                slave.getMessageStore().setMasterStoreInProcess(nextMasterBroker.getMessageStore());
+            }
+        }
+
+        masterBroker.shutdown();
+        return masterBroker;
+    }
+
+    /**
+     * Looks up a broker by identity, checking slave brokers before master brokers.
+     *
+     * @param brokerIdentity identity to look up
+     * @return the matching controller, or {@code null} if none is registered
+     */
+    @Override
+    public BrokerController getBroker(final BrokerIdentity brokerIdentity) {
+        final BrokerController slave = slaveBrokerControllers.get(brokerIdentity);
+        if (slave == null) {
+            return masterBrokerControllers.get(brokerIdentity);
+        }
+        return slave;
+    }
+
+    /**
+     * @return a live view of the master broker controllers held by this container
+     */
+    @Override
+    public Collection<InnerBrokerController> getMasterBrokers() {
+        return this.masterBrokerControllers.values();
+    }
+
+    /**
+     * @return a live view of the slave broker controllers held by this container
+     */
+    @Override
+    public Collection<InnerSalveBrokerController> getSlaveBrokers() {
+        return this.slaveBrokerControllers.values();
+    }
+
+    /**
+     * Returns a new list holding all master brokers followed by all slave brokers.
+     *
+     * @return a fresh, caller-owned list of every broker controller in the container
+     */
+    @Override
+    public List<BrokerController> getBrokerControllers() {
+        final List<BrokerController> allBrokers = new ArrayList<>(this.getMasterBrokers());
+        allBrokers.addAll(this.getSlaveBrokers());
+        return allBrokers;
+    }
+
+    /**
+     * Returns one of the master brokers in this container without removing it.
+     *
+     * <p>The map is iterated directly instead of testing {@code isEmpty()} first:
+     * with a concurrent removal between the emptiness check and
+     * {@code iterator().next()}, the latter could throw
+     * {@link java.util.NoSuchElementException}.
+     *
+     * @return some master broker, or {@code null} if the container has none
+     */
+    @Override
+    public BrokerController peekMasterBroker() {
+        for (BrokerController masterBroker : masterBrokerControllers.values()) {
+            return masterBroker;
+        }
+        return null;
+    }
+
+    public BrokerController findBrokerControllerByBrokerName(String brokerName) {
+        for (BrokerController brokerController : masterBrokerControllers.values()) {
+            if (brokerController.getBrokerConfig().getBrokerName().equals(brokerName)) {
+                return brokerController;
+            }
+        }
+
+        for (BrokerController brokerController : slaveBrokerControllers.values()) {
+            if (brokerController.getBrokerConfig().getBrokerName().equals(brokerName)) {
+                return brokerController;
+            }
+        }
+        return null;
+    }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
new file mode 100644
index 0000000..28a5242
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+/**
+ * Plain configuration holder for a broker container.
+ *
+ * <p>NOTE(review): {@code brokerContainerIP} is the only property without a
+ * setter; confirm whether that is intentional.
+ */
+public class BrokerContainerConfig {
+
+    /** Home directory, taken from the system property and falling back to the environment variable. */
+    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+    /** Name server address, taken from the system property and falling back to the environment variable. */
+    @ImportantField
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+
+    /** Whether the name server address is fetched from an address server; off by default. */
+    @ImportantField
+    private boolean fetchNamesrvAddrByAddressServer;
+
+    /** IP of the container; defaults to the local address reported by {@link RemotingUtil}. */
+    @ImportantField
+    private String brokerContainerIP = RemotingUtil.getLocalAddress();
+
+    /** Broker config paths; unset ({@code null}) by default. */
+    private String brokerConfigPaths;
+
+    /** Compatibility switch for old name servers; on by default. */
+    private boolean compatibleWithOldNameSrv = true;
+
+    public String getRocketmqHome() {
+        return this.rocketmqHome;
+    }
+
+    public void setRocketmqHome(String rocketmqHome) {
+        this.rocketmqHome = rocketmqHome;
+    }
+
+    public String getNamesrvAddr() {
+        return this.namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public boolean isFetchNamesrvAddrByAddressServer() {
+        return this.fetchNamesrvAddrByAddressServer;
+    }
+
+    public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
+        this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
+    }
+
+    public String getBrokerContainerIP() {
+        return this.brokerContainerIP;
+    }
+
+    public String getBrokerConfigPaths() {
+        return this.brokerConfigPaths;
+    }
+
+    public void setBrokerConfigPaths(String brokerConfigPaths) {
+        this.brokerConfigPaths = brokerConfigPaths;
+    }
+
+    public boolean isCompatibleWithOldNameSrv() {
+        return this.compatibleWithOldNameSrv;
+    }
+
+    public void setCompatibleWithOldNameSrv(boolean compatibleWithOldNameSrv) {
+        this.compatibleWithOldNameSrv = compatibleWithOldNameSrv;
+    }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
new file mode 100644
index 0000000..6893882
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerProcessor.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerStartup;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.AddBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.RemoveBrokerRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+public class BrokerContainerProcessor implements NettyRequestProcessor {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerContainer brokerContainer;
+    private List<BrokerBootHook> brokerBootHookList;
+
+    public BrokerContainerProcessor(BrokerContainer brokerContainer) {
+        this.brokerContainer = brokerContainer;
+    }
+
+    /**
+     * Dispatches a container-level request to the matching handler based on its request code.
+     *
+     * @param ctx channel context of the caller
+     * @param request incoming remoting command
+     * @return the handler's response, or {@code null} when the request code is not handled here
+     * @throws Exception propagated from the selected handler
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final int code = request.getCode();
+        if (code == RequestCode.ADD_BROKER) {
+            return this.addBroker(ctx, request);
+        }
+        if (code == RequestCode.REMOVE_BROKER) {
+            return this.removeBroker(ctx, request);
+        }
+        if (code == RequestCode.GET_BROKER_CONFIG) {
+            return this.getBrokerConfig(ctx, request);
+        }
+        if (code == RequestCode.UPDATE_BROKER_CONFIG) {
+            return this.updateBrokerConfig(ctx, request);
+        }
+        // Unknown codes produce no response.
+        return null;
+    }
+
+    /**
+     * @return always {@code false}; this processor never rejects a request
+     */
+    @Override public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Handles an ADD_BROKER request: builds the broker configuration, registers the broker with the
+     * container, runs the boot hooks and starts it.
+     *
+     * <p>The broker properties are loaded from the config file named in the request header when a
+     * path is given, otherwise they are parsed from the request body. If the broker fails to start
+     * it is removed from the container again and shut down.
+     *
+     * @param ctx channel context of the caller (used for logging the remote address)
+     * @param request the ADD_BROKER command
+     * @return a response with {@code SUCCESS}, or {@code SYSTEM_ERROR} plus a remark describing the failure
+     * @throws Exception if the request header cannot be decoded or the body cannot be converted to text
+     */
+    private synchronized RemotingCommand addBroker(ChannelHandlerContext ctx,
+        RemotingCommand request) throws Exception {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final AddBrokerRequestHeader requestHeader = request.decodeCommandCustomHeader(AddBrokerRequestHeader.class);
+
+        LOGGER.info("addBroker called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        Properties brokerProperties = null;
+        String configPath = requestHeader.getConfigPath();
+        final boolean hasConfigPath = configPath != null && !configPath.isEmpty();
+
+        if (hasConfigPath) {
+            BrokerStartup.SystemConfigFileHelper configFileHelper = new BrokerStartup.SystemConfigFileHelper();
+            configFileHelper.setFile(configPath);
+
+            try {
+                brokerProperties = configFileHelper.loadConfig();
+            } catch (Exception e) {
+                // The throwable goes last without its own placeholder so the stack trace is logged
+                // and no dangling "{}" is left in the message.
+                LOGGER.error("addBroker load config from {} failed", configPath, e);
+            }
+        } else {
+            byte[] body = request.getBody();
+            if (body != null) {
+                String bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
+                brokerProperties = MixAll.string2Properties(bodyStr);
+            }
+        }
+
+        if (brokerProperties == null) {
+            LOGGER.error("addBroker properties empty");
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("addBroker properties empty");
+            return response;
+        }
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        MixAll.properties2Object(brokerProperties, brokerConfig);
+        MixAll.properties2Object(brokerProperties, messageStoreConfig);
+
+        // The HA port is always derived from the broker's listen port.
+        messageStoreConfig.setHaListenPort(brokerConfig.getListenPort() + 1);
+
+        if (hasConfigPath) {
+            brokerConfig.setBrokerConfigPath(configPath);
+        }
+
+        switch (messageStoreConfig.getBrokerRole()) {
+            case ASYNC_MASTER:
+            case SYNC_MASTER:
+                // Masters always use the well-known master id, whatever the properties say.
+                brokerConfig.setBrokerId(MixAll.MASTER_ID);
+                break;
+            case SLAVE:
+                if (brokerConfig.getBrokerId() <= 0) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("slave broker id must be > 0");
+                    return response;
+                }
+                break;
+            default:
+                break;
+
+        }
+
+        // Require totalReplicas >= inSyncReplicas >= minInSyncReplicas.
+        if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
+            || messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
+            || messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("invalid replicas number");
+            return response;
+        }
+
+        BrokerController brokerController;
+        try {
+            brokerController = this.brokerContainer.addBroker(brokerConfig, messageStoreConfig);
+        } catch (Exception e) {
+            LOGGER.error("addBroker exception", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+        if (brokerController != null) {
+            brokerController.getConfiguration().registerConfig(brokerProperties);
+            // Hooks are optional: registerBrokerBootHook may never have been called, in which case
+            // the broker is started without running any hook instead of failing with an NPE.
+            final List<BrokerBootHook> hooks = this.brokerBootHookList;
+            try {
+                if (hooks != null) {
+                    for (BrokerBootHook brokerBootHook : hooks) {
+                        brokerBootHook.executeBeforeStart(brokerController, brokerProperties);
+                    }
+                }
+                brokerController.start();
+
+                if (hooks != null) {
+                    for (BrokerBootHook brokerBootHook : hooks) {
+                        brokerBootHook.executeAfterStart(brokerController, brokerProperties);
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.error("start broker exception", e);
+                // Roll back: unregister the half-started broker and release its resources.
+                BrokerIdentity brokerIdentity = new BrokerIdentity(brokerConfig.getBrokerClusterName(),
+                    brokerConfig.getBrokerName(),
+                    brokerConfig.getBrokerId());
+                this.brokerContainer.removeBroker(brokerIdentity);
+                brokerController.shutdown();
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("start broker failed, " + e);
+                return response;
+            }
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("add broker return null");
+        }
+
+        return response;
+    }
+
+    /**
+     * Handles a REMOVE_BROKER request by asking the container to remove the broker identified by
+     * cluster name, broker name and broker id.
+     *
+     * @param ctx channel context of the caller (used for logging the remote address)
+     * @param request the REMOVE_BROKER command
+     * @return {@code SUCCESS} when a broker was removed, {@code BROKER_NOT_EXIST} when the container
+     * returned no controller, or {@code SYSTEM_ERROR} when the container threw
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    private synchronized RemotingCommand removeBroker(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final RemoveBrokerRequestHeader requestHeader = request.decodeCommandCustomHeader(RemoveBrokerRequestHeader.class);
+
+        LOGGER.info("removeBroker called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        BrokerIdentity brokerIdentity = new BrokerIdentity(requestHeader.getBrokerClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerId());
+
+        BrokerController brokerController;
+        try {
+            brokerController = this.brokerContainer.removeBroker(brokerIdentity);
+        } catch (Exception e) {
+            // Log on the server side as well; the remark only carries the message, not the stack trace.
+            LOGGER.error("removeBroker exception", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+
+        if (brokerController != null) {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } else {
+            response.setCode(ResponseCode.BROKER_NOT_EXIST);
+            response.setRemark("Broker not exist");
+        }
+        return response;
+    }
+
+    /**
+     * Replaces the list of boot hooks that {@code addBroker} runs before and after starting a broker.
+     *
+     * @param brokerBootHookList hooks to run; the list reference is stored as-is, not copied
+     */
+    public void registerBrokerBootHook(List<BrokerBootHook> brokerBootHookList) {
+        this.brokerBootHookList = brokerBootHookList;
+    }
+
+    /**
+     * Handles an UPDATE_BROKER_CONFIG request by parsing the request body as properties and applying
+     * them to the container configuration. A request without a body succeeds without changes.
+     *
+     * @param ctx channel context of the caller (used for logging)
+     * @param request the UPDATE_BROKER_CONFIG command
+     * @return {@code SUCCESS}, or {@code SYSTEM_ERROR} when the body cannot be decoded or parsed
+     */
+    private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        LOGGER.info("updateSharedBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final byte[] payload = request.getBody();
+        if (payload != null) {
+            try {
+                final Properties newConfig = MixAll.string2Properties(new String(payload, MixAll.DEFAULT_CHARSET));
+                if (newConfig == null) {
+                    LOGGER.error("string2Properties error");
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("string2Properties error");
+                    return response;
+                }
+                LOGGER.info("updateSharedBrokerConfig, new config: [{}] client: {} ", newConfig, ctx.channel().remoteAddress());
+                this.brokerContainer.getConfiguration().update(newConfig);
+            } catch (UnsupportedEncodingException e) {
+                LOGGER.error("", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Handles a GET_BROKER_CONFIG request: returns the container configuration as the response body
+     * and its data version in the response header.
+     *
+     * @param ctx channel context of the caller (unused)
+     * @param request the GET_BROKER_CONFIG command (unused)
+     * @return {@code SUCCESS} with the formatted configuration, or {@code SYSTEM_ERROR} when encoding fails
+     */
+    private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
+        final GetBrokerConfigResponseHeader header = (GetBrokerConfigResponseHeader) response.readCustomHeader();
+
+        final String allConfigs = this.brokerContainer.getConfiguration().getAllConfigsFormatString();
+        final boolean hasContent = allConfigs != null && !allConfigs.isEmpty();
+        if (hasContent) {
+            try {
+                response.setBody(allConfigs.getBytes(MixAll.DEFAULT_CHARSET));
+            } catch (UnsupportedEncodingException e) {
+                LOGGER.error("", e);
+
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+
+        header.setVersion(this.brokerContainer.getConfiguration().getDataVersionJson());
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
new file mode 100644
index 0000000..f4f5d4a
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.container;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrokerContainerStartup {
+    // Short option names; each must match an option registered in buildCommandlineOptions.
+    private static final char BROKER_CONTAINER_CONFIG_OPTION = 'c';
+    private static final char BROKER_CONFIG_OPTION = 'b';
+    private static final char PRINT_PROPERTIES_OPTION = 'p';
+    // Registered as "m" (printImportantConfig); an unregistered name would make hasOption() always false.
+    private static final String PRINT_IMPORTANT_PROPERTIES_OPTION = "m";
+    public static Properties properties = null;
+    public static CommandLine commandLine = null;
+    public static String configFile = null;
+    public static InternalLogger log;
+    public static SystemConfigFileHelper configFileHelper = new SystemConfigFileHelper();
+    public static String rocketmqHome = null;
+    public static JoranConfigurator configurator = new JoranConfigurator();
+
+    /**
+     * Entry point: creates and starts the broker container, then creates and starts the brokers
+     * listed in the broker config paths.
+     *
+     * @param args command-line arguments
+     */
+    public static void main(String[] args) {
+        createAndStartBrokers(startBrokerContainer(createBrokerContainer(args)));
+    }
+
+    /**
+     * Starts the given broker controller and reports the boot result; kept for compatibility with
+     * older versions. Any failure prints the stack trace and terminates the JVM with status -1.
+     *
+     * @param controller brokerController to start
+     * @return the same brokerController once it has started
+     */
+    public static BrokerController start(BrokerController controller) {
+        try {
+
+            controller.start();
+
+            final StringBuilder banner = new StringBuilder("The broker[")
+                .append(controller.getBrokerConfig().getBrokerName())
+                .append(", ")
+                .append(controller.getBrokerAddr())
+                .append("] boot success. serializeType=")
+                .append(RemotingCommand.getSerializeTypeConfigInThisServer());
+
+            if (null != controller.getBrokerConfig().getNamesrvAddr()) {
+                banner.append(" and name server is ").append(controller.getBrokerConfig().getNamesrvAddr());
+            }
+
+            final String tip = banner.toString();
+            log.info(tip);
+            System.out.printf("%s%n", tip);
+            return controller;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        return null;
+    }
+
+    /**
+     * Creates and starts a broker container on the default port, then creates and initializes a
+     * single broker in it from the config file and properties loaded for the container.
+     *
+     * @param args command-line arguments
+     * @return the initialized broker controller, or {@code null} if the broker could not be added
+     */
+    public static BrokerController createBrokerController(String[] args) {
+        final BrokerContainer created = createBrokerContainer(args, true);
+        final BrokerContainer started = startBrokerContainer(created);
+        return createAndInitializeBroker(started, configFile, properties);
+    }
+
+    /**
+     * Creates, initializes and starts one broker per configured broker config file.
+     * A config file that cannot be loaded terminates the JVM with status -1.
+     *
+     * @param brokerContainer container that will host the brokers
+     * @return the controllers of all brokers that were added; empty when no config path is configured
+     */
+    public static List<BrokerController> createAndStartBrokers(BrokerContainer brokerContainer) {
+        final String[] paths = parseBrokerConfigPath();
+        final List<BrokerController> controllers = new ArrayList<>();
+
+        if (paths == null || paths.length == 0) {
+            return controllers;
+        }
+
+        final SystemConfigFileHelper loader = new SystemConfigFileHelper();
+        for (final String path : paths) {
+            System.out.printf("Start broker from config file path %s%n", path);
+            loader.setFile(path);
+
+            Properties loaded = null;
+            try {
+                loaded = loader.loadConfig();
+            } catch (Exception e) {
+                e.printStackTrace();
+                System.exit(-1);
+            }
+
+            final BrokerController controller = createAndInitializeBroker(brokerContainer, path, loaded);
+            if (controller == null) {
+                // The broker could not be added; move on to the next config file.
+                continue;
+            }
+            controllers.add(controller);
+            startBrokerController(brokerContainer, controller, loaded);
+        }
+
+        return controllers;
+    }
+
+    /**
+     * Resolves the broker config file paths. The broker-config command-line option wins; otherwise
+     * the {@code brokerConfigPaths} value of the container config file is used.
+     *
+     * @return the paths split on {@code ':'}, or {@code null} when none are configured
+     */
+    public static String[] parseBrokerConfigPath() {
+        String joinedPaths = null;
+        if (commandLine.hasOption(BROKER_CONFIG_OPTION)) {
+            joinedPaths = commandLine.getOptionValue(BROKER_CONFIG_OPTION);
+
+        } else if (commandLine.hasOption(BROKER_CONTAINER_CONFIG_OPTION)) {
+            final String containerConfigPath = commandLine.getOptionValue(BROKER_CONTAINER_CONFIG_OPTION);
+            if (containerConfigPath != null) {
+                final BrokerContainerConfig containerConfig = new BrokerContainerConfig();
+                final SystemConfigFileHelper loader = new SystemConfigFileHelper();
+                loader.setFile(containerConfigPath);
+                Properties containerProps = null;
+                try {
+                    containerProps = loader.loadConfig();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    System.exit(-1);
+                }
+                if (containerProps != null) {
+                    MixAll.properties2Object(containerProps, containerConfig);
+                }
+                joinedPaths = containerConfig.getBrokerConfigPaths();
+            }
+        }
+
+        return joinedPaths == null ? null : joinedPaths.split(":");
+    }
+
+    /**
+     * Builds the broker and message store configuration from the given properties, validates it and
+     * adds the broker to the container.
+     *
+     * <p>Invalid configuration (non-positive slave broker id, inconsistent replica numbers) exits the
+     * JVM with status -3; an exception while adding the broker exits with status -1.
+     *
+     * @param brokerContainer container that will host the broker
+     * @param filePath path recorded on the broker config as its config file path
+     * @param brokerProperties broker properties; may be {@code null}, in which case defaults are used
+     * @return the new broker controller, or {@code null} when the container returned none
+     */
+    public static BrokerController createAndInitializeBroker(BrokerContainer brokerContainer,
+        String filePath, Properties brokerProperties) {
+
+        final BrokerConfig brokerConfig = new BrokerConfig();
+        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+        if (brokerProperties != null) {
+            properties2SystemEnv(brokerProperties);
+            MixAll.properties2Object(brokerProperties, brokerConfig);
+            MixAll.properties2Object(brokerProperties, messageStoreConfig);
+        }
+
+        // The HA port is always derived from the broker's listen port.
+        messageStoreConfig.setHaListenPort(brokerConfig.getListenPort() + 1);
+
+        // Command-line values are applied after the file properties, so they take precedence.
+        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+
+        switch (messageStoreConfig.getBrokerRole()) {
+            case ASYNC_MASTER:
+            case SYNC_MASTER:
+                // Masters always use the well-known master id.
+                brokerConfig.setBrokerId(MixAll.MASTER_ID);
+                break;
+            case SLAVE:
+                if (brokerConfig.getBrokerId() <= 0) {
+                    System.out.printf("Slave's brokerId must be > 0%n");
+                    System.exit(-3);
+                }
+
+                break;
+            default:
+                break;
+        }
+
+        // Require totalReplicas >= inSyncReplicas >= minInSyncReplicas.
+        if (messageStoreConfig.getTotalReplicas() < messageStoreConfig.getInSyncReplicas()
+            || messageStoreConfig.getTotalReplicas() < messageStoreConfig.getMinInSyncReplicas()
+            || messageStoreConfig.getInSyncReplicas() < messageStoreConfig.getMinInSyncReplicas()) {
+            System.out.printf("invalid replicas number%n");
+            System.exit(-3);
+        }
+
+        brokerConfig.setBrokerConfigPath(filePath);
+
+        // Note: this reassigns the class-wide static logger to one named after this broker.
+        log = InternalLoggerFactory.getLogger(brokerConfig.getLoggerIdentifier() + LoggerName.BROKER_LOGGER_NAME);
+        MixAll.printObjectProperties(log, brokerConfig);
+        MixAll.printObjectProperties(log, messageStoreConfig);
+
+        try {
+            BrokerController brokerController = brokerContainer.addBroker(brokerConfig, messageStoreConfig);
+            if (brokerController != null) {
+                brokerController.getConfiguration().registerConfig(brokerProperties);
+                return brokerController;
+            } else {
+                System.out.printf("Add broker [%s-%s] failed.%n", brokerConfig.getBrokerName(), brokerConfig.getBrokerId());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+        return null;
+    }
+
+    /**
+     * Starts the broker container and reports the boot result. Any failure prints the stack trace
+     * and terminates the JVM with status -1.
+     *
+     * @param brokerContainer container to start
+     * @return the same container once it has started
+     */
+    public static BrokerContainer startBrokerContainer(BrokerContainer brokerContainer) {
+        try {
+
+            brokerContainer.start();
+
+            final StringBuilder banner = new StringBuilder("The broker container boot success. serializeType=")
+                .append(RemotingCommand.getSerializeTypeConfigInThisServer());
+
+            if (null != brokerContainer.getBrokerContainerConfig().getNamesrvAddr()) {
+                banner.append(" and name server is ").append(brokerContainer.getBrokerContainerConfig().getNamesrvAddr());
+            }
+
+            final String tip = banner.toString();
+            log.info(tip);
+            System.out.printf("%s%n", tip);
+            return brokerContainer;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        return null;
+    }
+
+    /**
+     * Starts a broker inside the container, running the container's boot hooks before and after the
+     * start. Any failure prints the stack trace and terminates the JVM with status -1.
+     *
+     * @param brokerContainer container providing the boot hooks
+     * @param brokerController broker to start
+     * @param brokerProperties properties handed to each hook
+     */
+    public static void startBrokerController(BrokerContainer brokerContainer,
+        BrokerController brokerController, Properties brokerProperties) {
+        try {
+            for (BrokerBootHook beforeHook : brokerContainer.getBrokerBootHookList()) {
+                beforeHook.executeBeforeStart(brokerController, brokerProperties);
+            }
+
+            brokerController.start();
+
+            for (BrokerBootHook afterHook : brokerContainer.getBrokerBootHookList()) {
+                afterHook.executeAfterStart(brokerController, brokerProperties);
+            }
+
+            final String tip = "Broker [" + brokerController.getBrokerConfig().getBrokerName()
+                + "-" + brokerController.getBrokerConfig().getBrokerId()
+                + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+
+            log.info(tip);
+            System.out.printf("%s%n", tip);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+
+    /**
+     * Shuts the given container down; a {@code null} argument is ignored.
+     *
+     * @param controller container to shut down, may be {@code null}
+     */
+    public static void shutdown(final BrokerContainer controller) {
+        if (controller == null) {
+            return;
+        }
+        controller.shutdown();
+    }
+
+    /**
+     * Creates and initializes a broker container without forcing the default listen port.
+     *
+     * @param args command-line arguments
+     * @return the initialized container
+     */
+    public static BrokerContainer createBrokerContainer(String[] args) {
+        return createBrokerContainer(args, false);
+    }
+
+    /**
+     * Parses the command line, loads the container configuration, configures logging, and creates
+     * and initializes the broker container. A JVM shutdown hook that shuts the container down is
+     * registered before returning.
+     *
+     * <p>This method also populates the static {@code commandLine}, {@code configFile},
+     * {@code properties}, {@code rocketmqHome} and {@code log} fields. Every failure path ends in
+     * {@code System.exit} with a negative status; the two print-config options exit with status 0.
+     *
+     * @param args command-line arguments
+     * @param useDefaultPort when {@code true}, the listen port is reset to 10811 after the config
+     * file has been applied
+     * @return the initialized container
+     */
+    public static BrokerContainer createBrokerContainer(String[] args, boolean useDefaultPort) {
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        // Use 128 KiB socket buffers unless a size was given explicitly as a system property.
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
+            NettySystemConfig.socketSndbufSize = 131072;
+        }
+
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
+            NettySystemConfig.socketRcvbufSize = 131072;
+        }
+
+        try {
+            //PackageConflictDetect.detectFastjson();
+            Options options = ServerUtil.buildCommandlineOptions(new Options());
+            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
+                new DefaultParser());
+            if (null == commandLine) {
+                System.exit(-1);
+            }
+
+            final BrokerContainerConfig brokerConfig = new BrokerContainerConfig();
+            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+
+            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TlsSystemConfig.TLS_ENABLE,
+                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
+            // Default container port; the config file applied below may override it.
+            nettyServerConfig.setListenPort(10811);
+
+            if (commandLine.hasOption(BROKER_CONTAINER_CONFIG_OPTION)) {
+                String file = commandLine.getOptionValue(BROKER_CONTAINER_CONFIG_OPTION);
+                if (file != null) {
+                    configFileHelper.setFile(file);
+                    configFile = file;
+                    BrokerPathConfigHelper.setBrokerConfigPath(file);
+                }
+            }
+
+            // NOTE(review): loadConfig() is called even when no config file option was given; with no
+            // file set it presumably fails when opening the stream and ends in the catch below — confirm.
+            properties = configFileHelper.loadConfig();
+            if (properties != null) {
+                properties2SystemEnv(properties);
+                MixAll.properties2Object(properties, brokerConfig);
+                MixAll.properties2Object(properties, nettyServerConfig);
+                MixAll.properties2Object(properties, nettyClientConfig);
+            }
+
+            // Undo any listen port taken from the config file when the caller asked for the default.
+            if (useDefaultPort) {
+                nettyServerConfig.setListenPort(10811);
+            }
+
+            // Command-line values are applied last, so they take precedence over the config file.
+            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+
+            if (null == brokerConfig.getRocketmqHome()) {
+                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
+                System.exit(-2);
+            }
+            rocketmqHome = brokerConfig.getRocketmqHome();
+
+            // Validate every ';'-separated name server address up front.
+            String namesrvAddr = brokerConfig.getNamesrvAddr();
+            if (null != namesrvAddr) {
+                try {
+                    String[] addrArray = namesrvAddr.split(";");
+                    for (String addr : addrArray) {
+                        RemotingUtil.string2SocketAddress(addr);
+                    }
+                } catch (Exception e) {
+                    System.out.printf(
+                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
+                        namesrvAddr);
+                    System.exit(-3);
+                }
+            }
+
+            // Reset logback and reconfigure it from the broker logback file under rocketmqHome.
+            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            configurator.setContext(lc);
+            lc.reset();
+            //https://logback.qos.ch/manual/configuration.html
+            lc.setPackagingDataEnabled(false);
+
+            configurator.doConfigure(rocketmqHome + "/conf/logback_broker.xml");
+
+            // The print options dump the effective configuration to the console logger and exit.
+            if (commandLine.hasOption(PRINT_PROPERTIES_OPTION)) {
+                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
+                MixAll.printObjectProperties(console, brokerConfig);
+                MixAll.printObjectProperties(console, nettyServerConfig);
+                MixAll.printObjectProperties(console, nettyClientConfig);
+                System.exit(0);
+            } else if (commandLine.hasOption(PRINT_IMPORTANT_PROPERTIES_OPTION)) {
+                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
+                MixAll.printObjectProperties(console, brokerConfig, true);
+                MixAll.printObjectProperties(console, nettyServerConfig, true);
+                MixAll.printObjectProperties(console, nettyClientConfig, true);
+                System.exit(0);
+            }
+
+            log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+            MixAll.printObjectProperties(log, brokerConfig);
+            MixAll.printObjectProperties(log, nettyServerConfig);
+            MixAll.printObjectProperties(log, nettyClientConfig);
+
+            final BrokerContainer brokerContainer = new BrokerContainer(
+                brokerConfig,
+                nettyServerConfig,
+                nettyClientConfig);
+            // remember all configs to prevent discard
+            brokerContainer.getConfiguration().registerConfig(properties);
+
+            boolean initResult = brokerContainer.initialize();
+            if (!initResult) {
+                brokerContainer.shutdown();
+                System.exit(-3);
+            }
+
+            // The hook shuts the container down at most once, even if run() were invoked repeatedly.
+            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+                private volatile boolean hasShutdown = false;
+                private AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+                @Override
+                public void run() {
+                    synchronized (this) {
+                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
+                        if (!this.hasShutdown) {
+                            this.hasShutdown = true;
+                            long beginTime = System.currentTimeMillis();
+                            brokerContainer.shutdown();
+                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
+                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
+                        }
+                    }
+                }
+            }, "ShutdownHook"));
+
+            return brokerContainer;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        return null;
+    }
+
+    private static void properties2SystemEnv(Properties properties) {
+        if (properties == null) {
+            return;
+        }
+        String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME);
+        String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP);
+        System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
+        System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
+    }
+
+    private static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("c", "configFile", true, "Config file for shared broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("p", "printConfigItem", false, "Print all config item");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("m", "printImportantConfig", false, "Print important config item");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerConfigFiles", true, "The path of broker config files, split by ':'");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Loads configuration properties from a file on the local file system.
+     */
+    public static class SystemConfigFileHelper {
+        private static final Logger LOGGER = LoggerFactory.getLogger(SystemConfigFileHelper.class);
+
+        private String file;
+
+        public SystemConfigFileHelper() {
+        }
+
+        /**
+         * Reads the configured file as a Java properties file.
+         *
+         * @return the loaded properties
+         * @throws Exception if the file cannot be opened or parsed
+         */
+        public Properties loadConfig() throws Exception {
+            Properties properties = new Properties();
+            // try-with-resources closes the stream even when load() throws.
+            try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
+                properties.load(in);
+            }
+            return properties;
+        }
+
+        /**
+         * Not supported by this helper; only logs that nothing was updated.
+         *
+         * @param properties ignored
+         * @throws Exception never thrown by this implementation
+         */
+        public void update(Properties properties) throws Exception {
+            LOGGER.error("[SystemConfigFileHelper] update no thing.");
+        }
+
+        /**
+         * @param file path of the properties file that {@link #loadConfig()} reads
+         */
+        public void setFile(String file) {
+            this.file = file;
+        }
+
+        /**
+         * @return path of the properties file that {@link #loadConfig()} reads
+         */
+        public String getFile() {
+            return file;
+        }
+    }
+
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java b/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java
new file mode 100644
index 0000000..40a0187
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerPreOnlineService.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
+import org.apache.rocketmq.common.BrokerSyncInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.broker.schedule.DelayOffsetSerializeWrapper;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.HAConnectionStateNotificationRequest;
+
+public class BrokerPreOnlineService extends ServiceThread {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final InnerBrokerController brokerController;
+
+    private int waitBrokerIndex = 0;
+
+    public BrokerPreOnlineService(InnerBrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public String getServiceName() { // simple class name, prefixed with the logger identifier when running in a broker container
+        final String simpleName = BrokerPreOnlineService.class.getSimpleName();
+        final boolean inContainer = this.brokerController != null
+            && this.brokerController.getBrokerConfig().isInBrokerContainer();
+        return inContainer ? this.brokerController.getBrokerConfig().getLoggerIdentifier() + simpleName : simpleName;
+    }
+
+    @Override
+    public void run() { // retries prepareForBrokerOnline() until it succeeds, the broker is no longer isolated, or the thread is stopped
+        LOGGER.info(this.getServiceName() + " service started");
+
+        while (!this.isStopped()) {
+            if (!this.brokerController.isIsolated()) {
+                LOGGER.info("broker {} is online", this.brokerController.getBrokerConfig().getCanonicalName());
+                break;
+            }
+            try {
+                if (this.prepareForBrokerOnline()) {
+                    break;
+                }
+            } catch (Exception e) {
+                LOGGER.error("Broker preOnline error, ", e);
+            }
+            // Pause after a failed attempt AND after an exception; without the pause a
+            // persistently throwing attempt would spin this loop and flood the log.
+            this.waitForRunning(1000);
+        }
+
+        LOGGER.info(this.getServiceName() + " service end");
+    }
+
+    CompletableFuture<Boolean> waitForHaHandshakeComplete(String brokerAddr) {
+        LOGGER.info("wait for handshake completion with {}", brokerAddr);
+        HAConnectionStateNotificationRequest request =
+            new HAConnectionStateNotificationRequest(HAConnectionState.TRANSFER, RemotingHelper.parseHostFromAddress(brokerAddr), true);
+        if (this.brokerController.getMessageStore().getHaService() != null) {
+            this.brokerController.getMessageStore().getHaService().putGroupConnectionStateRequest(request);
+        } else {
+            LOGGER.error("HAService is null, maybe broker config is wrong. For example, duplicationEnable is true");
+            request.getRequestFuture().complete(false);
+        }
+        return request.getRequestFuture();
+    }
+
+    private boolean futureWaitAction(boolean result, BrokerMemberGroup brokerMemberGroup) {
+        if (!result) {
+            LOGGER.error("wait for handshake completion failed, HA connection lost");
+            return false;
+        }
+        if (this.brokerController.getBrokerConfig().getBrokerId() != MixAll.MASTER_ID) {
+            LOGGER.info("slave preOnline complete, start service");
+            long minBrokerId = getMinBrokerId(brokerMemberGroup.getBrokerAddrs());
+            this.brokerController.startService(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+        }
+        return true;
+    }
+
+    private boolean prepareForMasterOnline(BrokerMemberGroup brokerMemberGroup) { // handshakes with each listed member in ascending id order; false = retry later
+        List<Long> brokerIdList = new ArrayList<>(brokerMemberGroup.getBrokerAddrs().keySet());
+        Collections.sort(brokerIdList);
+        while (true) {
+            if (waitBrokerIndex >= brokerIdList.size()) { // waitBrokerIndex is a field, so a retry resumes at the index that failed
+                LOGGER.info("master preOnline complete, start service");
+                this.brokerController.startService(MixAll.MASTER_ID, this.brokerController.getBrokerAddr());
+                return true;
+            }
+
+            String brokerAddrToWait = brokerMemberGroup.getBrokerAddrs().get(brokerIdList.get(waitBrokerIndex));
+
+            try {
+                this.brokerController.getBrokerOuterAPI().
+                    sendBrokerHaInfo(brokerAddrToWait, this.brokerController.getHAServerAddr(),
+                        this.brokerController.getMessageStore().getBrokerInitMaxOffset(), this.brokerController.getBrokerAddr());
+            } catch (Exception e) {
+                LOGGER.error("send ha address to {} exception", brokerAddrToWait, e); // e has no placeholder, so it is logged as a throwable
+                return false;
+            }
+
+            CompletableFuture<Boolean> haHandshakeFuture = waitForHaHandshakeComplete(brokerAddrToWait)
+                .thenApply(result -> futureWaitAction(result, brokerMemberGroup));
+
+            try {
+                if (!haHandshakeFuture.get()) { // blocks without a timeout until the future is completed
+                    return false;
+                }
+            } catch (Exception e) {
+                LOGGER.error("Wait handshake completion exception", e);
+                return false;
+            }
+
+            if (syncMetadataReverse(brokerAddrToWait)) {
+                waitBrokerIndex++;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    private boolean syncMetadataReverse(String brokerAddr) { // pulls delay/consumer offsets from brokerAddr; false when any step throws
+        try {
+            LOGGER.info("Get metadata reverse from {}", brokerAddr);
+
+            String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(brokerAddr);
+            DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
+                DelayOffsetSerializeWrapper.fromJson(delayOffset, DelayOffsetSerializeWrapper.class);
+
+            ConsumerOffsetSerializeWrapper consumerOffsetSerializeWrapper = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(brokerAddr);
+
+            if (null != consumerOffsetSerializeWrapper && brokerController.getConsumerOffsetManager().getDataVersion().compare(consumerOffsetSerializeWrapper.getDataVersion()) <= 0) {
+                LOGGER.info("{}'s consumerOffset data version is larger than master broker, {}'s consumerOffset will be used.", brokerAddr, brokerAddr); // NOTE(review): "<= 0" presumably also matches equal versions — confirm DataVersion.compare
+                this.brokerController.getConsumerOffsetManager().getOffsetTable()
+                    .putAll(consumerOffsetSerializeWrapper.getOffsetTable()); // putAll: remote entries are added/overwritten, nothing is removed
+                this.brokerController.getConsumerOffsetManager().getDataVersion().assignNewOne(consumerOffsetSerializeWrapper.getDataVersion());
+                this.brokerController.getConsumerOffsetManager().persist();
+            }
+
+            if (null != delayOffset && brokerController.getScheduleMessageService().getDataVersion().compare(delayOffsetSerializeWrapper.getDataVersion()) <= 0) { // NOTE(review): guards on delayOffset but dereferences the parsed wrapper — confirm fromJson never returns null here
+                LOGGER.info("{}'s scheduleMessageService data version is larger than master broker, {}'s delayOffset will be used.", brokerAddr, brokerAddr);
+                String fileName =
+                    StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+                        .getMessageStoreConfig().getStorePathRootDir());
+                try {
+                    MixAll.string2File(delayOffset, fileName); // writes the raw remote JSON to the local delay-offset file
+                    this.brokerController.getScheduleMessageService().load(); // return value is not checked
+                } catch (IOException e) {
+                    LOGGER.error("Persist file Exception, {}", fileName, e); // only logged: the method still goes on and can return true
+                }
+            }
+
+            for (BrokerAttachedPlugin brokerAttachedPlugin : brokerController.getBrokerAttachedPlugins()) {
+                if (brokerAttachedPlugin != null) {
+                    brokerAttachedPlugin.syncMetadataReverse(brokerAddr);
+                }
+            }
+
+        } catch (Exception e) { // any failure above (remote call, parsing, plugin) ends up here
+            LOGGER.error("GetMetadataReverse Failed", e);
+            return false;
+        }
+
+        return true;
+    }
+
+    private boolean prepareForSlaveOnline(BrokerMemberGroup brokerMemberGroup) { // fetches HA info from the master, then waits for the HA handshake; false = retry later
+        BrokerSyncInfo brokerSyncInfo;
+        try {
+            brokerSyncInfo = this.brokerController.getBrokerOuterAPI()
+                .retrieveBrokerHaInfo(brokerMemberGroup.getBrokerAddrs().get(MixAll.MASTER_ID));
+        } catch (Exception e) {
+            LOGGER.error("retrieve master ha info exception", e); // no "{}" for e: it is logged as a throwable with its stack trace
+            return false;
+        }
+
+        if (this.brokerController.getMessageStore().getMasterFlushedOffset() == 0
+            && this.brokerController.getMessageStoreConfig().isSyncMasterFlushOffsetWhenStartup()) {
+            LOGGER.info("Set master flush offset in slave to {}", brokerSyncInfo.getMasterFlushOffset());
+            this.brokerController.getMessageStore().setMasterFlushedOffset(brokerSyncInfo.getMasterFlushOffset());
+        }
+
+        if (brokerSyncInfo.getMasterHaAddress() != null) {
+            this.brokerController.getMessageStore().updateHaMasterAddress(brokerSyncInfo.getMasterHaAddress());
+            this.brokerController.getMessageStore().updateMasterAddress(brokerSyncInfo.getMasterAddress());
+        } else { // without an HA address there is nothing to handshake with
+            LOGGER.info("fetch master ha address return null, start service directly");
+            long minBrokerId = getMinBrokerId(brokerMemberGroup.getBrokerAddrs());
+            this.brokerController.startService(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+            return true;
+        }
+
+        CompletableFuture<Boolean> haHandshakeFuture = waitForHaHandshakeComplete(brokerSyncInfo.getMasterHaAddress())
+            .thenApply(result -> futureWaitAction(result, brokerMemberGroup));
+
+        try {
+            if (!haHandshakeFuture.get()) { // blocks without a timeout until the future is completed
+                return false;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Wait handshake completion exception", e);
+            return false;
+        }
+
+        return true;
+    }
+
+    private boolean prepareForBrokerOnline() { // one pre-online attempt; run() waits and retries when this returns false
+        BrokerMemberGroup brokerMemberGroup;
+        try {
+            brokerMemberGroup = this.brokerController.getBrokerOuterAPI().syncBrokerMemberGroup(
+                this.brokerController.getBrokerConfig().getBrokerClusterName(),
+                this.brokerController.getBrokerConfig().getBrokerName(),
+                this.brokerController.getBrokerContainer().getBrokerContainerConfig().isCompatibleWithOldNameSrv());
+        } catch (Exception e) {
+            LOGGER.error("syncBrokerMemberGroup from namesrv error, start service failed, will try later, ", e);
+            return false;
+        }
+
+        if (brokerMemberGroup != null && !brokerMemberGroup.getBrokerAddrs().isEmpty()) {
+            long minBrokerId = getMinBrokerId(brokerMemberGroup.getBrokerAddrs()); // smallest id among the members other than this broker
+
+            if (this.brokerController.getBrokerConfig().getBrokerId() == MixAll.MASTER_ID) { // this broker is configured with the master id
+                return prepareForMasterOnline(brokerMemberGroup);
+            } else if (minBrokerId == MixAll.MASTER_ID) { // another member carries the master id
+                return prepareForSlaveOnline(brokerMemberGroup);
+            } else {
+                LOGGER.info("no master online, start service directly");
+                this.brokerController.startService(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+            }
+        } else { // null or empty member group
+            LOGGER.info("no other broker online, will start service directly");
+            this.brokerController.startService(this.brokerController.getBrokerConfig().getBrokerId(), this.brokerController.getBrokerAddr());
+        }
+
+        return true;
+    }
+
+    private long getMinBrokerId(Map<Long, String> brokerAddrMap) {
+        Map<Long, String> brokerAddrMapCopy = new HashMap<>(brokerAddrMap);
+        brokerAddrMapCopy.remove(this.brokerController.getBrokerConfig().getBrokerId());
+        if (!brokerAddrMapCopy.isEmpty()) {
+            return Collections.min(brokerAddrMapCopy.keySet());
+        }
+        return this.brokerController.getBrokerConfig().getBrokerId();
+    }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
new file mode 100644
index 0000000..dc9f463
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/ContainerClientHouseKeepingService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import io.netty.channel.Channel;
+import java.util.Collection;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+
+public class ContainerClientHouseKeepingService implements ChannelEventListener {
+    private final IBrokerContainer brokerContainer;
+
+    public ContainerClientHouseKeepingService(final IBrokerContainer brokerContainer) {
+        this.brokerContainer = brokerContainer;
+    }
+
+    @Override public void onChannelConnect(String remoteAddr, Channel channel) {
+        onChannelOperation(CallbackCode.CONNECT, remoteAddr, channel);
+    }
+
+    @Override public void onChannelClose(String remoteAddr, Channel channel) {
+        onChannelOperation(CallbackCode.CLOSE, remoteAddr, channel);
+    }
+
+    @Override public void onChannelException(String remoteAddr, Channel channel) {
+        onChannelOperation(CallbackCode.EXCEPTION, remoteAddr, channel);
+    }
+
+    @Override public void onChannelIdle(String remoteAddr, Channel channel) {
+        onChannelOperation(CallbackCode.IDLE, remoteAddr, channel);
+    }
+
+    private void onChannelOperation(CallbackCode callbackCode, String remoteAddr, Channel channel) {
+        Collection<InnerBrokerController> masterBrokers = this.brokerContainer.getMasterBrokers();
+        Collection<InnerSalveBrokerController> slaveBrokers = this.brokerContainer.getSlaveBrokers();
+
+        for (BrokerController masterBroker : masterBrokers) {
+            brokerOperation(masterBroker, callbackCode, remoteAddr, channel);
+        }
+
+        for (InnerSalveBrokerController slaveBroker : slaveBrokers) {
+            brokerOperation(slaveBroker, callbackCode, remoteAddr, channel);
+        }
+    }
+
+    private void brokerOperation(BrokerController brokerController, CallbackCode callbackCode, String remoteAddr, // applies one channel event to one broker
+        Channel channel) {
+        if (callbackCode == CallbackCode.CONNECT) { // a connect only bumps the counter; the managers are not touched
+            brokerController.getBrokerStatsManager().incChannelConnectNum();
+            return;
+        }
+        boolean removed = brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
+        removed &= brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); // both managers are always called ("&=" does not short-circuit)
+        if (removed) { // NOTE(review): true only when BOTH calls returned true — confirm "|=" was not intended
+            switch (callbackCode) {
+                case CLOSE:
+                    brokerController.getBrokerStatsManager().incChannelCloseNum();
+                    break;
+                case EXCEPTION:
+                    brokerController.getBrokerStatsManager().incChannelExceptionNum();
+                    break;
+                case IDLE:
+                    brokerController.getBrokerStatsManager().incChannelIdleNum();
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    public enum CallbackCode {
+        /**
+         * onChannelConnect
+         */
+        CONNECT,
+        /**
+         * onChannelClose
+         */
+        CLOSE,
+        /**
+         * onChannelException
+         */
+        EXCEPTION,
+        /**
+         * onChannelIdle
+         */
+        IDLE
+    }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/IBrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/IBrokerContainer.java
new file mode 100644
index 0000000..d3cdc05
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/IBrokerContainer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+/**
+ * An interface for broker container to hold multiple master and slave brokers.
+ */
+public interface IBrokerContainer {
+
+    /**
+     * Start broker container
+     */
+    void start() throws Exception;
+
+    /**
+     * Shutdown broker container and all the brokers inside.
+     */
+    void shutdown();
+
+    /**
+     * Add a broker to this container with specific broker config.
+     *
+     * @param brokerConfig the specified broker config
+     * @param storeConfig the specified store config
+     * @return the added BrokerController or null if the broker already exists
+     * @throws Exception when initialize broker
+     */
+    BrokerController addBroker(BrokerConfig brokerConfig, MessageStoreConfig storeConfig) throws Exception;
+
+    /**
+     * Remove the broker associated with the specific broker identity from this container.
+     *
+     * @param brokerIdentity the specific broker identity
+     * @return the removed BrokerController or null if the broker doesn't exist
+     */
+    BrokerController removeBroker(BrokerIdentity brokerIdentity) throws Exception;
+
+    /**
+     * Return the broker controller associated with the specific broker identity
+     *
+     * @param brokerIdentity the specific broker identity
+     * @return the associated messaging broker or null
+     */
+    BrokerController getBroker(BrokerIdentity brokerIdentity);
+
+    /**
+     * Return all the master brokers belong to this container
+     *
+     * @return the master broker list
+     */
+    Collection<InnerBrokerController> getMasterBrokers();
+
+    /**
+     * Return all the slave brokers belong to this container
+     *
+     * @return the slave broker list
+     */
+    Collection<InnerSalveBrokerController> getSlaveBrokers();
+
+    /**
+     * Return all broker controller in this container
+     *
+     * @return all broker controller
+     */
+    List<BrokerController> getBrokerControllers();
+
+    /**
+     * Return the address of broker container.
+     *
+     * @return broker container address.
+     */
+    String getBrokerContainerAddr();
+
+    /**
+     * Peek the first master broker in container.
+     *
+     * @return the first master broker in container
+     */
+    BrokerController peekMasterBroker();
+
+    /**
+     * Return the config of the broker container
+     *
+     * @return the broker container config
+     */
+    BrokerContainerConfig getBrokerContainerConfig();
+
+    /**
+     * Get netty server config.
+     *
+     * @return netty server config
+     */
+    NettyServerConfig getNettyServerConfig();
+
+    /**
+     * Get netty client config.
+     *
+     * @return netty client config
+     */
+    NettyClientConfig getNettyClientConfig();
+
+    /**
+     * Return the shared BrokerOuterAPI
+     *
+     * @return the shared BrokerOuterAPI
+     */
+    BrokerOuterAPI getBrokerOuterAPI();
+
+    /**
+     * Return the shared RemotingServer
+     *
+     * @return the shared RemotingServer
+     */
+    RemotingServer getRemotingServer();
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
new file mode 100644
index 0000000..4603164
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerBrokerController.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.container;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.AbstractBrokerRunnable;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+public class InnerBrokerController extends BrokerController {
+    private ScheduledExecutorService syncBrokerMemberGroupExecutorService;
+    private ScheduledExecutorService brokerHeartbeatExecutorService;
+    protected volatile long minBrokerIdInGroup = 0;
+    protected volatile String minBrokerAddrInGroup = null;
+    protected final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>();
+    private BrokerPreOnlineService brokerPreOnlineService;
+    protected BrokerContainer brokerContainer;
+    protected volatile boolean isIsolated = false;
+
+    public InnerBrokerController(
+        final BrokerContainer brokerContainer,
+        final BrokerConfig brokerConfig,
+        final MessageStoreConfig messageStoreConfig
+    ) {
+        super(brokerConfig, messageStoreConfig);
+        this.brokerContainer = brokerContainer;
+        this.brokerOuterAPI = this.brokerContainer.getBrokerOuterAPI();
+
+        if (!this.brokerConfig.isSkipPreOnline()) {
+            this.brokerPreOnlineService = new BrokerPreOnlineService(this);
+        }
+    }
+
+    @Override
+    protected void initializeRemotingServer() { // both servers are obtained from the container's RemotingServer
+        this.remotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort());
+        this.fastRemotingServer = this.brokerContainer.getRemotingServer().newRemotingServer(brokerConfig.getListenPort() - 2); // fast server uses listenPort - 2; shutdown() removes the same two ports
+    }
+
+    /**
+     * Initialize resources for master which will be re-used by slave, plus two single-thread schedulers (member-group sync, heartbeat).
+     */
+    @Override
+    protected void initializeResources() {
+        super.initializeResources();
+        this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
+            new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", brokerConfig));
+        this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1, // thread name carries the full "Broker" prefix like its sibling above
+            new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", brokerConfig));
+    }
+
+    @Override
+    protected void initializeScheduledTasks() {
+        initializeBrokerScheduledTasks();
+    }
+
+    @Override
+    public void shutdown() { // stops this broker only; the container-owned RemotingServer itself is not shut down here
+
+        shutdownBasicService();
+
+        if (this.remotingServer != null) {
+            this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort()); // same port used in initializeRemotingServer()
+        }
+
+        if (this.fastRemotingServer != null) {
+            this.brokerContainer.getRemotingServer().removeRemotingServer(brokerConfig.getListenPort() - 2); // fast server port
+        }
+
+        for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            scheduledFuture.cancel(true); // true: a task that is currently running may be interrupted
+        }
+
+        if (this.brokerPreOnlineService != null && !this.brokerPreOnlineService.isStopped()) { // the service is null when skipPreOnline is set
+            this.brokerPreOnlineService.shutdown();
+        }
+
+        shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
+        shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
+
+    }
+
+    @Override
+    public String getBrokerAddr() {
+        return this.brokerConfig.getBrokerIP1() + ":" + this.brokerConfig.getListenPort();
+    }
+
+    @Override
+    public void start() throws Exception { // starts basic services, the optional pre-online thread, and the periodic register/heartbeat/sync tasks
+
+        this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart(); // the periodic register task below does nothing before this time
+
+        if (messageStoreConfig.getTotalReplicas() > 1) { // with more than one replica configured the broker starts isolated
+            isIsolated = true;
+        }
+
+        startBasicService();
+
+        if (this.brokerPreOnlineService != null) {
+            this.brokerPreOnlineService.start();
+        }
+
+        if (!isIsolated) {
+            this.registerBrokerAll(true, false, true);
+        }
+
+        scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerConfig) {
+            @Override
+            public void run2() {
+                try {
+                    if (System.currentTimeMillis() < shouldStartTime) {
+                        BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
+                        return;
+                    }
+                    if (isIsolated) {
+                        BrokerController.LOG.info("Skip register for broker is isolated");
+                        return;
+                    }
+                    InnerBrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+                } catch (Throwable e) { // Throwable is caught so a failing run cannot cancel the periodic task
+                    BrokerController.LOG.error("registerBrokerAll Exception", e);
+                }
+            }
+        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS)); // first run after 10 s; period clamped to [10 s, 60 s]
+
+        if (this.brokerConfig.isEnableSlaveActingMaster()) {
+            scheduledFutures.add(this.brokerHeartbeatExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerConfig) {
+                @Override
+                public void run2() {
+                    if (isIsolated) { // no heartbeat while isolated
+                        return;
+                    }
+                    try {
+                        InnerBrokerController.this.sendHeartbeat();
+                    } catch (Exception e) {
+                        BrokerController.LOG.error("sendHeartbeat Exception", e);
+                    }
+
+                }
+            }, 1000, brokerConfig.getBrokerHeartbeatInterval(), TimeUnit.MILLISECONDS)); // first heartbeat after 1 s
+
+            scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.brokerConfig) {
+                @Override public void run2() {
+                    try {
+                        InnerBrokerController.this.syncBrokerMemberGroup();
+                    } catch (Throwable e) {
+                        BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
+                    }
+                }
+            }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS)); // first sync after 1 s
+
+        }
+
+        if (!isIsolated && !messageStoreConfig.isEnableDLegerCommitLog()
+            && !messageStoreConfig.isDuplicationEnable() && !this.brokerConfig.isEnableSlaveActingMaster()) {
+            changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID); // argument is true only for the master id
+        }
+
+        if (brokerConfig.isSkipPreOnline()) { // the constructor creates no BrokerPreOnlineService in this case
+            startServiceWithoutCondition();
+        }
+    }
+
+    private void sendHeartbeat() { // picks the heartbeat call according to the container's compatibleWithOldNameSrv flag
+        if (!this.brokerContainer.getBrokerContainerConfig().isCompatibleWithOldNameSrv()) {
+            this.brokerOuterAPI.sendHeartbeat(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+                this.brokerConfig.isInBrokerContainer());
+            return;
+        }
+        this.brokerOuterAPI.sendHeartbeatViaDataVersion( // compatible mode: the topic config data version is sent as well
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.brokerConfig.getSendHeartbeatTimeoutMillis(),
+            this.getTopicConfigManager().getDataVersion(),
+            this.brokerConfig.isInBrokerContainer());
+    }
+
+    /**
+     * Refreshes {@code brokerMemberGroup} from the name server and applies the result.
+     *
+     * <p>If the lookup throws, or the returned group is {@code null} or has no addresses, the
+     * method only logs and returns. Otherwise the alive replica count is handed to the message
+     * store and, unless this broker is isolated, the group's minimum broker id and its address
+     * are forwarded to {@code updateMinBroker}.
+     */
+    public void syncBrokerMemberGroup() {
+        try {
+            brokerMemberGroup = this.brokerContainer.getBrokerOuterAPI().syncBrokerMemberGroup(
+                this.brokerConfig.getBrokerClusterName(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerContainer.getBrokerContainerConfig().isCompatibleWithOldNameSrv());
+        } catch (Exception e) {
+            BrokerController.LOG.error("syncBrokerMemberGroup from namesrv failed, ", e);
+            return;
+        }
+
+        final boolean noMembers = brokerMemberGroup == null || brokerMemberGroup.getBrokerAddrs().size() == 0;
+        if (noMembers) {
+            BrokerController.LOG.warn("Couldn't find any broker member from namesrv in {}/{}", this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
+            return;
+        }
+
+        final int aliveReplicas = calcAliveBrokerNumInGroup(brokerMemberGroup.getBrokerAddrs());
+        this.messageStore.setAliveReplicaNumInGroup(aliveReplicas);
+
+        // An isolated broker does not follow min-broker changes.
+        if (this.isIsolated) {
+            return;
+        }
+        final long minBrokerId = brokerMemberGroup.minimumBrokerId();
+        this.updateMinBroker(minBrokerId, brokerMemberGroup.getBrokerAddrs().get(minBrokerId));
+    }
+
+    /**
+     * Counts the brokers in the group, including this broker.
+     *
+     * @param brokerAddrTable broker id to address mapping of the group
+     * @return the table size if this broker's id is already a key, otherwise the size plus one
+     */
+    private int calcAliveBrokerNumInGroup(Map<Long, String> brokerAddrTable) {
+        final boolean selfListed = brokerAddrTable.containsKey(this.brokerConfig.getBrokerId());
+        return selfListed ? brokerAddrTable.size() : brokerAddrTable.size() + 1;
+    }
+
+    /**
+     * Registers this broker with all name servers through {@code brokerOuterAPI} and hands the
+     * results to {@code handleRegisterBrokerResult}.
+     *
+     * <p>Nothing is sent once the {@code shutdown} flag is set.
+     *
+     * @param checkOrderConfig   forwarded to {@code handleRegisterBrokerResult}
+     * @param oneway             forwarded to the register call
+     * @param topicConfigWrapper topic configuration to register
+     */
+    @Override
+    protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
+        TopicConfigSerializeWrapper topicConfigWrapper) {
+
+        if (shutdown) {
+            BrokerController.LOG.info("BrokerController#doRegisterBrokerAll: broker has shutdown, no need to register any more.");
+            return;
+        }
+        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.getHAServerAddr(),
+            topicConfigWrapper,
+            this.filterServerManager.buildNewFilterServerList(),
+            oneway,
+            this.brokerConfig.getRegisterBrokerTimeoutMills(),
+            this.brokerConfig.isEnableSlaveActingMaster(),
+            this.brokerConfig.isCompressedRegister(),
+            // The not-active timeout is only passed when slave-acting-master is enabled.
+            this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
+            this.brokerConfig.isInBrokerContainer());
+
+        handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
+    }
+
+    /**
+     * Resolves the name server address list from the container configuration.
+     *
+     * <p>A configured address is pushed to the container's outer API and returned. Without one,
+     * the list is fetched from the address server if that option is enabled.
+     *
+     * @return the name server addresses, or {@code null} if neither source applies
+     */
+    @Override
+    public String getNameServerList() {
+        final String namesrvAddr = this.brokerContainer.getBrokerContainerConfig().getNamesrvAddr();
+        if (namesrvAddr != null) {
+            this.brokerContainer.getBrokerOuterAPI().updateNameServerAddressList(namesrvAddr);
+            return namesrvAddr;
+        }
+
+        if (this.brokerContainer.getBrokerContainerConfig().isFetchNamesrvAddrByAddressServer()) {
+            return this.brokerContainer.getBrokerOuterAPI().fetchNameServerAddr();
+        }
+        return null;
+    }
+
+    /**
+     * Builds the HA server address of this broker.
+     *
+     * @return {@code brokerIP2} and the HA listen port joined by {@code ':'}
+     */
+    @Override
+    public String getHAServerAddr() {
+        return brokerConfig.getBrokerIP2() + ":" + messageStoreConfig.getHaListenPort();
+    }
+
+    /**
+     * @return the currently recorded minimum broker id of this broker's group
+     */
+    @Override
+    public long getMinBrokerIdInGroup() {
+        return minBrokerIdInGroup;
+    }
+
+    /**
+     * Reports whether the special services are running.
+     *
+     * @return {@code true} if both the schedule service and the transaction check service have
+     *         started, or if an ack message processor exists and its pop revive service is running
+     */
+    @Override
+    public boolean isSpecialServiceRunning() {
+        return (isScheduleServiceStart() && isTransactionCheckServiceStart())
+            || (this.ackMessageProcessor != null && this.ackMessageProcessor.isPopReviveServiceRunning());
+    }
+
+    /**
+     * @return the listen port taken from this broker's own {@code brokerConfig}
+     */
+    @Override
+    public int getListenPort() {
+        return brokerConfig.getListenPort();
+    }
+
+    /**
+     * @return the outer API instance owned by the enclosing broker container
+     */
+    public BrokerOuterAPI getBrokerOuterAPI() {
+        return this.brokerContainer.getBrokerOuterAPI();
+    }
+
+    /**
+     * Brings this broker into service using the supplied minimum-broker information.
+     *
+     * <p>Records the min broker id and address, switches the special services on only if this
+     * broker is the min broker, registers with the name servers, and finally clears the
+     * isolated flag.
+     *
+     * @param minBrokerId   minimum broker id in the group
+     * @param minBrokerAddr address of the broker with that id
+     */
+    public void startService(long minBrokerId, String minBrokerAddr) {
+        BrokerController.LOG.info("{} start service, min broker id is {}, min broker addr: {}",
+            this.brokerConfig.getCanonicalName(), minBrokerId, minBrokerAddr);
+        this.minBrokerIdInGroup = minBrokerId;
+        this.minBrokerAddrInGroup = minBrokerAddr;
+
+        final boolean isMinBroker = this.brokerConfig.getBrokerId() == minBrokerId;
+        this.changeSpecialServiceStatus(isMinBroker);
+        this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+
+        this.isIsolated = false;
+    }
+
+    /**
+     * Brings this broker into service without any min-broker information.
+     *
+     * <p>The special services are switched on only if this broker's id equals
+     * {@code MixAll.MASTER_ID}; the broker is then registered and the isolated flag cleared.
+     */
+    public void startServiceWithoutCondition() {
+        BrokerController.LOG.info("{} start service", this.brokerConfig.getCanonicalName());
+
+        final boolean isMaster = this.brokerConfig.getBrokerId() == MixAll.MASTER_ID;
+        this.changeSpecialServiceStatus(isMaster);
+        this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
+
+        this.isIsolated = false;
+    }
+
+    /**
+     * Takes this broker out of service: marks it isolated, switches the special services off
+     * and closes the channels of the shared remoting client.
+     */
+    public void stopService() {
+        BrokerController.LOG.info("{} stop service", this.getBrokerConfig().getCanonicalName());
+        this.isIsolated = true;
+        changeSpecialServiceStatus(false);
+        closeChannels();
+    }
+
+    /**
+     * Closes the channels held by the remoting client of the container's outer API.
+     */
+    public synchronized void closeChannels() {
+        this.brokerContainer
+            .getBrokerOuterAPI()
+            .getRemotingClient()
+            .closeChannels();
+    }
+
+    /**
+     * @return the broker container this controller belongs to
+     */
+    public BrokerContainer getBrokerContainer() {
+        return brokerContainer;
+    }
+
+    /**
+     * @return the current value of the isolated flag
+     */
+    public boolean isIsolated() {
+        return isIsolated;
+    }
+
+    /**
+     * @return the Netty server configuration of the enclosing broker container
+     */
+    public NettyServerConfig getNettyServerConfig() {
+        return this.brokerContainer.getNettyServerConfig();
+    }
+
+    /**
+     * @return the Netty client configuration of the enclosing broker container
+     */
+    public NettyClientConfig getNettyClientConfig() {
+        return this.brokerContainer.getNettyClientConfig();
+    }
+
+    /**
+     * Looks up the message store of a broker by name.
+     *
+     * @param brokerName name of the broker whose store is wanted
+     * @return this broker's store if the name matches its own; otherwise the store of the
+     *         controller the container finds for that name, or {@code null} if none is found
+     */
+    public MessageStore getMessageStoreByBrokerName(String brokerName) {
+        final boolean isSelf = this.brokerConfig.getBrokerName().equals(brokerName);
+        if (isSelf) {
+            return this.getMessageStore();
+        }
+
+        final BrokerController other = this.brokerContainer.findBrokerControllerByBrokerName(brokerName);
+        return other != null ? other.getMessageStore() : null;
+    }
+
+    /**
+     * @return this controller if its broker id equals {@code MixAll.MASTER_ID}; otherwise
+     *         whatever the container's {@code peekMasterBroker()} returns
+     */
+    @Override
+    public BrokerController peekMasterBroker() {
+        final boolean isMaster = brokerConfig.getBrokerId() == MixAll.MASTER_ID;
+        if (!isMaster) {
+            return this.brokerContainer.peekMasterBroker();
+        }
+        return this;
+    }
+
+    /**
+     * @return the pre-online service held by this controller
+     */
+    public BrokerPreOnlineService getBrokerPreOnlineService() {
+        return this.brokerPreOnlineService;
+    }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java b/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
new file mode 100644
index 0000000..e00242a
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/InnerSalveBrokerController.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.BrokerSyncInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+/**
+ * Broker controller for a non-master broker running inside a {@link BrokerContainer}.
+ *
+ * <p>On top of {@link InnerBrokerController} it reacts to changes of the minimum broker in its
+ * group: it toggles the special services, stops syncing from a master that went offline and
+ * (re)connects the HA client when a master comes online. Min-broker updates are serialized by
+ * {@link #lock}.
+ */
+public class InnerSalveBrokerController extends InnerBrokerController {
+
+    /** Serializes concurrent min-broker updates. */
+    private final Lock lock = new ReentrantLock();
+
+    /**
+     * Creates the controller and validates the slave-specific configuration.
+     *
+     * @param brokerContainer container hosting this broker
+     * @param brokerConfig    broker configuration; its broker id must not be {@code MixAll.MASTER_ID}
+     * @param storeConfig     message store configuration
+     */
+    public InnerSalveBrokerController(final BrokerContainer brokerContainer,
+        final BrokerConfig brokerConfig,
+        final MessageStoreConfig storeConfig) {
+        super(brokerContainer, brokerConfig, storeConfig);
+        // Check configs
+        checkSlaveBrokerConfig();
+    }
+
+    /**
+     * Fails fast if the cluster name or broker name is missing, or if the configured broker id
+     * is the master id.
+     */
+    private void checkSlaveBrokerConfig() {
+        Preconditions.checkNotNull(brokerConfig.getBrokerClusterName());
+        Preconditions.checkNotNull(brokerConfig.getBrokerName());
+        Preconditions.checkArgument(brokerConfig.getBrokerId() != MixAll.MASTER_ID);
+    }
+
+    /**
+     * Closes the channels to the current master (normal and VIP address) and clears the master
+     * address in both the slave synchronizer and the message store.
+     */
+    private void onMasterOffline() {
+        // close channels with master broker
+        String masterAddr = this.slaveSynchronize.getMasterAddr();
+        if (masterAddr != null) {
+            this.brokerOuterAPI.getRemotingClient().closeChannels(
+                Arrays.asList(masterAddr, MixAll.brokerVIPChannel(true, masterAddr)));
+        }
+        // master not available, stop sync
+        this.slaveSynchronize.setMasterAddr(null);
+        this.messageStore.updateHaMasterAddress(null);
+    }
+
+    /**
+     * Points the message store at a master that has come online and wakes up the HA client.
+     *
+     * <p>The master is queried for its HA info when either no HA address was supplied or the
+     * local master flushed offset is still 0 and syncing it at startup is enabled. A failed
+     * query is logged and otherwise ignored.
+     *
+     * @param masterAddr   address of the master broker
+     * @param masterHaAddr HA address of the master, or {@code null} to retrieve it from the master
+     */
+    private void onMasterOnline(String masterAddr, String masterHaAddr) {
+        boolean needSyncMasterFlushOffset = this.messageStore.getMasterFlushedOffset() == 0
+            && this.messageStoreConfig.isSyncMasterFlushOffsetWhenStartup();
+        if (masterHaAddr == null || needSyncMasterFlushOffset) {
+            try {
+                BrokerSyncInfo brokerSyncInfo = this.brokerOuterAPI.retrieveBrokerHaInfo(masterAddr);
+
+                if (needSyncMasterFlushOffset) {
+                    LOG.info("Set master flush offset in slave to {}", brokerSyncInfo.getMasterFlushOffset());
+                    this.messageStore.setMasterFlushedOffset(brokerSyncInfo.getMasterFlushOffset());
+                }
+
+                if (masterHaAddr == null) {
+                    this.messageStore.updateHaMasterAddress(brokerSyncInfo.getMasterHaAddress());
+                    this.messageStore.updateMasterAddress(brokerSyncInfo.getMasterAddress());
+                }
+            } catch (Exception e) {
+                LOG.error("retrieve master ha info exception, {}", e);
+            }
+        }
+
+        // set master HA address.
+        if (masterHaAddr != null) {
+            this.messageStore.updateHaMasterAddress(masterHaAddr);
+        }
+
+        // wakeup HAClient
+        this.messageStore.wakeupHAClient();
+    }
+
+    /**
+     * Applies a change of the group's minimum broker. Callers hold {@link #lock}.
+     *
+     * @param minBrokerId       new minimum broker id
+     * @param minBrokerAddr     address of the new minimum broker
+     * @param offlineBrokerAddr address of a broker that went offline, or {@code null}
+     * @param masterHaAddr      HA address of the master, or {@code null} if unknown
+     */
+    private void onMinBrokerChange(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
+        String masterHaAddr) {
+        LOG.info("Min broker changed, old: {}-{}, new {}-{}",
+            this.minBrokerIdInGroup, this.minBrokerAddrInGroup, minBrokerId, minBrokerAddr);
+
+        this.minBrokerIdInGroup = minBrokerId;
+        this.minBrokerAddrInGroup = minBrokerAddr;
+
+        // Special services run only on the broker holding the minimum id.
+        this.changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == this.minBrokerIdInGroup);
+
+        if (offlineBrokerAddr != null && offlineBrokerAddr.equals(this.slaveSynchronize.getMasterAddr())) {
+            // master offline
+            onMasterOffline();
+        }
+
+        if (minBrokerId == MixAll.MASTER_ID && minBrokerAddr != null) {
+            // master online
+            onMasterOnline(minBrokerAddr, masterHaAddr);
+        }
+
+        // notify PullRequest on hold to pull from master.
+        if (this.minBrokerIdInGroup == MixAll.MASTER_ID) {
+            this.pullRequestHoldService.notifyMasterOnline();
+        }
+    }
+
+    /**
+     * Applies a new minimum broker if it differs from the recorded one.
+     *
+     * <p>The lock is only tried, not awaited: if another update is in progress this call does
+     * nothing. When the new id is larger than the recorded one, the previously recorded min
+     * broker address is treated as the offline broker.
+     *
+     * @param minBrokerId   new minimum broker id
+     * @param minBrokerAddr address of the new minimum broker
+     */
+    @Override
+    public void updateMinBroker(long minBrokerId, String minBrokerAddr) {
+        if (lock.tryLock()) {
+            try {
+                if (minBrokerId != this.minBrokerIdInGroup) {
+                    String offlineBrokerAddr = null;
+                    if (minBrokerId > this.minBrokerIdInGroup) {
+                        offlineBrokerAddr = this.minBrokerAddrInGroup;
+                    }
+                    onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, null);
+                }
+            } finally {
+                lock.unlock();
+            }
+
+        }
+    }
+
+    /**
+     * Applies a new minimum broker if it differs from the recorded one, waiting up to 3000 ms
+     * for the lock. If the lock cannot be obtained in time the update is dropped.
+     *
+     * <p>If the waiting thread is interrupted, the error is logged and the interrupt status is
+     * restored so that callers further up can still observe it.
+     *
+     * @param minBrokerId       new minimum broker id
+     * @param minBrokerAddr     address of the new minimum broker
+     * @param offlineBrokerAddr address of a broker that went offline, or {@code null}
+     * @param masterHaAddr      HA address of the master, or {@code null} if unknown
+     */
+    @Override
+    public void updateMinBroker(long minBrokerId, String minBrokerAddr, String offlineBrokerAddr,
+        String masterHaAddr) {
+        try {
+            if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+                try {
+                    if (minBrokerId != this.minBrokerIdInGroup) {
+                        onMinBrokerChange(minBrokerId, minBrokerAddr, offlineBrokerAddr, masterHaAddr);
+                    }
+                } finally {
+                    lock.unlock();
+                }
+
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Update min broker error, {}", e);
+            // tryLock cleared the interrupt status when it threw; restore it instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * @return the master broker as reported by the container; this controller is never returned
+     *         directly because its broker id is checked to be non-master at construction
+     */
+    @Override
+    public BrokerController peekMasterBroker() {
+        return this.brokerContainer.peekMasterBroker();
+    }
+}
diff --git a/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java b/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
new file mode 100644
index 0000000..d4b4a75
--- /dev/null
+++ b/container/src/main/java/org/apache/rocketmq/container/logback/BrokerLogbackConfigurator.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container.logback;
+
+import ch.qos.logback.classic.AsyncAppender;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import ch.qos.logback.core.encoder.Encoder;
+import ch.qos.logback.core.rolling.FixedWindowRollingPolicy;
+import ch.qos.logback.core.rolling.RollingFileAppender;
+import ch.qos.logback.core.rolling.RollingPolicy;
+import ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP;
+import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy;
+import ch.qos.logback.core.rolling.TimeBasedFileNamingAndTriggeringPolicy;
+import ch.qos.logback.core.rolling.TimeBasedRollingPolicy;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Set;
+
+import ch.qos.logback.core.util.FileSize;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates per-broker logback loggers for brokers hosted in a broker container.
+ *
+ * <p>For every already configured logger whose name starts with {@link #ROCKETMQ_PREFIX}
+ * (except console, account, commercial and consumer-stats loggers) a sibling logger is created
+ * whose name is prefixed with the broker's logger identifier. Its appender is a copy of the
+ * template appender that writes to a broker-specific path.
+ */
+public class BrokerLogbackConfigurator {
+    private static final InternalLogger LOG = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    /** Canonical names of brokers that have been configured successfully. */
+    // NOTE(review): plain HashSet with unsynchronized access - confirm doConfigure is never called concurrently.
+    private static final Set<String> CONFIGURED_BROKER_LIST = new HashSet<>();
+
+    public static final String ROCKETMQ_LOGS = "rocketmqlogs";
+    public static final String ROCKETMQ_PREFIX = "Rocketmq";
+    public static final String SUFFIX_CONSOLE = "Console";
+    public static final String SUFFIX_APPENDER = "Appender";
+    public static final String SUFFIX_INNER_APPENDER = "_inner";
+
+    /**
+     * Configures broker-specific loggers for the given broker, once per canonical broker name.
+     *
+     * <p>Any exception is logged and leaves the broker unrecorded, so a later call retries.
+     *
+     * @param brokerConfig configuration of the broker to create loggers for
+     */
+    public static void doConfigure(BrokerConfig brokerConfig) {
+        if (!CONFIGURED_BROKER_LIST.contains(brokerConfig.getCanonicalName())) {
+            try {
+                LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+                for (ch.qos.logback.classic.Logger tempLogger : lc.getLoggerList()) {
+                    String loggerName = tempLogger.getName();
+                    if (loggerName.startsWith(ROCKETMQ_PREFIX)
+                        && !loggerName.endsWith(SUFFIX_CONSOLE)
+                        && !loggerName.equals(LoggerName.ACCOUNT_LOGGER_NAME)
+                        && !loggerName.equals(LoggerName.COMMERCIAL_LOGGER_NAME)
+                        && !loggerName.equals(LoggerName.CONSUMER_STATS_LOGGER_NAME)) {
+                        // Broker-specific twin of the template logger, with the same additivity and level.
+                        ch.qos.logback.classic.Logger logger = lc.getLogger(brokerConfig.getLoggerIdentifier() + loggerName);
+                        logger.setAdditive(tempLogger.isAdditive());
+                        logger.setLevel(tempLogger.getLevel());
+                        String appenderName = loggerName + SUFFIX_APPENDER;
+                        Appender<ILoggingEvent> tempAppender = tempLogger.getAppender(appenderName);
+                        if (tempAppender instanceof AsyncAppender) {
+                            AsyncAppender tempAsyncAppender = (AsyncAppender) tempAppender;
+                            AsyncAppender asyncAppender = new AsyncAppender();
+                            asyncAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
+                            asyncAppender.setContext(tempAsyncAppender.getContext());
+
+                            String innerAppenderName = appenderName + SUFFIX_INNER_APPENDER;
+                            Appender<ILoggingEvent> tempInnerAppender = tempAsyncAppender.getAppender(innerAppenderName);
+                            if (!(tempInnerAppender instanceof RollingFileAppender)) {
+                                // Only rolling-file inner appenders can be cloned; skip this logger.
+                                continue;
+                            }
+                            asyncAppender.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempInnerAppender,
+                                brokerConfig, innerAppenderName));
+                            asyncAppender.start();
+                            logger.addAppender(asyncAppender);
+                        } else if (tempAppender instanceof RollingFileAppender) {
+                            logger.addAppender(configureRollingFileAppender((RollingFileAppender<ILoggingEvent>) tempAppender,
+                                brokerConfig, appenderName));
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("Configure logback for broker {} failed, will use default broker log config instead. {}", brokerConfig.getCanonicalName(), e);
+                return;
+            }
+
+            CONFIGURED_BROKER_LIST.add(brokerConfig.getCanonicalName());
+        }
+    }
+
+    /**
+     * Builds and starts a rolling file appender modelled on a template appender.
+     *
+     * <p>The copy gets a broker-prefixed name and a file path in which {@link #ROCKETMQ_LOGS} is
+     * replaced by {@code <canonicalName>_rocketmqlogs}. Rolling policy, triggering policy and
+     * encoder are re-created from the template where their concrete type is recognised.
+     *
+     * @param tempRollingFileAppender template appender to copy settings from
+     * @param brokerConfig            configuration of the broker the appender is for
+     * @param appenderName            template appender name, prefixed with the logger identifier
+     * @return the started broker-specific appender
+     * @throws NoSuchFieldException   if logback's {@code maxFileSize} field cannot be found
+     * @throws IllegalAccessException if that field cannot be read
+     */
+    private static RollingFileAppender<ILoggingEvent> configureRollingFileAppender(
+        RollingFileAppender<ILoggingEvent> tempRollingFileAppender, BrokerConfig brokerConfig, String appenderName)
+        throws NoSuchFieldException, IllegalAccessException {
+        RollingFileAppender<ILoggingEvent> rollingFileAppender = new RollingFileAppender<>();
+
+        // Literal replacement (not regex) so '$' or '\' in a broker name cannot be misinterpreted.
+        final String brokerLogsDir = brokerConfig.getCanonicalName() + "_" + ROCKETMQ_LOGS;
+
+        // configure appender name
+        rollingFileAppender.setName(brokerConfig.getLoggerIdentifier() + appenderName);
+
+        // configure file name
+        rollingFileAppender.setFile(tempRollingFileAppender.getFile().replace(ROCKETMQ_LOGS, brokerLogsDir));
+
+        // configure append
+        rollingFileAppender.setAppend(true);
+
+        // configure prudent
+        rollingFileAppender.setPrudent(tempRollingFileAppender.isPrudent());
+
+        // configure rollingPolicy
+        RollingPolicy originalRollingPolicy = tempRollingFileAppender.getRollingPolicy();
+        if (originalRollingPolicy instanceof TimeBasedRollingPolicy) {
+            TimeBasedRollingPolicy<ILoggingEvent> tempRollingPolicy = (TimeBasedRollingPolicy<ILoggingEvent>) originalRollingPolicy;
+            TimeBasedRollingPolicy<ILoggingEvent> rollingPolicy = new TimeBasedRollingPolicy<>();
+            rollingPolicy.setContext(tempRollingPolicy.getContext());
+            // NOTE(review): unlike the fixed-window branch, this pattern is not rewritten to the
+            // broker-specific directory - confirm that is intended.
+            rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern());
+            SizeAndTimeBasedFNATP<ILoggingEvent> sizeAndTimeBasedFNATP = new SizeAndTimeBasedFNATP<>();
+            sizeAndTimeBasedFNATP.setContext(tempRollingPolicy.getContext());
+            TimeBasedFileNamingAndTriggeringPolicy<ILoggingEvent> timeBasedFileNamingAndTriggeringPolicy =
+                tempRollingPolicy.getTimeBasedFileNamingAndTriggeringPolicy();
+            if (timeBasedFileNamingAndTriggeringPolicy instanceof SizeAndTimeBasedFNATP) {
+                SizeAndTimeBasedFNATP<ILoggingEvent> originalSizeAndTimeBasedFNATP =
+                    (SizeAndTimeBasedFNATP<ILoggingEvent>) timeBasedFileNamingAndTriggeringPolicy;
+                // maxFileSize has no getter, so it is read reflectively. The declaring class is
+                // used so the lookup also works when the template is a subclass.
+                Field field = SizeAndTimeBasedFNATP.class.getDeclaredField("maxFileSize");
+                field.setAccessible(true);
+                sizeAndTimeBasedFNATP.setMaxFileSize((FileSize) field.get(originalSizeAndTimeBasedFNATP));
+                sizeAndTimeBasedFNATP.setTimeBasedRollingPolicy(rollingPolicy);
+            }
+            rollingPolicy.setTimeBasedFileNamingAndTriggeringPolicy(sizeAndTimeBasedFNATP);
+            rollingPolicy.setMaxHistory(tempRollingPolicy.getMaxHistory());
+            rollingPolicy.setParent(rollingFileAppender);
+            rollingPolicy.start();
+            rollingFileAppender.setRollingPolicy(rollingPolicy);
+        } else if (originalRollingPolicy instanceof FixedWindowRollingPolicy) {
+            FixedWindowRollingPolicy tempRollingPolicy = (FixedWindowRollingPolicy) originalRollingPolicy;
+            FixedWindowRollingPolicy rollingPolicy = new FixedWindowRollingPolicy();
+            rollingPolicy.setContext(tempRollingPolicy.getContext());
+            rollingPolicy.setFileNamePattern(tempRollingPolicy.getFileNamePattern().replace(ROCKETMQ_LOGS, brokerLogsDir));
+            rollingPolicy.setMaxIndex(tempRollingPolicy.getMaxIndex());
+            rollingPolicy.setMinIndex(tempRollingPolicy.getMinIndex());
+            rollingPolicy.setParent(rollingFileAppender);
+            rollingPolicy.start();
+            rollingFileAppender.setRollingPolicy(rollingPolicy);
+        }
+
+        // configure triggerPolicy
+        if (tempRollingFileAppender.getTriggeringPolicy() instanceof SizeBasedTriggeringPolicy) {
+            SizeBasedTriggeringPolicy<ILoggingEvent> tempTriggerPolicy = (SizeBasedTriggeringPolicy<ILoggingEvent>) tempRollingFileAppender.getTriggeringPolicy();
+            SizeBasedTriggeringPolicy<ILoggingEvent> triggerPolicy = new SizeBasedTriggeringPolicy<>();
+            triggerPolicy.setContext(tempTriggerPolicy.getContext());
+            // Copy the size limit from the TEMPLATE policy; reading it from the freshly created
+            // policy would only copy logback's default back onto itself.
+            Field field = SizeBasedTriggeringPolicy.class.getDeclaredField("maxFileSize");
+            field.setAccessible(true);
+            triggerPolicy.setMaxFileSize((FileSize) field.get(tempTriggerPolicy));
+            triggerPolicy.start();
+            rollingFileAppender.setTriggeringPolicy(triggerPolicy);
+        }
+
+        // configure encoder
+        Encoder<ILoggingEvent> tempEncoder = tempRollingFileAppender.getEncoder();
+        if (tempEncoder instanceof PatternLayoutEncoder) {
+            PatternLayoutEncoder tempPatternLayoutEncoder = (PatternLayoutEncoder) tempEncoder;
+            PatternLayoutEncoder patternLayoutEncoder = new PatternLayoutEncoder();
+            patternLayoutEncoder.setContext(tempPatternLayoutEncoder.getContext());
+            patternLayoutEncoder.setPattern(tempPatternLayoutEncoder.getPattern());
+            patternLayoutEncoder.setCharset(tempPatternLayoutEncoder.getCharset());
+            patternLayoutEncoder.start();
+
+            rollingFileAppender.setEncoder(patternLayoutEncoder);
+        }
+
+        // configure context
+        rollingFileAppender.setContext(tempRollingFileAppender.getContext());
+
+        rollingFileAppender.start();
+
+        return rollingFileAppender;
+    }
+}
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerStartupTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerStartupTest.java
new file mode 100644
index 0000000..8138098
--- /dev/null
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerStartupTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.assertj.core.util.Arrays;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+/**
+ * Start-up tests for {@link BrokerContainerStartup}: one test boots a container from a
+ * container config file, the other boots a single broker from a broker config file.
+ * Both config files are generated before each test and deleted afterwards.
+ */
+public class BrokerContainerStartupTest {
+    private static final List<File> TMP_FILE_LIST = new ArrayList<>();
+    private static final String BROKER_NAME_PREFIX = "TestBroker";
+    private static final String SHARED_BROKER_NAME_PREFIX = "TestBrokerContainer";
+    private static String brokerConfigPath;
+    private static String brokerContainerConfigPath;
+
+    @Mock
+    private BrokerConfig brokerConfig;
+    private String storePathRootDir = "store/test";
+    @Mock
+    private NettyClientConfig nettyClientConfig;
+    @Mock
+    private NettyServerConfig nettyServerConfig;
+
+    /**
+     * Writes a broker config file (broker plus store properties) and a container config file
+     * that references it, both under {@code /tmp/}, and records them for later deletion.
+     */
+    @Before
+    public void init() throws IOException {
+        final String generatedBrokerName = BROKER_NAME_PREFIX + "_" + System.currentTimeMillis();
+
+        // Broker configuration.
+        final BrokerConfig config = new BrokerConfig();
+        config.setBrokerName(generatedBrokerName);
+        if (config.getRocketmqHome() == null) {
+            config.setRocketmqHome("../distribution");
+        }
+
+        // Store configuration rooted in a fresh temporary directory.
+        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        final String storeRoot = createBaseDir(config.getBrokerName() + "_" + config.getBrokerId()).getAbsolutePath();
+        messageStoreConfig.setStorePathRootDir(storeRoot);
+        messageStoreConfig.setStorePathCommitLog(storeRoot + File.separator + "commitlog");
+
+        brokerConfigPath = "/tmp/" + generatedBrokerName;
+        config.setBrokerConfigPath(brokerConfigPath);
+        TMP_FILE_LIST.add(new File(brokerConfigPath));
+
+        // Merge store properties into the broker properties and persist them as one file.
+        final Properties mergedProps = MixAll.object2Properties(config);
+        mergedProps.putAll(MixAll.object2Properties(messageStoreConfig));
+        MixAll.string2File(MixAll.properties2String(mergedProps), brokerConfigPath);
+
+        // Container configuration pointing at the broker config written above.
+        brokerContainerConfigPath = "/tmp/" + SHARED_BROKER_NAME_PREFIX + System.currentTimeMillis();
+        final BrokerContainerConfig containerConfig = new BrokerContainerConfig();
+        containerConfig.setBrokerConfigPaths(brokerConfigPath);
+        if (containerConfig.getRocketmqHome() == null) {
+            containerConfig.setRocketmqHome("../distribution");
+        }
+        TMP_FILE_LIST.add(new File(brokerContainerConfigPath));
+        final Properties containerProps = MixAll.object2Properties(containerConfig);
+        MixAll.string2File(MixAll.properties2String(containerProps), brokerContainerConfigPath);
+    }
+
+    /** Deletes every file and directory recorded in {@code TMP_FILE_LIST}. */
+    @After
+    public void destory() {
+        for (File tmp : TMP_FILE_LIST) {
+            UtilAll.deleteFile(tmp);
+        }
+    }
+
+    /** Boots a container from the container config and expects exactly one broker in it. */
+    @Test
+    public void testStartBroker1() {
+        final String[] args = Arrays.array("-c", brokerContainerConfigPath);
+        BrokerContainer brokerContainer = BrokerContainerStartup.startBrokerContainer(
+            BrokerContainerStartup.createBrokerContainer(args));
+        assertThat(brokerContainer).isNotNull();
+
+        List<BrokerController> startedBrokers = BrokerContainerStartup.createAndStartBrokers(brokerContainer);
+        assertThat(startedBrokers.size()).isEqualTo(1);
+
+        brokerContainer.shutdown();
+        assertThat(brokerContainer.getBrokerControllers().size()).isEqualTo(0);
+    }
+
+    /** Boots a single broker from the broker config and checks its container's broker count. */
+    @Test
+    public void testStartBroker2() {
+        final String[] args = Arrays.array("-c", brokerConfigPath);
+        InnerBrokerController brokerController =
+            (InnerBrokerController) BrokerContainerStartup.start(BrokerContainerStartup.createBrokerController(args));
+        assertThat(brokerController).isNotNull();
+
+        assertThat(brokerController.getBrokerContainer().getBrokerControllers().size()).isEqualTo(1);
+
+        brokerController.getBrokerContainer().shutdown();
+        assertThat(brokerController.getBrokerContainer().getBrokerControllers().size()).isEqualTo(0);
+    }
+
+    /**
+     * Creates a temporary directory and records it for deletion.
+     *
+     * @param prefix name prefix of the directory
+     * @return the created directory
+     */
+    private static File createBaseDir(String prefix) {
+        final File dir;
+        try {
+            dir = Files.createTempDirectory(prefix).toFile();
+        } catch (IOException e) {
+            throw new RuntimeException("Couldn't create tmp folder", e);
+        }
+        TMP_FILE_LIST.add(dir);
+        System.out.printf("create file at %s%n", dir.getAbsolutePath());
+        return dir;
+    }
+
+    /** Removes the relative store directory before each test. */
+    @Before
+    public void clear() {
+        UtilAll.deleteFile(new File(storePathRootDir));
+    }
+
+    /** Removes the relative store directory and its parent, if empty, after each test. */
+    @After
+    public void tearDown() {
+        final File storeDir = new File(storePathRootDir);
+        UtilAll.deleteFile(storeDir);
+        UtilAll.deleteEmptyDirectory(storeDir);
+        UtilAll.deleteEmptyDirectory(storeDir.getParentFile());
+    }
+}
\ No newline at end of file
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
new file mode 100644
index 0000000..42c934d
--- /dev/null
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerContainerTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+import static org.mockito.Mockito.*;
+
+public class BrokerContainerTest {
+    private static final List<File> TMP_FILE_LIST = new ArrayList<>();
+    private static final Random RANDOM = new Random();
+    private static final Set<Integer> PORTS_IN_USE = new HashSet<>();
+
+    /**
+     * Tests if the controller can be properly stopped and started.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testBrokerContainerRestart() throws Exception {
+        BrokerContainer brokerController = new BrokerContainer(
+            new BrokerContainerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig());
+        assertThat(brokerController.initialize()).isTrue();
+        brokerController.start();
+        brokerController.shutdown();
+    }
+
+    @Test
+    public void testRegisterIncrementBrokerData() throws Exception {
+        BrokerController brokerController = new BrokerController(
+            new BrokerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig(),
+            new MessageStoreConfig());
+
+        BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
+        Field field = BrokerController.class.getDeclaredField("brokerOuterAPI");
+        field.setAccessible(true);
+        field.set(brokerController, brokerOuterAPI);
+
+        // topic-0 doesn't have queueGroupConfig.
+        // topic-1 has queueGroupConfig.
+        List<TopicConfig> topicConfigList = new ArrayList<>(2);
+        for (int i = 0; i < 2; i++) {
+            topicConfigList.add(new TopicConfig("topic-" + i));
+        }
+        DataVersion dataVersion = new DataVersion();
+
+        // Check normal condition.
+        testRegisterIncrementBrokerDataWithPerm(brokerController, brokerOuterAPI,
+            topicConfigList, dataVersion, PermName.PERM_READ | PermName.PERM_WRITE, 1);
+        // Check unwritable broker.
+        testRegisterIncrementBrokerDataWithPerm(brokerController, brokerOuterAPI,
+            topicConfigList, dataVersion, PermName.PERM_READ, 2);
+        // Check unreadable broker.
+        testRegisterIncrementBrokerDataWithPerm(brokerController, brokerOuterAPI,
+            topicConfigList, dataVersion, PermName.PERM_WRITE, 3);
+    }
+
+    @Test
+    public void testRegisterIncrementBrokerDataPerm() throws Exception {
+        BrokerController brokerController = new BrokerController(
+            new BrokerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig(),
+            new MessageStoreConfig());
+
+        BrokerOuterAPI brokerOuterAPI = mock(BrokerOuterAPI.class);
+        Field field = BrokerController.class.getDeclaredField("brokerOuterAPI");
+        field.setAccessible(true);
+        field.set(brokerController, brokerOuterAPI);
+
+        // topic-0 doesn't have queueGroupConfig.
+        // topic-1 has queueGroupConfig.
+        List<TopicConfig> topicConfigList = new ArrayList<>(2);
+        for (int i = 0; i < 2; i++) {
+            topicConfigList.add(new TopicConfig("topic-" + i));
+        }
+        DataVersion dataVersion = new DataVersion();
+
+        brokerController.getBrokerConfig().setBrokerPermission(4);
+
+        brokerController.registerIncrementBrokerData(topicConfigList, dataVersion);
+        // Get topicConfigSerializeWrapper created by registerIncrementBrokerData() from brokerOuterAPI.registerBrokerAll()
+        ArgumentCaptor<TopicConfigSerializeWrapper> captor = ArgumentCaptor.forClass(TopicConfigSerializeWrapper.class);
+        verify(brokerOuterAPI).registerBrokerAll(anyString(), anyString(), anyString(), anyLong(), anyString(),
+            captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), anyBoolean());
+        TopicConfigSerializeWrapper wrapper = captor.getValue();
+        for (Map.Entry<String, TopicConfig> entry : wrapper.getTopicConfigTable().entrySet()) {
+            assertThat(entry.getValue().getPerm()).isEqualTo(brokerController.getBrokerConfig().getBrokerPermission());
+        }
+
+    }
+
+    @Test
+    public void testMasterScaleOut() throws Exception {
+        BrokerContainer brokerContainer = new BrokerContainer(
+            new BrokerContainerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig());
+        assertThat(brokerContainer.initialize()).isTrue();
+        brokerContainer.getBrokerContainerConfig().setNamesrvAddr("127.0.0.1:9876");
+        brokerContainer.start();
+
+        BrokerConfig masterBrokerConfig = new BrokerConfig();
+
+        String baseDir = createBaseDir("unnittest-master").getAbsolutePath();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(baseDir);
+        messageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+        InnerBrokerController brokerController = brokerContainer.addBroker(masterBrokerConfig, messageStoreConfig);
+        assertThat(brokerController.isIsolated()).isFalse();
+
+        brokerContainer.shutdown();
+        brokerController.getMessageStore().destroy();
+    }
+
+    @Test
+    public void testAddMasterFailed() throws Exception {
+        BrokerContainer brokerContainer = new BrokerContainer(
+            new BrokerContainerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig());
+        assertThat(brokerContainer.initialize()).isTrue();
+        brokerContainer.start();
+
+        BrokerConfig masterBrokerConfig = new BrokerConfig();
+        masterBrokerConfig.setListenPort(brokerContainer.getNettyServerConfig().getListenPort());
+        boolean exceptionCaught = false;
+        try {
+            String baseDir = createBaseDir("unnittest-master").getAbsolutePath();
+            MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+            messageStoreConfig.setStorePathRootDir(baseDir);
+            messageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+            brokerContainer.addBroker(masterBrokerConfig, messageStoreConfig);
+        } catch (Exception e) {
+            exceptionCaught = true;
+        } finally {
+            brokerContainer.shutdown();
+
+        }
+
+        assertThat(exceptionCaught).isTrue();
+    }
+
+    @Test
+    public void testAddSlaveFailed() throws Exception {
+        BrokerContainer sharedBrokerController = new BrokerContainer(
+            new BrokerContainerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig());
+        assertThat(sharedBrokerController.initialize()).isTrue();
+        sharedBrokerController.start();
+
+        BrokerConfig slaveBrokerConfig = new BrokerConfig();
+        slaveBrokerConfig.setBrokerId(1);
+        slaveBrokerConfig.setListenPort(sharedBrokerController.getNettyServerConfig().getListenPort());
+        MessageStoreConfig slaveMessageStoreConfig = new MessageStoreConfig();
+        slaveMessageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
+        String baseDir = createBaseDir("unnittest-slave").getAbsolutePath();
+        slaveMessageStoreConfig.setStorePathRootDir(baseDir);
+        slaveMessageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+        boolean exceptionCaught = false;
+        try {
+            sharedBrokerController.addBroker(slaveBrokerConfig, slaveMessageStoreConfig);
+        } catch (Exception e) {
+            exceptionCaught = true;
+        } finally {
+            sharedBrokerController.shutdown();
+        }
+
+        assertThat(exceptionCaught).isTrue();
+    }
+
+    @Test
+    public void testAddAndRemoveMaster() throws Exception {
+        BrokerContainer brokerContainer = new BrokerContainer(
+            new BrokerContainerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig());
+        assertThat(brokerContainer.initialize()).isTrue();
+        brokerContainer.start();
+
+        BrokerConfig masterBrokerConfig = new BrokerConfig();
+        String baseDir = createBaseDir("unnittest-master").getAbsolutePath();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(baseDir);
+        messageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+        InnerBrokerController master = brokerContainer.addBroker(masterBrokerConfig, messageStoreConfig);
+        assertThat(master).isNotNull();
+        master.start();
+        assertThat(master.isIsolated()).isFalse();
+
+        brokerContainer.removeBroker(new BrokerIdentity(masterBrokerConfig.getBrokerClusterName(), masterBrokerConfig.getBrokerName(), masterBrokerConfig.getBrokerId()));
+        assertThat(brokerContainer.getMasterBrokers().size()).isEqualTo(0);
+
+        brokerContainer.shutdown();
+        master.getMessageStore().destroy();
+    }
+
+    @Test
+    public void testAddAndRemoveSlaveSuccess() throws Exception {
+        BrokerContainer brokerContainer = new BrokerContainer(
+            new BrokerContainerConfig(),
+            new NettyServerConfig(),
+            new NettyClientConfig());
+        assertThat(brokerContainer.initialize()).isTrue();
+        brokerContainer.start();
+
+        BrokerConfig masterBrokerConfig = new BrokerConfig();
+        String baseDir = createBaseDir("unnittest-master").getAbsolutePath();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setStorePathRootDir(baseDir);
+        messageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+        InnerBrokerController master = brokerContainer.addBroker(masterBrokerConfig, messageStoreConfig);
+        assertThat(master).isNotNull();
+        master.start();
+        assertThat(master.isIsolated()).isFalse();
+
+        BrokerConfig slaveBrokerConfig = new BrokerConfig();
+        slaveBrokerConfig.setListenPort(generatePort(masterBrokerConfig.getListenPort(), 10000));
+        slaveBrokerConfig.setBrokerId(1);
+        MessageStoreConfig slaveMessageStoreConfig = new MessageStoreConfig();
+        slaveMessageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
+        slaveMessageStoreConfig.setHaListenPort(generatePort(messageStoreConfig.getHaListenPort(), 10000));
+        baseDir = createBaseDir("unnittest-slave").getAbsolutePath();
+        slaveMessageStoreConfig.setStorePathRootDir(baseDir);
+        slaveMessageStoreConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+        InnerBrokerController slave = brokerContainer.addBroker(slaveBrokerConfig, slaveMessageStoreConfig);
+        assertThat(slave).isNotNull();
+        slave.start();
+        assertThat(slave.isIsolated()).isFalse();
+
+        brokerContainer.removeBroker(new BrokerIdentity(slaveBrokerConfig.getBrokerClusterName(), slaveBrokerConfig.getBrokerName(), slaveBrokerConfig.getBrokerId()));
+        assertThat(brokerContainer.getSlaveBrokers().size()).isEqualTo(0);
+
+        brokerContainer.removeBroker(new BrokerIdentity(masterBrokerConfig.getBrokerClusterName(), masterBrokerConfig.getBrokerName(), masterBrokerConfig.getBrokerId()));
+        assertThat(brokerContainer.getMasterBrokers().size()).isEqualTo(0);
+
+        brokerContainer.shutdown();
+        slave.getMessageStore().destroy();
+        master.getMessageStore().destroy();
+    }
+
+    /**
+     * Creates a fresh temporary directory and records it in {@code TMP_FILE_LIST}.
+     *
+     * @param prefix name prefix handed to {@link Files#createTempDirectory}
+     * @return the newly created directory
+     * @throws RuntimeException wrapping the {@link IOException} when the directory cannot be created
+     */
+    private static File createBaseDir(String prefix) {
+        try {
+            final File baseDir = Files.createTempDirectory(prefix).toFile();
+            TMP_FILE_LIST.add(baseDir);
+            return baseDir;
+        } catch (IOException e) {
+            throw new RuntimeException("Couldn't create tmp folder", e);
+        }
+    }
+
+    /**
+     * Draws a random port of the form {@code base + RANDOM.nextInt(range)} and redraws until neither
+     * the candidate nor {@code candidate - 2} has been recorded in {@code PORTS_IN_USE}. Both values
+     * are recorded before returning.
+     *
+     * @param base  lowest value that can be returned
+     * @param range exclusive upper bound of the random offset added to {@code base}
+     * @return the chosen port
+     */
+    public static int generatePort(int base, int range) {
+        int candidate;
+        do {
+            candidate = base + RANDOM.nextInt(range);
+        } while (PORTS_IN_USE.contains(candidate) || PORTS_IN_USE.contains(candidate - 2));
+        // NOTE(review): "candidate - 2" is presumably reserved for a companion listener port - confirm.
+        PORTS_IN_USE.add(candidate);
+        PORTS_IN_USE.add(candidate - 2);
+        return candidate;
+    }
+
+    /**
+     * Deletes every temporary directory recorded in {@code TMP_FILE_LIST} after each test and then
+     * empties the list.
+     */
+    @After
+    public void destory() {
+        for (File file : TMP_FILE_LIST) {
+            UtilAll.deleteFile(file);
+        }
+        // The list is static and shared by all tests of this class; without clearing it, every later
+        // test would try to delete the already-removed directories again and the list would keep growing.
+        TMP_FILE_LIST.clear();
+    }
+
+    private void testRegisterIncrementBrokerDataWithPerm(BrokerController brokerController,
+        BrokerOuterAPI brokerOuterAPI,
+        List<TopicConfig> topicConfigList, DataVersion dataVersion, int perm, int times) {
+        brokerController.getBrokerConfig().setBrokerPermission(perm);
+
+        brokerController.registerIncrementBrokerData(topicConfigList, dataVersion);
+        // Get topicConfigSerializeWrapper created by registerIncrementBrokerData() from brokerOuterAPI.registerBrokerAll()
+        ArgumentCaptor<TopicConfigSerializeWrapper> captor = ArgumentCaptor.forClass(TopicConfigSerializeWrapper.class);
+        verify(brokerOuterAPI, times(times)).registerBrokerAll(anyString(), anyString(), anyString(), anyLong(),
+            anyString(), captor.capture(), ArgumentMatchers.anyList(), anyBoolean(), anyInt(), anyBoolean(), anyBoolean(), anyBoolean());
+        TopicConfigSerializeWrapper wrapper = captor.getValue();
+
+        for (TopicConfig topicConfig : topicConfigList) {
+            topicConfig.setPerm(perm);
+        }
+        assertThat(wrapper.getDataVersion()).isEqualTo(dataVersion);
+        assertThat(wrapper.getTopicConfigTable()).containsExactly(
+            entry("topic-0", topicConfigList.get(0)),
+            entry("topic-1", topicConfigList.get(1)));
+        for (TopicConfig topicConfig : topicConfigList) {
+            topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+        }
+    }
+}
diff --git a/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java b/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
new file mode 100644
index 0000000..6943d28
--- /dev/null
+++ b/container/src/test/java/org/apache/rocketmq/container/BrokerPreOnlineTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.container;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerPreOnlineTest {
+    @Mock
+    private BrokerContainer brokerContainer;
+
+    private InnerBrokerController innerBrokerController;
+
+    @Mock
+    private BrokerOuterAPI brokerOuterAPI;
+
+    public void init() throws Exception {
+        when(brokerContainer.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
+        when(brokerContainer.getBrokerContainerConfig()).thenReturn(new BrokerContainerConfig());
+
+        BrokerMemberGroup brokerMemberGroup1 = new BrokerMemberGroup();
+        Map<Long, String> brokerAddrMap = new HashMap<>();
+        brokerAddrMap.put(1L, "127.0.0.1:20911");
+        brokerMemberGroup1.setBrokerAddrs(brokerAddrMap);
+
+        BrokerMemberGroup brokerMemberGroup2 = new BrokerMemberGroup();
+        brokerMemberGroup2.setBrokerAddrs(new HashMap<>());
+
+//        when(brokerOuterAPI.syncBrokerMemberGroup(anyString(), anyString()))
+//            .thenReturn(brokerMemberGroup1)
+//            .thenReturn(brokerMemberGroup2);
+//        doNothing().when(brokerOuterAPI).sendBrokerHaInfo(anyString(), anyString(), anyLong(), anyString());
+
+        DefaultMessageStore defaultMessageStore = mock(DefaultMessageStore.class);
+        when(defaultMessageStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
+        when(defaultMessageStore.getBrokerConfig()).thenReturn(new BrokerConfig());
+
+//        HAService haService = new DefaultHAService();
+//        haService.init(defaultMessageStore);
+//        haService.start();
+//
+//        when(defaultMessageStore.getHaService()).thenReturn(haService);
+
+        innerBrokerController = new InnerBrokerController(brokerContainer,
+            defaultMessageStore.getBrokerConfig(),
+            defaultMessageStore.getMessageStoreConfig());
+
+        innerBrokerController.setTransactionalMessageCheckService(new TransactionalMessageCheckService(innerBrokerController));
+
+        Field field = InnerBrokerController.class.getDeclaredField("isIsolated");
+        field.setAccessible(true);
+        field.set(innerBrokerController, true);
+
+        field = BrokerController.class.getDeclaredField("messageStore");
+        field.setAccessible(true);
+        field.set(innerBrokerController, defaultMessageStore);
+    }
+
+    @Test
+    public void testMasterOnlineConnTimeout() throws Exception {
+        init();
+        BrokerPreOnlineService brokerPreOnlineService = new BrokerPreOnlineService(innerBrokerController);
+
+        brokerPreOnlineService.start();
+
+        await().atMost(Duration.ofSeconds(30)).until(() -> !innerBrokerController.isIsolated());
+    }
+}
diff --git a/pom.xml b/pom.xml
index 880df2d..5f6c4ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,7 @@
         <module>logging</module>
         <module>acl</module>
         <module>example</module>
+        <module>container</module>
     </modules>
 
     <build>
@@ -460,6 +461,11 @@
             </dependency>
             <dependency>
                 <groupId>${project.groupId}</groupId>
+                <artifactId>rocketmq-container</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
                 <artifactId>rocketmq-common</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -531,7 +537,7 @@
             <dependency>
                 <groupId>commons-cli</groupId>
                 <artifactId>commons-cli</artifactId>
-                <version>1.2</version>
+                <version>1.4</version>
             </dependency>
             <dependency>
                 <groupId>io.netty</groupId>
@@ -566,6 +572,11 @@
                 <version>3.4</version>
             </dependency>
             <dependency>
+                <groupId>commons-io</groupId>
+                <artifactId>commons-io</artifactId>
+                <version>2.6</version>
+            </dependency>
+            <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>19.0</version>
@@ -610,8 +621,11 @@
                 <artifactId>commons-validator</artifactId>
                 <version>1.7</version>
             </dependency>
-
-
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-core</artifactId>
+                <version>1.1.11</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
index 6292fc0..0f17858 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RPCHook.java
@@ -23,5 +23,7 @@ public interface RPCHook {
     void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
 
     void doAfterResponse(final String remoteAddr, final RemotingCommand request,
-        final RemotingCommand response);
+                         final RemotingCommand response);
+
+    void doAfterRpcFailure(final String remoteAddr, RemotingCommand request, Boolean remoteTimeout);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index c0754db..9f6933d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -17,12 +17,14 @@
 package org.apache.rocketmq.remoting;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface RemotingClient extends RemotingService {
@@ -31,6 +33,8 @@ public interface RemotingClient extends RemotingService {
 
     List<String> getNameServerAddressList();
 
+    List<String> getAvailableNameSrvList();
+
     RemotingCommand invokeSync(final String addr, final RemotingCommand request,
         final long timeoutMillis) throws InterruptedException, RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException;
@@ -48,7 +52,12 @@ public interface RemotingClient extends RemotingService {
 
     void setCallbackExecutor(final ExecutorService callbackExecutor);
 
-    ExecutorService getCallbackExecutor();
-
     boolean isChannelWritable(final String addr);
+
+    void closeChannels();
+
+    void closeChannels(final List<String> addrList);
+
+    ConcurrentMap<Integer, ResponseFuture> getResponseTable();
+
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index a12c089..36e2035 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -36,6 +36,12 @@ public interface RemotingServer extends RemotingService {
 
     Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
 
+    Pair<NettyRequestProcessor, ExecutorService> getDefaultProcessorPair();
+
+    RemotingServer newRemotingServer(int port);
+
+    void removeRemotingServer(int port);
+
     RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
         final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
         RemotingTimeoutException;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
index 2f88797..c718f2e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java
@@ -23,4 +23,9 @@ public interface RemotingService {
     void shutdown();
 
     void registerRPCHook(RPCHook rpcHook);
+
+    /**
+     * Remove all rpc hooks.
+     */
+    void clearRPCHook();
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 4654e49..4c8a62a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -189,6 +189,19 @@ public class RemotingHelper {
         return "";
     }
 
+    /**
+     * Returns the part of {@code address} that precedes the first {@code ':'}.
+     *
+     * @param address text such as {@code "host:port"}; may be {@code null}
+     * @return the first colon-separated segment, or an empty string when {@code address} is
+     *         {@code null} or splitting it yields no segments
+     */
+    public static String parseHostFromAddress(String address) {
+        if (address != null) {
+            final String[] segments = address.split(":");
+            if (segments.length >= 1) {
+                return segments[0];
+            }
+        }
+
+        return "";
+    }
+
     public static String parseSocketAddressAddr(SocketAddress socketAddress) {
         if (socketAddress != null) {
             // Default toString of InetSocketAddress is "hostName/IP:port"
@@ -199,4 +212,29 @@ public class RemotingHelper {
         return "";
     }
 
+    /**
+     * Extracts the port from a socket address.
+     *
+     * @param socketAddress the address to inspect; may be {@code null}
+     * @return the port when {@code socketAddress} is an {@link InetSocketAddress}, otherwise {@code -1}
+     */
+    public static int parseSocketAddressPort(SocketAddress socketAddress) {
+        return socketAddress instanceof InetSocketAddress
+            ? ((InetSocketAddress) socketAddress).getPort()
+            : -1;
+    }
+
+
+    /**
+     * Packs a dotted-quad IPv4 string into one {@code int}, with the first octet in the most
+     * significant byte.
+     *
+     * @param ip IPv4 address in dotted notation, e.g. {@code "192.168.0.1"}
+     * @return the four parsed octets combined into a single value
+     * @throws NumberFormatException if an octet is not a decimal integer
+     */
+    public static int ipToInt(String ip) {
+        String[] ips = ip.split("\\.");
+        return (Integer.parseInt(ips[0]) << 24)
+            | (Integer.parseInt(ips[1]) << 16)
+            | (Integer.parseInt(ips[2]) << 8)
+            | Integer.parseInt(ips[3]);
+    }
+
+    /**
+     * Tests whether an IPv4 address lies inside a CIDR block.
+     *
+     * @param ip   IPv4 address in dotted notation
+     * @param cidr block in {@code "a.b.c.d/prefixLength"} form
+     * @return {@code true} when {@code ip} and the block's address agree on the leading
+     *         {@code prefixLength} bits
+     */
+    public static boolean ipInCIDR(String ip, String cidr) {
+        int ipAddr = ipToInt(ip);
+        String[] cidrArr = cidr.split("/");
+        int netId = Integer.parseInt(cidrArr[1]);
+        // Java uses only the low five bits of an int shift count, so "<< 32" shifts by 0 and would
+        // leave an all-ones mask. A /0 prefix must match every address, hence the explicit zero mask.
+        int mask = netId == 0 ? 0 : 0xFFFFFFFF << (32 - netId);
+        int cidrIpAddr = ipToInt(cidrArr[0]);
+
+        return (ipAddr & mask) == (cidrIpAddr & mask);
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index d5ce20b..53301e8 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -218,13 +218,17 @@ public class RemotingUtil {
 
     public static void closeChannel(Channel channel) {
         final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel);
-        channel.close().addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote,
-                    future.isSuccess());
-            }
-        });
+        if (addrRemote == "") {
+            channel.close();
+        } else {
+            channel.close().addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote,
+                        future.isSuccess());
+                }
+            });
+        }
     }
 
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index c1b9345..2f123db 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -38,11 +38,17 @@ public class NettyClientConfig {
     private boolean clientPooledByteBufAllocatorEnable = false;
     private boolean clientCloseSocketIfTimeout = NettySystemConfig.clientCloseSocketIfTimeout;
 
+    private boolean preferredDirectByteBuffer = false;
+    private boolean defaultEventExecutorGroupEnable = true;
+
     private boolean useTLS;
 
     private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
     private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
 
+    private boolean disableCallbackExecutor = false;
+    private boolean disableNettyWorkerGroup = false;
+
     public boolean isClientCloseSocketIfTimeout() {
         return clientCloseSocketIfTimeout;
     }
@@ -154,4 +160,36 @@ public class NettyClientConfig {
     public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
         this.writeBufferHighWaterMark = writeBufferHighWaterMark;
     }
+
+    public boolean isPreferredDirectByteBuffer() {
+        return preferredDirectByteBuffer;
+    }
+
+    public void setPreferredDirectByteBuffer(final boolean preferredDirectByteBuffer) {
+        this.preferredDirectByteBuffer = preferredDirectByteBuffer;
+    }
+
+    public boolean isDefaultEventExecutorGroupEnable() {
+        return defaultEventExecutorGroupEnable;
+    }
+
+    public void setDefaultEventExecutorGroupEnable(final boolean defaultEventExecutorGroupEnable) {
+        this.defaultEventExecutorGroupEnable = defaultEventExecutorGroupEnable;
+    }
+
+    public boolean isDisableCallbackExecutor() {
+        return disableCallbackExecutor;
+    }
+
+    public void setDisableCallbackExecutor(boolean disableCallbackExecutor) {
+        this.disableCallbackExecutor = disableCallbackExecutor;
+    }
+
+    public boolean isDisableNettyWorkerGroup() {
+        return disableNettyWorkerGroup;
+    }
+
+    public void setDisableNettyWorkerGroup(boolean disableNettyWorkerGroup) {
+        this.disableNettyWorkerGroup = disableNettyWorkerGroup;
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index f02518b..c6c9178 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -88,7 +88,8 @@ public abstract class NettyRemotingAbstract {
     protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
 
     /**
-     * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+     * The default request processor to use in case there is no exact match in {@link #processorTable} per request
+     * code.
      */
     protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
 
@@ -102,7 +103,6 @@ public abstract class NettyRemotingAbstract {
      */
     protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
 
-
     static {
         NettyLogger.initNettyLogger();
     }
@@ -168,20 +168,27 @@ public abstract class NettyRemotingAbstract {
 
     protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
         if (rpcHooks.size() > 0) {
-            for (RPCHook rpcHook: rpcHooks) {
+            for (RPCHook rpcHook : rpcHooks) {
                 rpcHook.doBeforeRequest(addr, request);
             }
         }
     }
 
-    protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
+    public void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
         if (rpcHooks.size() > 0) {
-            for (RPCHook rpcHook: rpcHooks) {
+            for (RPCHook rpcHook : rpcHooks) {
                 rpcHook.doAfterResponse(addr, request, response);
             }
         }
     }
 
+    public void doAfterRpcFailure(String addr, RemotingCommand request, Boolean remoteTimeout) { // fan a failed request out to all hooks
+        if (rpcHooks.size() > 0) {
+            for (RPCHook rpcHook : rpcHooks) {
+                rpcHook.doAfterRpcFailure(addr, request, remoteTimeout); // flag forwarded as-is; callers in this patch pass true only for a response timeout
+            }
+        }
+    }
 
     /**
      * Process incoming request command issued by remote peer.
@@ -198,43 +205,55 @@ public abstract class NettyRemotingAbstract {
             Runnable run = new Runnable() {
                 @Override
                 public void run() {
+                    Exception exception = null;
+                    RemotingCommand response;
+
                     try {
                         String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-                        doBeforeRpcHooks(remoteAddr, cmd);
-                        final RemotingResponseCallback callback = new RemotingResponseCallback() {
-                            @Override
-                            public void callback(RemotingCommand response) {
-                                doAfterRpcHooks(remoteAddr, cmd, response);
-                                if (!cmd.isOnewayRPC()) {
-                                    if (response != null) {
-                                        response.setOpaque(opaque);
-                                        response.markResponseType();
-                                        try {
-                                            ctx.writeAndFlush(response);
-                                        } catch (Throwable e) {
-                                            log.error("process request over, but response failed", e);
-                                            log.error(cmd.toString());
-                                            log.error(response.toString());
-                                        }
-                                    } else {
-                                    }
+                        try {
+                            doBeforeRpcHooks(remoteAddr, cmd);
+                        } catch (Exception e) {
+                            exception = e;
+                        }
+
+                        if (exception == null) {
+                            response = pair.getObject1().processRequest(ctx, cmd);
+                        } else {
+                            response = RemotingCommand.createResponseCommand(null);
+                            response.setCode(RemotingSysResponseCode.SYSTEM_ERROR);
+                        }
+
+                        try {
+                            doAfterRpcHooks(remoteAddr, cmd, response);
+                        } catch (Exception e) {
+                            exception = e;
+                        }
+
+                        if (exception != null) {
+                            throw exception;
+                        }
+
+                        if (!cmd.isOnewayRPC()) {
+                            if (response != null) {
+                                response.setOpaque(opaque);
+                                response.markResponseType();
+                                try {
+                                    ctx.writeAndFlush(response);
+                                } catch (Throwable e) {
+                                    log.error("process request over, but response failed", e);
+                                    log.error(cmd.toString());
+                                    log.error(response.toString());
                                 }
+                            } else {
+
                             }
-                        };
-                        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
-                            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
-                            processor.asyncProcessRequest(ctx, cmd, callback);
-                        } else {
-                            NettyRequestProcessor processor = pair.getObject1();
-                            RemotingCommand response = processor.processRequest(ctx, cmd);
-                            callback.callback(response);
                         }
                     } catch (Throwable e) {
                         log.error("process request exception", e);
                         log.error(cmd.toString());
 
                         if (!cmd.isOnewayRPC()) {
-                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
+                            response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                 RemotingHelper.exceptionSimpleDesc(e));
                             response.setOpaque(opaque);
                             ctx.writeAndFlush(response);
@@ -344,29 +363,24 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
-
-
-    /**
-     * Custom RPC hook.
-     * Just be compatible with the previous version, use getRPCHooks instead.
-     */
-    @Deprecated
-    protected RPCHook getRPCHook() {
-        if (rpcHooks.size() > 0) {
-            return rpcHooks.get(0);
-        }
-        return null;
-    }
-
     /**
      * Custom RPC hooks.
      *
      * @return RPC hooks if specified; null otherwise.
      */
-    public List<RPCHook> getRPCHooks() {
+    public List<RPCHook> getRPCHook() {
         return rpcHooks;
     }
 
+    public void registerRPCHook(RPCHook rpcHook) { // appends the hook to rpcHooks
+        if (rpcHook != null && !rpcHooks.contains(rpcHook)) { // null and already-registered hooks are ignored
+            rpcHooks.add(rpcHook);
+        }
+    }
+
+    public void clearRPCHook() {
+        rpcHooks.clear();
+    }
 
     /**
      * This method specifies thread pool to use while invoking callback methods.
@@ -481,6 +495,10 @@ public abstract class NettyRemotingAbstract {
                 throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
             }
         } else {
+            if (this instanceof NettyRemotingClient) {
+                NettyRemotingClient nettyRemotingClient = (NettyRemotingClient) this;
+                nettyRemotingClient.doAfterRpcFailure(RemotingHelper.parseChannelRemoteAddr(channel), request, false);
+            }
             if (timeoutMillis <= 0) {
                 throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
             } else {
@@ -513,13 +531,14 @@ public abstract class NettyRemotingAbstract {
 
     /**
      * mark the request of the specified channel as fail and to invoke fail callback immediately
+     *
      * @param channel the channel which is close already
      */
     protected void failFast(final Channel channel) {
         Iterator<Entry<Integer, ResponseFuture>> it = responseTable.entrySet().iterator();
         while (it.hasNext()) {
             Entry<Integer, ResponseFuture> entry = it.next();
-            if (entry.getValue().getProcessChannel() == channel) {
+            if (entry.getValue().getChannel() == channel) {
                 Integer opaque = entry.getKey();
                 if (opaque != null) {
                     requestFail(opaque);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index ce83769..455d5b4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.remoting.netty;
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
@@ -35,20 +36,26 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.cert.CertificateException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.Map;
 import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -58,7 +65,6 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -70,7 +76,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
 
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
 
@@ -83,18 +89,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     private final Timer timer = new Timer("ClientHouseKeepingService", true);
 
     private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
+    private final ConcurrentMap<String, Boolean> availableNamesrvAddrMap = new ConcurrentHashMap<String, Boolean>();
     private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
     private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
     private final Lock namesrvChannelLock = new ReentrantLock();
 
     private final ExecutorService publicExecutor;
+    private final ExecutorService scanExecutor;
 
     /**
      * Invoke the callback methods in this executor when process response.
      */
     private ExecutorService callbackExecutor;
     private final ChannelEventListener channelEventListener;
-    private DefaultEventExecutorGroup defaultEventExecutorGroup;
+    private EventExecutorGroup defaultEventExecutorGroup;
 
     public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
         this(nettyClientConfig, null);
@@ -102,6 +110,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
     public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
         final ChannelEventListener channelEventListener) {
+        this(nettyClientConfig, channelEventListener, null, null);
+    }
+
+    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
+        final ChannelEventListener channelEventListener,
+        final EventLoopGroup eventLoopGroup,
+        final EventExecutorGroup eventExecutorGroup) {
         super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
         this.nettyClientConfig = nettyClientConfig;
         this.channelEventListener = channelEventListener;
@@ -112,7 +127,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }
 
         this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
+            private final AtomicInteger threadIndex = new AtomicInteger(0);
 
             @Override
             public Thread newThread(Runnable r) {
@@ -120,23 +135,39 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             }
         });
 
-        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
-            private AtomicInteger threadIndex = new AtomicInteger(0);
+        this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<Runnable>(32), new ThreadFactory() {
+                private final AtomicInteger threadIndex = new AtomicInteger(0);
 
-            @Override
-            public Thread newThread(Runnable r) {
-                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, "NettyClientScan_thread_" + this.threadIndex.incrementAndGet());
+                }
             }
-        });
+        );
+
+        if (eventLoopGroup != null) {
+            this.eventLoopGroupWorker = eventLoopGroup;
+        } else {
+            this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
+                private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
+                }
+            });
+        }
+        this.defaultEventExecutorGroup = eventExecutorGroup;
 
         if (nettyClientConfig.isUseTLS()) {
             try {
                 sslContext = TlsHelper.buildSslContext(true);
-                log.info("SSL enabled for client");
+                LOGGER.info("SSL enabled for client");
             } catch (IOException e) {
-                log.error("Failed to create SSLContext", e);
+                LOGGER.error("Failed to create SSLContext", e);
             } catch (CertificateException e) {
-                log.error("Failed to create SSLContext", e);
+                LOGGER.error("Failed to create SSLContext", e);
                 throw new RuntimeException("Failed to create SSLContext", e);
             }
         }
@@ -150,19 +181,19 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
     @Override
     public void start() {
-        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
-            nettyClientConfig.getClientWorkerThreads(),
-            new ThreadFactory() {
-
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
-                }
-            });
+        if (this.defaultEventExecutorGroup == null) {
+            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
+                nettyClientConfig.getClientWorkerThreads(),
+                new ThreadFactory() {
 
+                    private AtomicInteger threadIndex = new AtomicInteger(0);
 
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
+                    }
+                });
+        }
         Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.SO_KEEPALIVE, false)
@@ -174,49 +205,80 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                     if (nettyClientConfig.isUseTLS()) {
                         if (null != sslContext) {
                             pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
-                            log.info("Prepend SSL handler");
+                            LOGGER.info("Prepend SSL handler");
                         } else {
-                            log.warn("Connections are insecure as SSLContext is null!");
+                            LOGGER.warn("Connections are insecure as SSLContext is null!");
                         }
                     }
-                    pipeline.addLast(
-                        defaultEventExecutorGroup,
-                        new NettyEncoder(),
-                        new NettyDecoder(),
-                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
-                        new NettyConnectManageHandler(),
+                    if (nettyClientConfig.isDefaultEventExecutorGroupEnable() && !nettyClientConfig.isDisableNettyWorkerGroup()) {
+                        ch.pipeline().addLast(defaultEventExecutorGroup);
+                    }
+                    ch.pipeline().addLast(//
+                        new NettyEncoder(), //
+                        new NettyDecoder(), //
+                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //
+                        new NettyConnectManageHandler(), //
                         new NettyClientHandler());
                 }
             });
         if (nettyClientConfig.getClientSocketSndBufSize() > 0) {
-            log.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
+            LOGGER.info("client set SO_SNDBUF to {}", nettyClientConfig.getClientSocketSndBufSize());
             handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
         }
         if (nettyClientConfig.getClientSocketRcvBufSize() > 0) {
-            log.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
+            LOGGER.info("client set SO_RCVBUF to {}", nettyClientConfig.getClientSocketRcvBufSize());
             handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
         }
         if (nettyClientConfig.getWriteBufferLowWaterMark() > 0 && nettyClientConfig.getWriteBufferHighWaterMark() > 0) {
-            log.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
+            LOGGER.info("client set netty WRITE_BUFFER_WATER_MARK to {},{}",
                     nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark());
             handler.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                     nettyClientConfig.getWriteBufferLowWaterMark(), nettyClientConfig.getWriteBufferHighWaterMark()));
         }
 
+        if (nettyClientConfig.getClientSocketSndBufSize() != 0) {
+            handler.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize());
+        }
+        if (nettyClientConfig.getClientSocketRcvBufSize() != 0) {
+            handler.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
+        }
+        if (nettyClientConfig.isClientPooledByteBufAllocatorEnable()) {
+            handler.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        }
+
         this.timer.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
                 try {
                     NettyRemotingClient.this.scanResponseTable();
                 } catch (Throwable e) {
-                    log.error("scanResponseTable exception", e);
+                    LOGGER.error("scanResponseTable exception", e);
                 }
             }
         }, 1000 * 3, 1000);
 
-        if (this.channelEventListener != null) {
-            this.nettyEventExecutor.start();
-        }
+//        this.timer.scheduleAtFixedRate(new TimerTask() {
+//            @Override
+//            public void run() {
+//                try {
+//                    NettyRemotingClient.this.scanChannelTablesOfNameServer();
+//                } catch (Exception e) {
+//                    LOGGER.error("scanChannelTablesOfNameServer exception", e);
+//                }
+//            }
+//        }, 1000 * 3, 10 * 1000);
+
+        this.timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    NettyRemotingClient.this.scanAvailableNameSrv();
+                } catch (Exception e) {
+                    LOGGER.error("scanAvailableNameSrv exception", e);
+                }
+            }
+        }, 1000 * 3, this.nettyClientConfig.getConnectTimeoutMillis());
+
     }
 
     @Override
@@ -224,8 +286,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         try {
             this.timer.cancel();
 
-            for (ChannelWrapper cw : this.channelTables.values()) {
-                this.closeChannel(null, cw.getChannel());
+            for (String addr : this.channelTables.keySet()) {
+                this.closeChannel(addr, this.channelTables.get(addr).getChannel());
             }
 
             this.channelTables.clear();
@@ -240,21 +302,30 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 this.defaultEventExecutorGroup.shutdownGracefully();
             }
         } catch (Exception e) {
-            log.error("NettyRemotingClient shutdown exception, ", e);
+            LOGGER.error("NettyRemotingClient shutdown exception, ", e);
         }
 
         if (this.publicExecutor != null) {
             try {
                 this.publicExecutor.shutdown();
             } catch (Exception e) {
-                log.error("NettyRemotingServer shutdown exception, ", e);
+                LOGGER.error("NettyRemotingServer shutdown exception, ", e);
+            }
+        }
+
+        if (this.scanExecutor != null) {
+            try {
+                this.scanExecutor.shutdown();
+            } catch (Exception e) {
+                LOGGER.error("NettyRemotingServer shutdown exception, ", e);
             }
         }
     }
 
     public void closeChannel(final String addr, final Channel channel) {
-        if (null == channel)
+        if (null == channel) {
             return;
+        }
 
         final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
 
@@ -264,46 +335,40 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                     boolean removeItemFromTable = true;
                     final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
 
-                    log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
+                    LOGGER.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
 
                     if (null == prevCW) {
-                        log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+                        LOGGER.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
                         removeItemFromTable = false;
                     } else if (prevCW.getChannel() != channel) {
-                        log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
+                        LOGGER.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
                             addrRemote);
                         removeItemFromTable = false;
                     }
 
                     if (removeItemFromTable) {
                         this.channelTables.remove(addrRemote);
-                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                        LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
                     }
 
                     RemotingUtil.closeChannel(channel);
                 } catch (Exception e) {
-                    log.error("closeChannel: close the channel exception", e);
+                    LOGGER.error("closeChannel: close the channel exception", e);
                 } finally {
                     this.lockChannelTables.unlock();
                 }
             } else {
-                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+                LOGGER.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
             }
         } catch (InterruptedException e) {
-            log.error("closeChannel exception", e);
-        }
-    }
-
-    @Override
-    public void registerRPCHook(RPCHook rpcHook) {
-        if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
-            rpcHooks.add(rpcHook);
+            LOGGER.error("closeChannel exception", e);
         }
     }
 
     public void closeChannel(final Channel channel) {
-        if (null == channel)
+        if (null == channel) {
             return;
+        }
 
         try {
             if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
@@ -324,25 +389,25 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                     }
 
                     if (null == prevCW) {
-                        log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+                        LOGGER.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
                         removeItemFromTable = false;
                     }
 
                     if (removeItemFromTable) {
                         this.channelTables.remove(addrRemote);
-                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                        LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
                         RemotingUtil.closeChannel(channel);
                     }
                 } catch (Exception e) {
-                    log.error("closeChannel: close the channel exception", e);
+                    LOGGER.error("closeChannel: close the channel exception", e);
                 } finally {
                     this.lockChannelTables.unlock();
                 }
             } else {
-                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+                LOGGER.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
             }
         } catch (InterruptedException e) {
-            log.error("closeChannel exception", e);
+            LOGGER.error("closeChannel exception", e);
         }
     }
 
@@ -366,11 +431,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
             if (update) {
                 Collections.shuffle(addrs);
-                log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
+                LOGGER.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
                 this.namesrvAddrList.set(addrs);
 
-                if (!addrs.contains(this.namesrvAddrChoosed.get())) {
-                    this.namesrvAddrChoosed.set(null);
+                // Close the channel(s) to the previously chosen name server if it is no longer in the updated address list.
+                if (this.namesrvAddrChoosed.get() != null && !addrs.contains(this.namesrvAddrChoosed.get())) {
+                    String namesrvAddr = this.namesrvAddrChoosed.get();
+                    for (String addr : this.channelTables.keySet()) {
+                        if (addr.contains(namesrvAddr)) {
+                            ChannelWrapper channelWrapper = this.channelTables.get(addr);
+                            if (channelWrapper != null) {
+                                closeChannel(channelWrapper.getChannel());
+                            }
+                        }
+                    }
                 }
             }
         }
@@ -390,17 +464,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 }
                 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
+                this.updateChannelLastResponseTime(addr);
                 return response;
             } catch (RemotingSendRequestException e) {
-                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+                LOGGER.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+                doAfterRpcFailure(addr, request, false);
                 this.closeChannel(addr, channel);
                 throw e;
             } catch (RemotingTimeoutException e) {
                 if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                     this.closeChannel(addr, channel);
-                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
+                    LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                 }
-                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
+                doAfterRpcFailure(addr, request, true);
+                LOGGER.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                 throw e;
             }
         } else {
@@ -409,7 +486,54 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }
     }
 
-    private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
+    @Override
+    public void closeChannels() {
+        closeChannels(new ArrayList<String>(this.channelTables.keySet()));
+    }
+
+    @Override
+    public void closeChannels(List<String> addrList) {
+        for (String addr : addrList) {
+            ChannelWrapper cw = this.channelTables.get(addr);
+            if (cw == null) {
+                continue;
+            }
+            this.closeChannel(addr, cw.getChannel());
+        }
+        interruptPullRequests(new HashSet<String>(addrList));
+    }
+
+    private void interruptPullRequests(Set<String> brokerAddrSet) {
+        for (ResponseFuture responseFuture : responseTable.values()) {
+            RemotingCommand cmd = responseFuture.getRequestCommand();
+            if (cmd == null) {
+                continue;
+            }
+            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(responseFuture.getChannel());
+            // interrupt only pull message requests
+            if (brokerAddrSet.contains(remoteAddr) && (cmd.getCode() == 11 || cmd.getCode() == 361)) {
+                LOGGER.info("interrupt {}", cmd);
+                responseFuture.interrupt();
+            }
+        }
+    }
+
+    private void updateChannelLastResponseTime(final String addr) {
+        String address = addr;
+        if (address == null) {
+            address = this.namesrvAddrChoosed.get();
+        }
+        if (address == null) {
+            LOGGER.warn("[updateChannelLastResponseTime] could not find address!!");
+            return;
+        }
+        ChannelWrapper channelWrapper = this.channelTables.get(address);
+        if (channelWrapper != null && channelWrapper.isOK()) {
+            channelWrapper.updateLastResponseTime();
+        }
+    }
+
+    private Channel getAndCreateChannel(final String addr) throws InterruptedException {
         if (null == addr) {
             return getAndCreateNameserverChannel();
         }
@@ -422,7 +546,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         return this.createChannel(addr);
     }
 
-    private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
+    private Channel getAndCreateNameserverChannel() throws InterruptedException {
         String addr = this.namesrvAddrChoosed.get();
         if (addr != null) {
             ChannelWrapper cw = this.channelTables.get(addr);
@@ -450,7 +574,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                         String newAddr = addrList.get(index);
 
                         this.namesrvAddrChoosed.set(newAddr);
-                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
+                        LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                         Channel channelNew = this.createChannel(newAddr);
                         if (channelNew != null) {
                             return channelNew;
@@ -458,11 +582,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                     }
                     throw new RemotingConnectException(addrList.toString());
                 }
+            } catch (Exception e) {
+                LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);
             } finally {
                 this.namesrvChannelLock.unlock();
             }
         } else {
-            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            LOGGER.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
         }
 
         return null;
@@ -494,30 +620,30 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
                 if (createNewConnection) {
                     ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
-                    log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+                    LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                     cw = new ChannelWrapper(channelFuture);
                     this.channelTables.put(addr, cw);
                 }
             } catch (Exception e) {
-                log.error("createChannel: create channel exception", e);
+                LOGGER.error("createChannel: create channel exception", e);
             } finally {
                 this.lockChannelTables.unlock();
             }
         } else {
-            log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
         }
 
         if (cw != null) {
             ChannelFuture channelFuture = cw.getChannelFuture();
             if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
                 if (cw.isOK()) {
-                    log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+                    LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                     return cw.getChannel();
                 } else {
-                    log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
+                    LOGGER.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString());
                 }
             } else {
-                log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
+                LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
                     channelFuture.toString());
             }
         }
@@ -538,9 +664,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 if (timeoutMillis < costTime) {
                     throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + addr + "] timeout");
                 }
-                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
+                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr));
             } catch (RemotingSendRequestException e) {
-                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
+                LOGGER.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                 this.closeChannel(addr, channel);
                 throw e;
             }
@@ -559,7 +685,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 doBeforeRpcHooks(addr, request);
                 this.invokeOnewayImpl(channel, request, timeoutMillis);
             } catch (RemotingSendRequestException e) {
-                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+                LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+                doAfterRpcFailure(addr, request, false);
                 this.closeChannel(addr, channel);
                 throw e;
             }
@@ -595,12 +722,20 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     }
 
     @Override
+    public List<String> getAvailableNameSrvList() {
+        return new ArrayList<String>(this.availableNamesrvAddrMap.keySet());
+    }
+
+    @Override
     public ChannelEventListener getChannelEventListener() {
         return channelEventListener;
     }
 
     @Override
     public ExecutorService getCallbackExecutor() {
+        if (nettyClientConfig.isDisableCallbackExecutor()) {
+            return null;
+        }
         return callbackExecutor != null ? callbackExecutor : publicExecutor;
     }
 
@@ -609,11 +744,67 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         this.callbackExecutor = callbackExecutor;
     }
 
+    @Override
+    public ConcurrentMap<Integer, ResponseFuture> getResponseTable() {
+        return this.responseTable;
+    }
+
+    protected void scanChannelTablesOfNameServer() {
+        List<String> nameServerList = this.namesrvAddrList.get();
+        if (nameServerList == null) {
+            LOGGER.warn("[SCAN] Addresses of name server is empty!");
+            return;
+        }
+
+        for (String addr : this.channelTables.keySet()) {
+            ChannelWrapper channelWrapper = this.channelTables.get(addr);
+            if (channelWrapper == null) {
+                continue;
+            }
+
+            if ((System.currentTimeMillis() - channelWrapper.getLastResponseTime()) > this.nettyClientConfig.getChannelNotActiveInterval()) {
+                LOGGER.warn("[SCAN] No response after {} from name server {}, so close it!", channelWrapper.getLastResponseTime(),
+                    addr);
+                closeChannel(addr, channelWrapper.getChannel());
+            }
+        }
+    }
+
+    private void scanAvailableNameSrv() {
+        List<String> nameServerList = this.namesrvAddrList.get();
+        if (nameServerList == null) {
+            LOGGER.warn("scanAvailableNameSrv Addresses of name server is empty!");
+            return;
+        }
+
+        for (final String namesrvAddr : nameServerList) {
+            scanExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Channel channel = NettyRemotingClient.this.getAndCreateChannel(namesrvAddr);
+                        if (channel != null) {
+                            NettyRemotingClient.this.availableNamesrvAddrMap.putIfAbsent(namesrvAddr, true);
+                        } else {
+                            NettyRemotingClient.this.availableNamesrvAddrMap.remove(namesrvAddr);
+                        }
+                    } catch (Exception e) {
+                        LOGGER.error("scanAvailableNameSrv get channel of {} failed, ", namesrvAddr, e);
+                    }
+                }
+            });
+        }
+
+    }
+
     static class ChannelWrapper {
         private final ChannelFuture channelFuture;
+        // only affected by sync or async request, oneway is not included.
+        private long lastResponseTime;
 
         public ChannelWrapper(ChannelFuture channelFuture) {
             this.channelFuture = channelFuture;
+            this.lastResponseTime = System.currentTimeMillis();
         }
 
         public boolean isOK() {
@@ -631,6 +822,33 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         public ChannelFuture getChannelFuture() {
             return channelFuture;
         }
+
+        public long getLastResponseTime() {
+            return this.lastResponseTime;
+        }
+
+        public void updateLastResponseTime() {
+            this.lastResponseTime = System.currentTimeMillis();
+        }
+    }
+
+    class InvokeCallbackWrapper implements InvokeCallback {
+
+        private final InvokeCallback invokeCallback;
+        private final String addr;
+
+        public InvokeCallbackWrapper(InvokeCallback invokeCallback, String addr) {
+            this.invokeCallback = invokeCallback;
+            this.addr = addr;
+        }
+
+        @Override
+        public void operationComplete(ResponseFuture responseFuture) {
+            if (responseFuture != null && responseFuture.isSendRequestOK() && responseFuture.getResponseCommand() != null) {
+                NettyRemotingClient.this.updateChannelLastResponseTime(addr);
+            }
+            this.invokeCallback.operationComplete(responseFuture);
+        }
     }
 
     class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@@ -647,7 +865,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             ChannelPromise promise) throws Exception {
             final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
             final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
-            log.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);
+            LOGGER.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);
 
             super.connect(ctx, remoteAddress, localAddress, promise);
 
@@ -659,7 +877,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         @Override
         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
             final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
+            LOGGER.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
             closeChannel(ctx.channel());
             super.disconnect(ctx, promise);
 
@@ -671,7 +889,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         @Override
         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
             final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+            LOGGER.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
             closeChannel(ctx.channel());
             super.close(ctx, promise);
             NettyRemotingClient.this.failFast(ctx.channel());
@@ -686,7 +904,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 IdleStateEvent event = (IdleStateEvent) evt;
                 if (event.state().equals(IdleState.ALL_IDLE)) {
                     final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-                    log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
+                    LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
                     closeChannel(ctx.channel());
                     if (NettyRemotingClient.this.channelEventListener != null) {
                         NettyRemotingClient.this
@@ -701,8 +919,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
             final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
-            log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
+            LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
+            LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
             closeChannel(ctx.channel());
             if (NettyRemotingClient.this.channelEventListener != null) {
                 NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 22440af..394b536 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -45,15 +45,17 @@ import java.security.cert.CertificateException;
 import java.util.NoSuchElementException;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -77,6 +79,11 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     private final Timer timer = new Timer("ServerHouseKeepingService", true);
     private DefaultEventExecutorGroup defaultEventExecutorGroup;
 
+    /**
+     * NettyRemotingServer may hold multiple SubRemotingServers; each server is stored in this container with its
+     * listen port as the key.
+     */
+    private ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract> remotingServerTable = new ConcurrentHashMap<Integer, NettyRemotingAbstract>();
 
     private int port = 0;
 
@@ -156,6 +163,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
 
         loadSslContext();
+
+        this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);
     }
 
     public void loadSslContext() {
@@ -199,7 +208,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         ServerBootstrap childHandler =
             this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                 .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
-                .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
+                .option(ChannelOption.SO_BACKLOG, 1024)
                 .option(ChannelOption.SO_REUSEADDR, true)
                 .option(ChannelOption.SO_KEEPALIVE, false)
                 .childOption(ChannelOption.TCP_NODELAY, true)
@@ -294,13 +303,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     }
 
     @Override
-    public void registerRPCHook(RPCHook rpcHook) {
-        if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
-            rpcHooks.add(rpcHook);
-        }
-    }
-
-    @Override
     public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
         ExecutorService executorThis = executor;
         if (null == executor) {
@@ -327,6 +329,28 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     }
 
     @Override
+    public Pair<NettyRequestProcessor, ExecutorService> getDefaultProcessorPair() {
+        return defaultRequestProcessor;
+    }
+
+    @Override
+    public RemotingServer newRemotingServer(final int port) {
+        SubRemotingServer remotingServer = new SubRemotingServer(port,
+            this.nettyServerConfig.getServerOnewaySemaphoreValue(),
+            this.nettyServerConfig.getServerAsyncSemaphoreValue());
+        NettyRemotingAbstract existingServer = this.remotingServerTable.putIfAbsent(port, remotingServer);
+        if (existingServer != null) {
+            throw new RuntimeException("The port " + port + " already in use by another RemotingServer");
+        }
+        return remotingServer;
+    }
+
+    @Override
+    public void removeRemotingServer(final int port) {
+        this.remotingServerTable.remove(port);
+    }
+
+    @Override
     public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
         throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
         return this.invokeSyncImpl(channel, request, timeoutMillis);
@@ -349,7 +373,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         return channelEventListener;
     }
 
-
     @Override
     public ExecutorService getCallbackExecutor() {
         return this.publicExecutor;
@@ -430,7 +453,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
-            processMessageReceived(ctx, msg);
+            int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
+            NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort);
+            if (localPort != -1 && remotingAbstract != null) {
+                remotingAbstract.processMessageReceived(ctx, msg);
+                return;
+            }
+            // The related remoting server has been shut down, so close the connected channel
+            RemotingUtil.closeChannel(ctx.channel());
         }
     }
 
@@ -503,4 +533,110 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
             RemotingUtil.closeChannel(ctx.channel());
         }
     }
+
+    /**
+     * The NettyRemotingServer supports binding multiple ports, each port bound by a SubRemotingServer. The
+     * SubRemotingServer will delegate all the functions to NettyRemotingServer, so the sub server can share all the
+     * resources from its parent server.
+     */
+    class SubRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+        private final int listenPort;
+        private volatile Channel serverChannel;
+
+        SubRemotingServer(final int port, final int permitsOnway, final int permitsAsync) {
+            super(permitsOnway, permitsAsync);
+            listenPort = port;
+        }
+
+        @Override
+        public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
+            final ExecutorService executor) {
+            ExecutorService executorThis = executor;
+            if (null == executor) {
+                executorThis = NettyRemotingServer.this.publicExecutor;
+            }
+
+            Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+            this.processorTable.put(requestCode, pair);
+        }
+
+        @Override
+        public void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor) {
+            this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
+        }
+
+        @Override
+        public int localListenPort() {
+            return listenPort;
+        }
+
+        @Override
+        public Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode) {
+            return this.processorTable.get(requestCode);
+        }
+
+        @Override
+        public Pair<NettyRequestProcessor, ExecutorService> getDefaultProcessorPair() {
+            return this.defaultRequestProcessor;
+        }
+
+        @Override
+        public RemotingServer newRemotingServer(final int port) {
+            throw new UnsupportedOperationException("The SubRemotingServer of NettyRemotingServer " +
+                "doesn't support new nested RemotingServer");
+        }
+
+        @Override
+        public void removeRemotingServer(final int port) {
+            throw new UnsupportedOperationException("The SubRemotingServer of NettyRemotingServer " +
+                "doesn't support remove nested RemotingServer");
+        }
+
+        @Override
+        public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
+            final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+            return this.invokeSyncImpl(channel, request, timeoutMillis);
+        }
+
+        @Override
+        public void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
+            final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+            this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+        }
+
+        @Override
+        public void invokeOneway(final Channel channel, final RemotingCommand request,
+            final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+            this.invokeOnewayImpl(channel, request, timeoutMillis);
+        }
+
+        @Override
+        public void start() {
+            try {
+                this.serverChannel = NettyRemotingServer.this.serverBootstrap.bind(listenPort).sync().channel();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("this.subRemotingServer.serverBootstrap.bind().sync() InterruptedException", e);
+            }
+        }
+
+        @Override
+        public void shutdown() {
+            if (this.serverChannel != null) {
+                try {
+                    this.serverChannel.close().await(5, TimeUnit.SECONDS);
+                } catch (InterruptedException ignored) {
+                }
+            }
+        }
+
+        @Override
+        public ChannelEventListener getChannelEventListener() {
+            return NettyRemotingServer.this.getChannelEventListener();
+        }
+
+        @Override
+        public ExecutorService getCallbackExecutor() {
+            return NettyRemotingServer.this.getCallbackExecutor();
+        }
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
index 4800689..040f768 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
@@ -27,5 +27,4 @@ public interface NettyRequestProcessor {
         throws Exception;
 
     boolean rejectRequest();
-
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 5f4c8c6..19f705d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -25,8 +25,9 @@ import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class ResponseFuture {
+    private final Channel channel;
     private final int opaque;
-    private final Channel processChannel;
+    private final RemotingCommand request;
     private final long timeoutMillis;
     private final InvokeCallback invokeCallback;
     private final long beginTimestamp = System.currentTimeMillis();
@@ -38,11 +39,18 @@ public class ResponseFuture {
     private volatile RemotingCommand responseCommand;
     private volatile boolean sendRequestOK = true;
     private volatile Throwable cause;
+    private volatile boolean interrupted = false;
 
     public ResponseFuture(Channel channel, int opaque, long timeoutMillis, InvokeCallback invokeCallback,
-        SemaphoreReleaseOnlyOnce once) {
+                          SemaphoreReleaseOnlyOnce once) {
+        this(channel, opaque, null, timeoutMillis, invokeCallback, once);
+    }
+
+    public ResponseFuture(Channel channel, int opaque, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback,
+                          SemaphoreReleaseOnlyOnce once) {
+        this.channel = channel;
         this.opaque = opaque;
-        this.processChannel = channel;
+        this.request = request;
         this.timeoutMillis = timeoutMillis;
         this.invokeCallback = invokeCallback;
         this.once = once;
@@ -56,6 +64,11 @@ public class ResponseFuture {
         }
     }
 
+    public void interrupt() {
+        interrupted = true;
+        executeInvokeCallback();
+    }
+
     public void release() {
         if (this.once != null) {
             this.once.release();
@@ -117,20 +130,23 @@ public class ResponseFuture {
         return opaque;
     }
 
-    public Channel getProcessChannel() {
-        return processChannel;
+    public RemotingCommand getRequestCommand() {
+        return request;
+    }
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public boolean isInterrupted() {
+        return interrupted;
     }
 
     @Override
     public String toString() {
-        return "ResponseFuture [responseCommand=" + responseCommand
-            + ", sendRequestOK=" + sendRequestOK
-            + ", cause=" + cause
-            + ", opaque=" + opaque
-            + ", processChannel=" + processChannel
-            + ", timeoutMillis=" + timeoutMillis
-            + ", invokeCallback=" + invokeCallback
-            + ", beginTimestamp=" + beginTimestamp
+        return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK
+            + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
+            + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
             + ", countDownLatch=" + countDownLatch + "]";
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index a9e8415..7d614cf 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -123,8 +123,7 @@ public class RemotingCommand {
         return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
     }
 
-    public static RemotingCommand buildErrorResponse(int code, String remark,
-        Class<? extends CommandCustomHeader> classHeader) {
+    public static RemotingCommand buildErrorResponse(int code, String remark, Class<? extends CommandCustomHeader> classHeader) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(classHeader);
         response.setCode(code);
         response.setRemark(remark);
@@ -256,9 +255,8 @@ public class RemotingCommand {
         this.customHeader = customHeader;
     }
 
-    public CommandCustomHeader decodeCommandCustomHeader(
-        Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
-        CommandCustomHeader objectHeader;
+    public <T extends CommandCustomHeader> T decodeCommandCustomHeader(Class<T> classHeader) throws RemotingCommandException {
+        T objectHeader;
         try {
             objectHeader = classHeader.newInstance();
         } catch (InstantiationException e) {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
index e378a7b..a538d4b 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
@@ -43,15 +43,14 @@ public class RemotingServerTest {
     public static RemotingServer createRemotingServer() throws InterruptedException {
         NettyServerConfig config = new NettyServerConfig();
         RemotingServer remotingServer = new NettyRemotingServer(config);
-        remotingServer.registerProcessor(0, new AsyncNettyRequestProcessor() {
+        remotingServer.registerProcessor(0, new NettyRequestProcessor() {
             @Override
             public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
                 request.setRemark("Hi " + ctx.channel().remoteAddress());
                 return request;
             }
 
-            @Override
-            public boolean rejectRequest() {
+            @Override public boolean rejectRequest() {
                 return false;
             }
         }, Executors.newCachedThreadPool());
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/SubRemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/SubRemotingServerTest.java
new file mode 100644
index 0000000..c3f4596
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/SubRemotingServerTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+/**
+ * Verifies that a sub remoting server created from a parent {@link RemotingServer} answers its own
+ * request codes, independently of the processors registered on the parent.
+ */
+public class SubRemotingServerTest {
+    /** Port the sub remoting server is created on. */
+    private static final int SUB_SERVER_PORT = 1234;
+    /** Client-side address of the sub server; derived from the port so the two cannot drift apart. */
+    private static final String SUB_SERVER_ADDR = "localhost:" + SUB_SERVER_PORT;
+    /** Address of the parent server. NOTE(review): 8888 is presumably the default listen port used by createRemotingServer() - confirm. */
+    private static final String PARENT_SERVER_ADDR = "localhost:8888";
+    /** Timeout, in milliseconds, passed to every synchronous invocation. */
+    private static final long TIMEOUT_MILLIS = 1000 * 3;
+
+    private static RemotingServer remotingServer;
+    private static RemotingClient remotingClient;
+    private static RemotingServer subServer;
+
+    /** Creates the parent server, the client and the sub server shared by the tests of this class. */
+    @BeforeClass
+    public static void setup() throws InterruptedException {
+        remotingServer = RemotingServerTest.createRemotingServer();
+        remotingClient = RemotingServerTest.createRemotingClient();
+        subServer = createSubRemotingServer(remotingServer);
+    }
+
+    /** Shuts down the client and the parent server; the sub server is shut down by the test itself. */
+    @AfterClass
+    public static void destroy() {
+        remotingClient.shutdown();
+        remotingServer.shutdown();
+    }
+
+    /**
+     * Creates and starts a sub remoting server on {@link #SUB_SERVER_PORT}.
+     * <p>
+     * The sub server registers a single processor, for request code 1, which returns the request
+     * with its remark set to the port parsed from the channel's local address.
+     *
+     * @param parentServer the server the sub server is created from
+     * @return the started sub server
+     */
+    public static RemotingServer createSubRemotingServer(RemotingServer parentServer) {
+        RemotingServer subServer = parentServer.newRemotingServer(SUB_SERVER_PORT);
+        subServer.registerProcessor(1, new NettyRequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final ChannelHandlerContext ctx,
+                final RemotingCommand request) throws Exception {
+                request.setRemark(String.valueOf(RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress())));
+                return request;
+            }
+
+            @Override
+            public boolean rejectRequest() {
+                return false;
+            }
+        }, null);
+        subServer.start();
+        return subServer;
+    }
+
+    /**
+     * Sends request code 1 to the parent (expects "not supported") and to the sub server (expects
+     * an answer), then request code 0 to the sub server (expects "not supported"), and finally
+     * expects a timeout once the sub server has been removed from the parent and shut down.
+     */
+    @Test
+    public void testInvokeSubRemotingServer() throws InterruptedException, RemotingTimeoutException, RemotingConnectException, RemotingSendRequestException {
+        RequestHeader requestHeader = new RequestHeader();
+        requestHeader.setCount(1);
+        requestHeader.setMessageTitle("Welcome");
+
+        // Parent remoting server doesn't support RequestCode 1
+        RemotingCommand request = RemotingCommand.createRequestCommand(1, requestHeader);
+        RemotingCommand response = remotingClient.invokeSync(PARENT_SERVER_ADDR, request, TIMEOUT_MILLIS);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
+
+        // Issue request to SubRemotingServer
+        response = remotingClient.invokeSync(SUB_SERVER_ADDR, request, TIMEOUT_MILLIS);
+        assertThat(response).isNotNull();
+        assertThat(response.getExtFields()).hasSize(2);
+        assertThat(response.getRemark()).isEqualTo(String.valueOf(SUB_SERVER_PORT));
+
+        // Issue unsupported request to SubRemotingServer
+        request.setCode(0);
+        response = remotingClient.invokeSync(SUB_SERVER_ADDR, request, TIMEOUT_MILLIS);
+        assertThat(response).isNotNull();
+        assertThat(response.getCode()).isEqualTo(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
+
+        // Issue request to a closed SubRemotingServer
+        request.setCode(1);
+        remotingServer.removeRemotingServer(SUB_SERVER_PORT);
+        subServer.shutdown();
+        try {
+            remotingClient.invokeSync(SUB_SERVER_ADDR, request, TIMEOUT_MILLIS);
+            failBecauseExceptionWasNotThrown(RemotingTimeoutException.class);
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(RemotingTimeoutException.class);
+        }
+    }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index a272d21..58aac7d 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -37,7 +37,7 @@ public class NettyRemotingAbstractTest {
     @Test
     public void testProcessResponseCommand() throws InterruptedException {
         final Semaphore semaphore = new Semaphore(0);
-        ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, new InvokeCallback() {
+        ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new InvokeCallback() {
             @Override
             public void operationComplete(final ResponseFuture responseFuture) {
                 assertThat(semaphore.availablePermits()).isEqualTo(0);
@@ -58,7 +58,7 @@ public class NettyRemotingAbstractTest {
     @Test
     public void testProcessResponseCommand_NullCallBack() throws InterruptedException {
         final Semaphore semaphore = new Semaphore(0);
-        ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, null,
+        ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, null,
             new SemaphoreReleaseOnlyOnce(semaphore));
 
         remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
@@ -73,7 +73,7 @@ public class NettyRemotingAbstractTest {
     @Test
     public void testProcessResponseCommand_RunCallBackInCurrentThread() throws InterruptedException {
         final Semaphore semaphore = new Semaphore(0);
-        ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, new InvokeCallback() {
+        ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new InvokeCallback() {
             @Override
             public void operationComplete(final ResponseFuture responseFuture) {
                 assertThat(semaphore.availablePermits()).isEqualTo(0);
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index ee6451d..f7ef585 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.alibaba.fastjson.JSON;
+import java.util.HashSet;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;