You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/14 06:52:27 UTC

[rocketmq] 06/07: feature(test):[RIP-31]Add ITs for BrokerContainer (#3980)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-tmp
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 00d2d1c40c622d267b20af1c73bfd37ff74f62fd
Author: rongtong <ji...@163.com>
AuthorDate: Mon Mar 14 14:07:18 2022 +0800

    feature(test):[RIP-31]Add ITs for BrokerContainer (#3980)
---
 test/pom.xml                                       |   4 +
 .../rocketmq/test/util/MQAdminTestUtils.java       |   2 +-
 .../org/apache/rocketmq/test/base/BaseConf.java    |  26 +-
 .../test/client/producer/batch/BatchSendIT.java    |   2 +-
 .../client/producer/order/OrderMsgRebalanceIT.java |   2 +-
 .../test/container/AddAndRemoveBrokerIT.java       |  81 +++
 .../rocketmq/test/container/BrokerFailoverIT.java  |  89 +++
 .../test/container/BrokerMemberGroupIT.java        |  69 +++
 .../container/ContainerIntegrationTestBase.java    | 666 +++++++++++++++++++++
 .../test/container/GetMaxOffsetFromSlaveIT.java    |  98 +++
 .../test/container/GetMetadataReverseIT.java       | 231 +++++++
 .../test/container/PullMultipleReplicasIT.java     | 199 ++++++
 .../test/container/PushMultipleReplicasIT.java     | 112 ++++
 .../test/container/RebalanceLockOnSlaveIT.java     | 208 +++++++
 .../container/ScheduleSlaveActingMasterIT.java     | 220 +++++++
 .../test/container/ScheduledMessageIT.java         | 152 +++++
 .../test/container/SendMultipleReplicasIT.java     | 157 +++++
 .../rocketmq/test/container/SlaveBrokerIT.java     | 115 ++++
 .../test/container/SyncConsumerOffsetIT.java       | 146 +++++
 .../rocketmq/test/statictopic/StaticTopicIT.java   |  15 +-
 20 files changed, 2568 insertions(+), 26 deletions(-)

diff --git a/test/pom.xml b/test/pom.xml
index 2bdab7c..3cf4f2d 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -41,6 +41,10 @@
             <artifactId>rocketmq-namesrv</artifactId>
         </dependency>
         <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-container</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.truth</groupId>
             <artifactId>truth</artifactId>
             <version>0.30</version>
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 20149d4..7f6a2b6 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -140,7 +140,7 @@ public class MQAdminTestUtils {
         if (clusterInfo == null) {
             return false;
         } else {
-            HashMap<String, BrokerData> brokers = clusterInfo.getBrokerAddrTable();
+            Map<String, BrokerData> brokers = clusterInfo.getBrokerAddrTable();
             for (String brokerName : brokers.keySet()) {
                 HashMap<Long, String> brokerIps = brokers.get(brokerName).getBrokerAddrs();
                 for (long brokerId : brokerIps.keySet()) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index c523fd9..8420fdd 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -96,17 +96,17 @@ public class BaseConf {
     }
 
     // This method can't be placed in the static block of BaseConf, which seems to lead to a strange dead lock.
-    public static void waitBrokerRegistered(final String nsAddr, final String clusterName,
-        final int expectedBrokerNum) {
+    public static void waitBrokerRegistered(final String nsAddr, final String clusterName, final int expectedBrokerNum) {
         final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
         mqAdminExt.setNamesrvAddr(nsAddr);
         try {
             mqAdminExt.start();
+            Thread.sleep(10000);
             await().atMost(30, TimeUnit.SECONDS).until(() -> {
                 List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas();
                 return brokerDatas.size() == expectedBrokerNum;
             });
-            for (BrokerController brokerController : brokerControllerList) {
+            for (BrokerController brokerController: brokerControllerList) {
                 brokerController.getBrokerOuterAPI().refreshMetadata();
             }
         } catch (Exception e) {
@@ -144,6 +144,7 @@ public class BaseConf {
         return mqAdminExt;
     }
 
+
     public static RMQNormalProducer getProducer(String nsAddr, String topic) {
         return getProducer(nsAddr, topic, false);
     }
@@ -157,8 +158,7 @@ public class BaseConf {
         return producer;
     }
 
-    public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic,
-        TransactionListener transactionListener) {
+    public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) {
         RMQTransactionalProducer producer = new RMQTransactionalProducer(nsAddr, topic, false, transactionListener);
         if (debug) {
             producer.setDebug();
@@ -168,9 +168,9 @@ public class BaseConf {
     }
 
     public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup,
-        String instanceName) {
+                                                String instanceName) {
         RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup,
-            instanceName);
+                instanceName);
         if (debug) {
             producer.setDebug();
         }
@@ -188,31 +188,31 @@ public class BaseConf {
     }
 
     public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
-        AbstractListener listener) {
+                                                AbstractListener listener) {
         return getConsumer(nsAddr, topic, subExpression, listener, false);
     }
 
     public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
-        AbstractListener listener, boolean useTLS) {
+                                                AbstractListener listener, boolean useTLS) {
         String consumerGroup = initConsumerGroup();
         return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS);
     }
 
     public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
-        String subExpression, AbstractListener listener) {
+                                                String subExpression, AbstractListener listener) {
         return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
     }
 
     public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
-        String subExpression, AbstractListener listener, boolean useTLS) {
+                                                String subExpression, AbstractListener listener, boolean useTLS) {
         RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
-            topic, subExpression, listener, useTLS);
+                topic, subExpression, listener, useTLS);
         if (debug) {
             consumer.setDebug();
         }
         mqClients.add(consumer);
         log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup,
-            topic, subExpression));
+                topic, subExpression));
         return consumer;
     }
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
index 3a649ed..283dcbe 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
@@ -137,6 +137,7 @@ public class BatchSendIT extends BaseConf {
         Thread.sleep(300);
         {
             DefaultMQPullConsumer defaultMQPullConsumer = ConsumerFactory.getRMQPullConsumer(nsAddr, "group");
+            System.out.println(defaultMQPullConsumer.maxOffset(messageQueue));
 
             PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", 5, batchCount * batchNum);
             Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
@@ -181,7 +182,6 @@ public class BatchSendIT extends BaseConf {
         DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
         MessageQueue messageQueue = producer.fetchPublishMessageQueues(batchTopic).iterator().next();
 
-
         int batchCount = 10;
         int batchNum = 10;
         for (int i = 0; i < batchCount; i++) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
index bae5397..eff70a0 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
@@ -81,7 +81,7 @@ public class OrderMsgRebalanceIT extends BaseConf {
     }
 
     @Test
-    public void testFourConsuemrBalance() {
+    public void testFourConsumerBalance() {
         int msgSize = 20;
         RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
         RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/AddAndRemoveBrokerIT.java b/test/src/test/java/org/apache/rocketmq/test/container/AddAndRemoveBrokerIT.java
new file mode 100644
index 0000000..c19a787
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/AddAndRemoveBrokerIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import org.apache.rocketmq.container.BrokerContainer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AddAndRemoveBrokerIT extends ContainerIntegrationTestBase {
+    private static BrokerContainer brokerContainer4;
+
+    @BeforeClass
+    public static void beforeClass() {
+        brokerContainer4 = createAndStartBrokerContainer(nsAddr);
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        brokerContainer4.shutdown();
+    }
+
+    @Test
+    public void addBrokerTest()
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException {
+        String remark = null;
+        int code = 0;
+        try {
+            defaultMQAdminExt.addBrokerToContainer(brokerContainer4.getBrokerContainerAddr(), "");
+        } catch (MQBrokerException e) {
+            code = e.getResponseCode();
+            remark = e.getErrorMessage();
+        }
+        assertThat(code).isEqualTo(ResponseCode.SYSTEM_ERROR);
+        assertThat(remark).isEqualTo("addBroker properties empty");
+    }
+
+    @Test
+    public void removeBrokerTest()
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException{
+
+        boolean exceptionCaught = false;
+
+        try {
+            defaultMQAdminExt.removeBrokerFromContainer(brokerContainer1.getBrokerContainerAddr(),
+                master3With3Replicas.getBrokerConfig().getBrokerClusterName(),
+                master3With3Replicas.getBrokerConfig().getBrokerName(), 1);
+        } catch (MQBrokerException e) {
+            exceptionCaught = true;
+        }
+
+        assertThat(exceptionCaught).isFalse();
+        assertThat(brokerContainer1.getSlaveBrokers().size()).isEqualTo(1);
+
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+        awaitUntilSlaveOK();
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/BrokerFailoverIT.java b/test/src/test/java/org/apache/rocketmq/test/container/BrokerFailoverIT.java
new file mode 100644
index 0000000..b4e4baf
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/BrokerFailoverIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.time.Duration;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class BrokerFailoverIT extends ContainerIntegrationTestBase {
+
+    @Test
+    public void testBrokerFailoverWithoutCompatible() {
+        changeCompatibleMode(false);
+        awaitUntilSlaveOK();
+        testBrokerFailover(false);
+    }
+
+    @Test
+    public void testBrokerFailoverWithCompatible() {
+        changeCompatibleMode(true);
+        awaitUntilSlaveOK();
+        testBrokerFailover(true);
+    }
+
+    private void testBrokerFailover(boolean compatibleMode) {
+        await().atMost(Duration.ofSeconds(10)).until(() ->
+            master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3
+                && master2With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3
+                && master3With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+
+        InnerSalveBrokerController targetSlave = getSlaveFromContainerByName(brokerContainer2, master1With3Replicas.getBrokerConfig().getBrokerName());
+
+        assertThat(targetSlave).isNotNull();
+
+        brokerContainer1.registerClientRPCHook(new RPCHook() {
+            @Override
+            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+                if (request.getCode() == (compatibleMode ? RequestCode.QUERY_DATA_VERSION : RequestCode.BROKER_HEARTBEAT)) {
+                    request.setCode(-1);
+                }
+            }
+
+            @Override
+            public void doAfterResponse(String remoteAddr, RemotingCommand request,
+                RemotingCommand response) {
+
+            }
+
+            @Override
+            public void doAfterRpcFailure(String remoteAddr, RemotingCommand request, Boolean remoteTimeout) {
+
+            }
+        });
+
+        InnerSalveBrokerController finalTargetSlave = targetSlave;
+        await().atMost(Duration.ofSeconds(60)).until(() ->
+            finalTargetSlave.getMessageStore().getAliveReplicaNumInGroup() == 2
+                && master2With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 2
+                && master3With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 2);
+
+        brokerContainer1.clearClientRPCHook();
+
+        await().atMost(Duration.ofSeconds(60)).until(() ->
+            master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3
+                && master2With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3
+                && master3With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/BrokerMemberGroupIT.java b/test/src/test/java/org/apache/rocketmq/test/container/BrokerMemberGroupIT.java
new file mode 100644
index 0000000..ec9b6b1
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/BrokerMemberGroupIT.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.time.Duration;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class BrokerMemberGroupIT extends ContainerIntegrationTestBase {
+    @Test
+    public void testSyncBrokerMemberGroup() throws Exception {
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            final BrokerConfig brokerConfig = master1With3Replicas.getBrokerConfig();
+            final BrokerMemberGroup memberGroup = master1With3Replicas.getBrokerOuterAPI()
+                .syncBrokerMemberGroup(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName());
+
+            return memberGroup.getBrokerAddrs().size() == 3;
+        });
+
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            final BrokerConfig brokerConfig = master3With3Replicas.getBrokerConfig();
+            final BrokerMemberGroup memberGroup = master3With3Replicas.getBrokerOuterAPI()
+                .syncBrokerMemberGroup(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName());
+
+            return memberGroup.getBrokerAddrs().size() == 3;
+        });
+
+        removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
+        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
+
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            final BrokerConfig brokerConfig = master1With3Replicas.getBrokerConfig();
+            final BrokerMemberGroup memberGroup = master1With3Replicas.getBrokerOuterAPI()
+                .syncBrokerMemberGroup(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName());
+
+            return memberGroup.getBrokerAddrs().size() == 2 && memberGroup.getBrokerAddrs().get(1L) == null;
+        });
+
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            final BrokerConfig brokerConfig = master3With3Replicas.getBrokerConfig();
+            final BrokerMemberGroup memberGroup = master3With3Replicas.getBrokerOuterAPI()
+                .syncBrokerMemberGroup(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName());
+            return memberGroup.getBrokerAddrs().size() == 2 && memberGroup.getBrokerAddrs().get(1L) == null;
+        });
+
+        createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+
+        awaitUntilSlaveOK();
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
new file mode 100644
index 0000000..fab4c46
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.rocketmq.container.BrokerContainer;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.container.BrokerContainerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * ContainerIntegrationTestBase will setup a rocketmq ha cluster contains two broker group:
+ * <li>BrokerA contains two replicas</li>
+ * <li>BrokerB contains three replicas</li>
+ */
+public class ContainerIntegrationTestBase {
+    private static final AtomicBoolean CLUSTER_SET_UP = new AtomicBoolean(false);
+    private static final List<File> TMP_FILE_LIST = new ArrayList<>();
+    private static final Random RANDOM = new Random();
+    protected static String nsAddr;
+
+    protected static final String THREE_REPLICAS_TOPIC = "SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS";
+
+    protected static final List<BrokerContainer> brokerContainerList = new ArrayList<>();
+    protected static final List<NamesrvController> namesrvControllers = new ArrayList<>();
+
+    protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
+    protected static final int COMMIT_LOG_SIZE = 128 * 1024;
+    protected static final int INDEX_NUM = 1000;
+    protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
+
+    protected static BrokerContainer brokerContainer1;
+    protected static BrokerContainer brokerContainer2;
+    protected static BrokerContainer brokerContainer3;
+    protected static BrokerController master1With3Replicas;
+    protected static BrokerController master2With3Replicas;
+    protected static BrokerController master3With3Replicas;
+    protected static NamesrvController namesrvController;
+
+    protected static DefaultMQAdminExt defaultMQAdminExt;
+
+    private final static InternalLogger LOG = InternalLoggerFactory.getLogger(ContainerIntegrationTestBase.class);
+    private static ConcurrentMap<BrokerConfig, MessageStoreConfig> slaveStoreConfigCache = new ConcurrentHashMap<>();
+
+    protected static ConcurrentMap<BrokerConfigLite, BrokerController> isolatedBrokers = new ConcurrentHashMap<>();
+    private static final Set<Integer> PORTS_IN_USE = new HashSet<>();
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        if (CLUSTER_SET_UP.compareAndSet(false, true)) {
+            System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+            System.setProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.99");
+            System.setProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.99");
+
+            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            JoranConfigurator configurator = new JoranConfigurator();
+            configurator.setContext(lc);
+            lc.reset();
+            //https://logback.qos.ch/manual/configuration.html
+            lc.setPackagingDataEnabled(false);
+
+            configurator.doConfigure("../distribution/conf/logback_broker.xml");
+            configurator.doConfigure("../distribution/conf/logback_namesrv.xml");
+
+            setUpCluster();
+            setUpTopic();
+            registerCleaner();
+
+            System.out.printf("cluster setup complete%n");
+        }
+    }
+
+    private static void setUpTopic() {
+        createTopic(THREE_REPLICAS_TOPIC);
+    }
+
+    private static void createTopic(String topic) {
+        createTopicTo(master1With3Replicas, topic);
+        createTopicTo(master2With3Replicas, topic);
+        createTopicTo(master3With3Replicas, topic);
+    }
+
+    private static void setUpCluster() throws Exception {
+        namesrvController = createAndStartNamesrv();
+        nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
+        System.out.printf("namesrv addr: %s%n", nsAddr);
+
+        /*
+         *     BrokerContainer1      |      BrokerContainer2      |      BrokerContainer3
+         *
+         *   master1With3Replicas(m)      master2With3Replicas(m)      master3With3Replicas(m)
+         *   master3With3Replicas(s0)     master1With3Replicas(s0)     master2With3Replicas(s0)
+         *   master2With3Replicas(s1)     master3With3Replicas(s1)     master1With3Replicas(s1)
+         */
+
+        brokerContainer1 = createAndStartBrokerContainer(nsAddr);
+        brokerContainer2 = createAndStartBrokerContainer(nsAddr);
+        brokerContainer3 = createAndStartBrokerContainer(nsAddr);
+        // Create three broker groups, two contains two replicas, another contains three replicas
+        master1With3Replicas = createAndAddMaster(brokerContainer1, new BrokerGroupConfig(), BROKER_INDEX.getAndIncrement());
+        master2With3Replicas = createAndAddMaster(brokerContainer2, new BrokerGroupConfig(), BROKER_INDEX.getAndIncrement());
+        master3With3Replicas = createAndAddMaster(brokerContainer3, new BrokerGroupConfig(), BROKER_INDEX.getAndIncrement());
+
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+        createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+        createAndAddSlave(1, brokerContainer3, master2With3Replicas);
+
+        createAndAddSlave(2, brokerContainer1, master2With3Replicas);
+        createAndAddSlave(2, brokerContainer2, master3With3Replicas);
+        createAndAddSlave(2, brokerContainer3, master1With3Replicas);
+
+        awaitUntilSlaveOK();
+
+        defaultMQAdminExt = new DefaultMQAdminExt("HATest_Admin_Group");
+        defaultMQAdminExt.setNamesrvAddr(nsAddr);
+        defaultMQAdminExt.start();
+    }
+
+    protected static void createTopicTo(BrokerController masterBroker, String topicName, int rqn, int wqn) {
+        try {
+            TopicConfig topicConfig = new TopicConfig(topicName, rqn, wqn, 6, 0);
+            defaultMQAdminExt.createAndUpdateTopicConfig(masterBroker.getBrokerAddr(), topicConfig);
+
+            triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer1);
+            triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer2);
+            triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer3);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException("Create topic to broker failed", e);
+        }
+    }
+
+    protected static void createGroup(BrokerController masterBroker, String groupName) {
+        try {
+            SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+            config.setGroupName(groupName);
+
+            masterBroker.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);
+
+            triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer1);
+            triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer2);
+            triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer3);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException("Create group to broker failed", e);
+        }
+    }
+
+    private static void triggerSlaveSync(String brokerName, BrokerContainer brokerContainer) {
+        for (InnerSalveBrokerController slaveBroker : brokerContainer.getSlaveBrokers()) {
+            if (slaveBroker.getBrokerConfig().getBrokerName().equals(brokerName)) {
+                slaveBroker.getSlaveSynchronize().syncAll();
+                slaveBroker.registerBrokerAll(true, false, true);
+            }
+        }
+    }
+
+    protected static void createTopicTo(BrokerController brokerController, String topicName) {
+        createTopicTo(brokerController, topicName, 8, 8);
+    }
+
+    private static void registerCleaner() {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            if (CLUSTER_SET_UP.compareAndSet(true, false)) {
+                System.out.printf("clean up%n");
+                defaultMQAdminExt.shutdown();
+
+                for (final BrokerContainer brokerContainer : brokerContainerList) {
+                    brokerContainer.shutdown();
+                    for (BrokerController brokerController : brokerContainer.getBrokerControllers()) {
+                        brokerController.getMessageStore().destroy();
+                    }
+                }
+
+                for (final NamesrvController namesrvController : namesrvControllers) {
+                    namesrvController.shutdown();
+                }
+
+                for (final File file : TMP_FILE_LIST) {
+                    UtilAll.deleteFile(file);
+                }
+            }
+        }));
+    }
+
+    private static File createBaseDir(String prefix) {
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+            TMP_FILE_LIST.add(file);
+            System.out.printf("create file at %s%n", file.getAbsolutePath());
+            return file;
+        } catch (IOException e) {
+            throw new RuntimeException("Couldn't create tmp folder", e);
+        }
+    }
+
+    public static NamesrvController createAndStartNamesrv() {
+        String baseDir = createBaseDir("test-cluster-namesrv").getAbsolutePath();
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
+        namesrvConfig.setKvConfigPath(baseDir + File.separator + "namesrv" + File.separator + "kvConfig.json");
+        namesrvConfig.setConfigStorePath(baseDir + File.separator + "namesrv" + File.separator + "namesrv.properties");
+        namesrvConfig.setSupportActingMaster(true);
+        namesrvConfig.setScanNotActiveBrokerInterval(1000);
+
+        nameServerNettyServerConfig.setListenPort(generatePort(10000, 10000));
+        NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+        try {
+            Assert.assertTrue(namesrvController.initialize());
+            LOG.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
+            namesrvController.start();
+        } catch (Exception e) {
+            LOG.info("Name Server start failed");
+            e.printStackTrace();
+            System.exit(1);
+        }
+
+        namesrvController.getRemotingServer().registerProcessor(RequestCode.REGISTER_BROKER, new NettyRequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final ChannelHandlerContext ctx,
+                final RemotingCommand request) throws Exception {
+                final RegisterBrokerRequestHeader requestHeader = request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+                final BrokerConfigLite liteConfig = new BrokerConfigLite(requestHeader.getClusterName(),
+                    requestHeader.getBrokerName(),
+                    requestHeader.getBrokerAddr(),
+                    requestHeader.getBrokerId());
+                if (isolatedBrokers.containsKey(liteConfig)) {
+                    // return response with SYSTEM_ERROR
+                    return RemotingCommand.createResponseCommand(null);
+                }
+                return namesrvController.getRemotingServer().getDefaultProcessorPair().getObject1().processRequest(ctx, request);
+            }
+
+            @Override
+            public boolean rejectRequest() {
+                return false;
+            }
+        }, null);
+
+        namesrvControllers.add(namesrvController);
+        return namesrvController;
+
+    }
+
+    public static BrokerContainer createAndStartBrokerContainer(String nsAddr) {
+        BrokerContainerConfig brokerContainerConfig = new BrokerContainerConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        brokerContainerConfig.setNamesrvAddr(nsAddr);
+        brokerContainerConfig.setCompatibleWithOldNameSrv(false);
+
+        nettyServerConfig.setListenPort(generatePort(20000, 10000));
+        BrokerContainer brokerContainer = new BrokerContainer(brokerContainerConfig, nettyServerConfig, nettyClientConfig);
+        try {
+            Assert.assertTrue(brokerContainer.initialize());
+            LOG.info("Broker container Start, listen on {}.", nettyServerConfig.getListenPort());
+            brokerContainer.start();
+        } catch (Exception e) {
+            LOG.info("Broker container start failed", e);
+            e.printStackTrace();
+            System.exit(1);
+        }
+        brokerContainerList.add(brokerContainer);
+        return brokerContainer;
+    }
+
+    private static int generatePort(int base, int range) {
+        int result = base + RANDOM.nextInt(range);
+        while (PORTS_IN_USE.contains(result) || PORTS_IN_USE.contains(result - 2)) {
+            result = base + RANDOM.nextInt(range);
+        }
+        PORTS_IN_USE.add(result);
+        PORTS_IN_USE.add(result - 2);
+        return result;
+    }
+
+    public static BrokerController createAndAddMaster(BrokerContainer brokerContainer,
+        BrokerGroupConfig brokerGroupConfig, int brokerIndex) throws Exception {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + brokerIndex);
+        brokerConfig.setBrokerIP1("127.0.0.1");
+        brokerConfig.setBrokerIP2("127.0.0.1");
+        brokerConfig.setBrokerId(0);
+        brokerConfig.setEnablePropertyFilter(true);
+        brokerConfig.setEnableSlaveActingMaster(brokerGroupConfig.enableSlaveActingMaster);
+        brokerConfig.setEnableRemoteEscape(brokerGroupConfig.enableRemoteEscape);
+        brokerConfig.setSlaveReadEnable(brokerGroupConfig.slaveReadEnable);
+        brokerConfig.setLockInStrictMode(true);
+        brokerConfig.setConsumerOffsetUpdateVersionStep(10);
+        brokerConfig.setDelayOffsetUpdateVersionStep(10);
+        brokerConfig.setListenPort(generatePort(brokerContainer.getRemotingServer().localListenPort(), 10000));
+
+        String baseDir = createBaseDir(brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId()).getAbsolutePath();
+        storeConfig.setStorePathRootDir(baseDir);
+        storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+        storeConfig.setHaListenPort(generatePort(30000, 10000));
+        storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
+        storeConfig.setMaxIndexNum(INDEX_NUM);
+        storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
+        storeConfig.setTotalReplicas(brokerGroupConfig.totalReplicas);
+        storeConfig.setInSyncReplicas(brokerGroupConfig.inSyncReplicas);
+        storeConfig.setMinInSyncReplicas(brokerGroupConfig.minReplicas);
+        storeConfig.setEnableAutoInSyncReplicas(brokerGroupConfig.autoReplicas);
+        storeConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
+        storeConfig.setSyncFlushTimeout(10 * 1000);
+
+        System.out.printf("start master %s with port %d-%d%n", brokerConfig.getCanonicalName(), brokerConfig.getListenPort(), storeConfig.getHaListenPort());
+        BrokerController brokerController = null;
+        try {
+            brokerController = brokerContainer.addBroker(brokerConfig, storeConfig);
+            Assert.assertNotNull(brokerController);
+            brokerController.start();
+            TMP_FILE_LIST.add(new File(brokerController.getTopicConfigManager().configFilePath()));
+            TMP_FILE_LIST.add(new File(brokerController.getSubscriptionGroupManager().configFilePath()));
+            LOG.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+        } catch (Exception e) {
+            LOG.info("Broker start failed", e);
+            e.printStackTrace();
+            System.exit(1);
+        }
+
+        return brokerController;
+    }
+
+    protected static DefaultMQProducer createProducer(String producerGroup) {
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+        producer.setInstanceName(UUID.randomUUID().toString());
+        producer.setNamesrvAddr(nsAddr);
+        return producer;
+    }
+
+    protected static TransactionMQProducer createTransactionProducer(String producerGroup,
+        TransactionCheckListener transactionCheckListener) {
+        TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
+        producer.setInstanceName(UUID.randomUUID().toString());
+        producer.setNamesrvAddr(nsAddr);
+        producer.setTransactionCheckListener(transactionCheckListener);
+        return producer;
+    }
+
+    protected static DefaultMQPullConsumer createPullConsumer(String consumerGroup) {
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setInstanceName(UUID.randomUUID().toString());
+        consumer.setNamesrvAddr(nsAddr);
+        return consumer;
+    }
+
+    protected static DefaultMQPushConsumer createPushConsumer(String consumerGroup) {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
+        consumer.setInstanceName(UUID.randomUUID().toString());
+        consumer.setNamesrvAddr(nsAddr);
+        return consumer;
+    }
+
+    protected static void createAndAddSlave(int slaveBrokerId, BrokerContainer brokerContainer,
+        BrokerController master) {
+        BrokerConfig slaveBrokerConfig = new BrokerConfig();
+        slaveBrokerConfig.setBrokerName(master.getBrokerConfig().getBrokerName());
+        slaveBrokerConfig.setBrokerId(slaveBrokerId);
+        slaveBrokerConfig.setBrokerClusterName(master.getBrokerConfig().getBrokerClusterName());
+
+        slaveBrokerConfig.setBrokerIP1("127.0.0.1");
+        slaveBrokerConfig.setBrokerIP2("127.0.0.1");
+        slaveBrokerConfig.setEnablePropertyFilter(true);
+        slaveBrokerConfig.setSlaveReadEnable(true);
+        slaveBrokerConfig.setEnableSlaveActingMaster(true);
+        slaveBrokerConfig.setEnableRemoteEscape(true);
+        slaveBrokerConfig.setLockInStrictMode(true);
+        slaveBrokerConfig.setListenPort(generatePort(brokerContainer.getRemotingServer().localListenPort(), 10000));
+        slaveBrokerConfig.setConsumerOffsetUpdateVersionStep(10);
+        slaveBrokerConfig.setDelayOffsetUpdateVersionStep(10);
+
+        MessageStoreConfig storeConfig = slaveStoreConfigCache.get(slaveBrokerConfig);
+
+        if (storeConfig == null) {
+            storeConfig = new MessageStoreConfig();
+            String baseDir = createBaseDir(slaveBrokerConfig.getBrokerName() + "_" + slaveBrokerConfig.getBrokerId()).getAbsolutePath();
+            storeConfig.setStorePathRootDir(baseDir);
+            storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+            storeConfig.setHaListenPort(generatePort(master.getMessageStoreConfig().getHaListenPort(), 10000));
+            storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
+            storeConfig.setMaxIndexNum(INDEX_NUM);
+            storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
+            storeConfig.setTotalReplicas(master.getMessageStoreConfig().getTotalReplicas());
+            storeConfig.setInSyncReplicas(master.getMessageStoreConfig().getInSyncReplicas());
+            storeConfig.setMinInSyncReplicas(master.getMessageStoreConfig().getMinInSyncReplicas());
+            storeConfig.setBrokerRole(BrokerRole.SLAVE);
+            slaveStoreConfigCache.put(slaveBrokerConfig, storeConfig);
+        }
+
+        System.out.printf("start slave %s with port %d-%d%n", slaveBrokerConfig.getCanonicalName(), slaveBrokerConfig.getListenPort(), storeConfig.getHaListenPort());
+
+        try {
+            BrokerController brokerController = brokerContainer.addBroker(slaveBrokerConfig, storeConfig);
+            Assert.assertNotNull(brokerContainer);
+            brokerController.start();
+            TMP_FILE_LIST.add(new File(brokerController.getTopicConfigManager().configFilePath()));
+            TMP_FILE_LIST.add(new File(brokerController.getSubscriptionGroupManager().configFilePath()));
+            LOG.info("Add slave name:{} addr:{}", slaveBrokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException("Couldn't add slave broker", e);
+        }
+    }
+
+    protected static void removeSlaveBroker(int slaveBrokerId, BrokerContainer brokerContainer,
+        BrokerController master) throws Exception {
+        BrokerIdentity brokerIdentity = new BrokerIdentity(master.getBrokerConfig().getBrokerClusterName(),
+            master.getBrokerConfig().getBrokerName(), slaveBrokerId);
+
+        brokerContainer.removeBroker(brokerIdentity);
+    }
+
+    protected static void awaitUntilSlaveOK() {
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> {
+                boolean isOk = master1With3Replicas.getMessageStore().getHaService().getConnectionCount().get() == 2
+                    && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3;
+                for (HAConnection haConnection : master1With3Replicas.getMessageStore().getHaService().getConnectionList()) {
+                    isOk &= haConnection.getCurrentState().equals(HAConnectionState.TRANSFER);
+                }
+                return isOk;
+            });
+
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> {
+                boolean isOk = master2With3Replicas.getMessageStore().getHaService().getConnectionCount().get() == 2
+                    && master2With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3;
+                for (HAConnection haConnection : master2With3Replicas.getMessageStore().getHaService().getConnectionList()) {
+                    isOk &= haConnection.getCurrentState().equals(HAConnectionState.TRANSFER);
+                }
+                return isOk;
+            });
+
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> {
+                boolean isOk = master3With3Replicas.getMessageStore().getHaService().getConnectionCount().get() == 2
+                    && master3With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3;
+                for (HAConnection haConnection : master3With3Replicas.getMessageStore().getHaService().getConnectionList()) {
+                    isOk &= haConnection.getCurrentState().equals(HAConnectionState.TRANSFER);
+                }
+                return isOk;
+            });
+
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected static void isolateBroker(BrokerController brokerController) {
+        final BrokerConfig config = brokerController.getBrokerConfig();
+
+        BrokerConfigLite liteConfig = new BrokerConfigLite(config.getBrokerClusterName(),
+            config.getBrokerName(),
+            brokerController.getBrokerAddr(),
+            config.getBrokerId());
+
+        // Reject register requests from the specific broker
+        isolatedBrokers.putIfAbsent(liteConfig, brokerController);
+
+        // UnRegister the specific broker immediately
+        namesrvController.getRouteInfoManager().unregisterBroker(liteConfig.getClusterName(),
+            liteConfig.getBrokerAddr(),
+            liteConfig.getBrokerName(),
+            liteConfig.getBrokerId());
+    }
+
+    protected static void cancelIsolatedBroker(BrokerController brokerController) {
+        final BrokerConfig config = brokerController.getBrokerConfig();
+
+        BrokerConfigLite liteConfig = new BrokerConfigLite(config.getBrokerClusterName(),
+            config.getBrokerName(),
+            brokerController.getBrokerAddr(),
+            config.getBrokerId());
+
+        isolatedBrokers.remove(liteConfig);
+        brokerController.registerBrokerAll(true, false, true);
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> namesrvController.getRouteInfoManager()
+            .getBrokerMemberGroup(liteConfig.getClusterName(), liteConfig.brokerName).getBrokerAddrs()
+            .containsKey(liteConfig.getBrokerId()));
+    }
+
+    protected static InnerSalveBrokerController getSlaveFromContainerByName(BrokerContainer brokerContainer,
+        String brokerName) {
+        InnerSalveBrokerController targetSlave = null;
+        for (InnerSalveBrokerController slave : brokerContainer.getSlaveBrokers()) {
+            if (slave.getBrokerConfig().getBrokerName().equals(brokerName)) {
+                targetSlave = slave;
+            }
+        }
+
+        return targetSlave;
+    }
+
+    protected static void changeCompatibleMode(boolean compatibleMode) {
+        brokerContainer1.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
+        brokerContainer2.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
+        brokerContainer3.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
+    }
+
+    protected static Set<MessageQueue> filterMessageQueue(Set<MessageQueue> mqSet, String topic) {
+        Set<MessageQueue> targetMqSet = new HashSet<>();
+        if (topic != null) {
+            for (MessageQueue mq : mqSet) {
+                if (mq.getTopic().equals(topic)) {
+                    targetMqSet.add(mq);
+                }
+            }
+        }
+
+        return targetMqSet;
+    }
+
+    public static class BrokerGroupConfig {
+        int totalReplicas = 3;
+        int minReplicas = 1;
+        int inSyncReplicas = 2;
+        boolean autoReplicas = true;
+        boolean enableSlaveActingMaster = true;
+        boolean enableRemoteEscape = true;
+        boolean slaveReadEnable = true;
+
+        public BrokerGroupConfig() {
+        }
+
+        public BrokerGroupConfig(final int totalReplicas, final int minReplicas, final int inSyncReplicas,
+            final boolean autoReplicas, boolean enableSlaveActingMaster, boolean slaveReadEnable) {
+            this.totalReplicas = totalReplicas;
+            this.minReplicas = minReplicas;
+            this.inSyncReplicas = inSyncReplicas;
+            this.autoReplicas = autoReplicas;
+            this.enableSlaveActingMaster = enableSlaveActingMaster;
+            this.slaveReadEnable = slaveReadEnable;
+        }
+    }
+
+    static class BrokerConfigLite {
+        private String clusterName;
+        private String brokerName;
+        private String brokerAddr;
+        private long brokerId;
+
+        public BrokerConfigLite(final String clusterName, final String brokerName, final String brokerAddr,
+            final long brokerId) {
+            this.clusterName = clusterName;
+            this.brokerName = brokerName;
+            this.brokerAddr = brokerAddr;
+            this.brokerId = brokerId;
+        }
+
+        public String getClusterName() {
+            return clusterName;
+        }
+
+        public String getBrokerName() {
+            return brokerName;
+        }
+
+        public String getBrokerAddr() {
+            return brokerAddr;
+        }
+
+        public long getBrokerId() {
+            return brokerId;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            final BrokerConfigLite lite = (BrokerConfigLite) o;
+
+            return new EqualsBuilder()
+                .append(clusterName, lite.clusterName)
+                .append(brokerName, lite.brokerName)
+                .append(brokerAddr, lite.brokerAddr)
+                .append(brokerId, lite.brokerId)
+                .isEquals();
+        }
+
+        @Override
+        public int hashCode() {
+            return new HashCodeBuilder(17, 37)
+                .append(clusterName)
+                .append(brokerName)
+                .append(brokerAddr)
+                .append(brokerId)
+                .toHashCode();
+        }
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/GetMaxOffsetFromSlaveIT.java b/test/src/test/java/org/apache/rocketmq/test/container/GetMaxOffsetFromSlaveIT.java
new file mode 100644
index 0000000..0b55d77
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/GetMaxOffsetFromSlaveIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class GetMaxOffsetFromSlaveIT extends ContainerIntegrationTestBase {
+    private static DefaultMQProducer mqProducer;
+
+    private final byte[] MESSAGE_BODY = ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+    public GetMaxOffsetFromSlaveIT() throws UnsupportedEncodingException {
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws MQClientException {
+        mqProducer = createProducer(GetMaxOffsetFromSlaveIT.class.getSimpleName() + "_Producer");
+        mqProducer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        if (mqProducer != null) {
+            mqProducer.shutdown();
+        }
+    }
+
+    @Test
+    public void testGetMaxOffsetFromSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        awaitUntilSlaveOK();
+        mqProducer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+
+        for (int i = 0; i < 100; i++) {
+            Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+            SendResult sendResult = mqProducer.send(msg, 10000);
+            assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        }
+
+        Map<Integer, Long> maxOffsetMap = new HashMap<>();
+        TopicPublishInfo publishInfo = mqProducer.getDefaultMQProducerImpl().getTopicPublishInfoTable().get(THREE_REPLICAS_TOPIC);
+        assertThat(publishInfo).isNotNull();
+        for (MessageQueue mq : publishInfo.getMessageQueueList()) {
+            maxOffsetMap.put(mq.getQueueId(), mqProducer.getDefaultMQProducerImpl().
+                maxOffset(new MessageQueue(THREE_REPLICAS_TOPIC, master3With3Replicas.getBrokerConfig().getBrokerName(), mq.getQueueId())));
+        }
+
+        isolateBroker(master3With3Replicas);
+
+        mqProducer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+        assertThat(mqProducer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(
+            master3With3Replicas.getBrokerConfig().getBrokerName())).isNotNull();
+
+        for (MessageQueue mq : publishInfo.getMessageQueueList()) {
+            assertThat(mqProducer.getDefaultMQProducerImpl().maxOffset(
+                new MessageQueue(THREE_REPLICAS_TOPIC, master3With3Replicas.getBrokerConfig().getBrokerName(), mq.getQueueId())))
+                .isEqualTo(maxOffsetMap.get(mq.getQueueId()));
+        }
+
+        cancelIsolatedBroker(master3With3Replicas);
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master3With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
new file mode 100644
index 0000000..7138f42
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+@Ignore
+public class GetMetadataReverseIT extends ContainerIntegrationTestBase {
+
+    private static DefaultMQProducer producer;
+
+    private static final String CONSUMER_GROUP = GetMetadataReverseIT.class.getSimpleName() + "_Consumer";
+
+    private static final int MESSAGE_COUNT = 32;
+
+    private final static Random random = new Random();
+
+    public GetMetadataReverseIT() throws UnsupportedEncodingException {
+
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        producer = createProducer(PushMultipleReplicasIT.class.getSimpleName() + "_PRODUCER");
+        producer.setSendMsgTimeout(15 * 1000);
+        producer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producer.shutdown();
+    }
+
+    @Test
+    public void testGetMetadataReverse_consumerOffset() throws Exception {
+        String topic = GetMetadataReverseIT.class.getSimpleName() + "_consumerOffset" + random.nextInt(65535);
+        createTopicTo(master1With3Replicas, topic, 1, 1);
+        // Wait topic synchronization
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            InnerSalveBrokerController slaveBroker = brokerContainer2.getSlaveBrokers().iterator().next();
+            return slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
+        });
+
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, Integer.toString(i).getBytes());
+            SendResult sendResult = producer.send(msg);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendSuccess++;
+            }
+        }
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT);
+        System.out.printf("send success%n");
+
+        isolateBroker(master1With3Replicas);
+        brokerContainer1.removeBroker(new BrokerIdentity(
+            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master1With3Replicas.getBrokerConfig().getBrokerName(),
+            master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+        System.out.printf("Remove master%n");
+
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUMER_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf(x + "%n"));
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+        await().atMost(Duration.ofMinutes(3)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
+            Map<Integer, Long> slaveOffsetTable = null;
+            for (InnerSalveBrokerController slave : brokerContainer2.getSlaveBrokers()) {
+                if (slave.getBrokerConfig().getBrokerName().equals(master1With3Replicas.getBrokerConfig().getBrokerName())) {
+                    slaveOffsetTable = slave.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
+                }
+            }
+
+            if (slaveOffsetTable != null) {
+                long totalOffset = 0;
+                for (final Long offset : slaveOffsetTable.values()) {
+                    totalOffset += offset;
+                }
+
+                return totalOffset >= MESSAGE_COUNT;
+            }
+            return false;
+        });
+
+        //Add back master
+        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+        master1With3Replicas.start();
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Add back master%n");
+
+        awaitUntilSlaveOK();
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            Map<Integer, Long> offsetTable = master1With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
+            long totalOffset = 0;
+            if (offsetTable != null) {
+                for (final Long offset : offsetTable.values()) {
+                    totalOffset += offset;
+                }
+            }
+            return totalOffset >= MESSAGE_COUNT;
+        });
+
+        pushConsumer.shutdown();
+    }
+
+    @Test
+    public void testGetMetadataReverse_delayOffset() throws Exception {
+        String topic = GetMetadataReverseIT.class.getSimpleName() + "_delayOffset" + random.nextInt(65535);
+        createTopicTo(master1With3Replicas, topic, 1, 1);
+        createTopicTo(master2With3Replicas, topic, 1, 1);
+        createTopicTo(master3With3Replicas, topic, 1, 1);
+        // Wait topic synchronization
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            InnerSalveBrokerController slaveBroker = brokerContainer2.getSlaveBrokers().iterator().next();
+            return slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
+        });
+        int delayLevel = 4;
+
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUMER_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf(x + "%n"));
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, Integer.toString(i).getBytes());
+            msg.setDelayTimeLevel(delayLevel);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendSuccess++;
+            }
+        }
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT);
+        System.out.printf("send success%n");
+
+        isolateBroker(master1With3Replicas);
+        brokerContainer1.removeBroker(new BrokerIdentity(
+            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master1With3Replicas.getBrokerConfig().getBrokerName(),
+            master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+        System.out.printf("Remove master%n");
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
+            Map<Integer, Long> OffsetTable = master2With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
+            if (OffsetTable != null) {
+                long totalOffset = 0;
+                for (final Long offset : OffsetTable.values()) {
+                    totalOffset += offset;
+                }
+                return totalOffset >= MESSAGE_COUNT;
+
+            }
+            return false;
+        });
+
+        //Add back master
+        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+        master1With3Replicas.start();
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Add back master%n");
+
+        awaitUntilSlaveOK();
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            Map<Integer, Long> offsetTable = master1With3Replicas.getScheduleMessageService().getOffsetTable();
+            System.out.println("" + offsetTable.get(delayLevel));
+            return offsetTable.get(delayLevel) >= MESSAGE_COUNT;
+        });
+
+        pushConsumer.shutdown();
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/PullMultipleReplicasIT.java b/test/src/test/java/org/apache/rocketmq/test/container/PullMultipleReplicasIT.java
new file mode 100644
index 0000000..0181645
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/PullMultipleReplicasIT.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class PullMultipleReplicasIT extends ContainerIntegrationTestBase {
+    private static DefaultMQPullConsumer pullConsumer;
+    private static DefaultMQProducer producer;
+    private static MQClientInstance mqClientInstance;
+
+    private final String MESSAGE_STRING = RandomStringUtils.random(1024);
+    private final byte[] MESSAGE_BODY = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+    public PullMultipleReplicasIT() throws UnsupportedEncodingException {
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+
+        pullConsumer = createPullConsumer(PullMultipleReplicasIT.class.getSimpleName() + "_Consumer");
+        pullConsumer.start();
+
+        Field field = DefaultMQPullConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        mqClientInstance = (MQClientInstance) field.get(pullConsumer.getDefaultMQPullConsumerImpl());
+
+        producer = createProducer(PullMultipleReplicasIT.class.getSimpleName() + "_Producer");
+        producer.setSendMsgTimeout(15 * 1000);
+        producer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        producer.shutdown();
+        pullConsumer.shutdown();
+    }
+
+    @Test
+    public void testPullMessageFromSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
+        awaitUntilSlaveOK();
+
+        Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+        SendResult sendResult = producer.send(msg);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+
+        final MessageQueue messageQueue = sendResult.getMessageQueue();
+        final long queueOffset = sendResult.getQueueOffset();
+
+        final PullResult[] pullResult = {null};
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            pullResult[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
+            return pullResult[0].getPullStatus() == PullStatus.FOUND;
+        });
+
+        List<MessageExt> msgFoundList = pullResult[0].getMsgFoundList();
+        assertThat(msgFoundList.size()).isEqualTo(1);
+        assertThat(new String(msgFoundList.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET)).isEqualTo(MESSAGE_STRING);
+
+        // Pull the same message from the slave broker
+        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 1);
+
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            pullResult[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
+            return pullResult[0].getPullStatus() == PullStatus.FOUND;
+        });
+
+        msgFoundList = pullResult[0].getMsgFoundList();
+        assertThat(msgFoundList.size()).isEqualTo(1);
+        assertThat(new String(msgFoundList.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET)).isEqualTo(MESSAGE_STRING);
+
+        // Pull the same message from the slave broker
+        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 2);
+
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            pullResult[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
+            return pullResult[0].getPullStatus() == PullStatus.FOUND;
+        });
+
+        msgFoundList = pullResult[0].getMsgFoundList();
+        assertThat(msgFoundList.size()).isEqualTo(1);
+        assertThat(new String(msgFoundList.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET)).isEqualTo(MESSAGE_STRING);
+
+        pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 0);
+    }
+
+    @Test
+    public void testSendMessageBackToSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
+        awaitUntilSlaveOK();
+
+        String clusterTopic = "TOPIC_ON_BROKER2_AND_BROKER3_FOR_MESSAGE_BACK";
+        createTopicTo(master1With3Replicas, clusterTopic);
+        createTopicTo(master3With3Replicas, clusterTopic);
+
+        Message msg = new Message(clusterTopic, MESSAGE_BODY);
+        producer.setSendMsgTimeout(10 * 1000);
+
+        final MessageQueue[] selectedQueue = new MessageQueue[1];
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            for (final MessageQueue queue : producer.fetchPublishMessageQueues(clusterTopic)) {
+                if (queue.getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
+                    selectedQueue[0] = queue;
+                }
+            }
+            return selectedQueue[0] != null;
+        });
+
+        SendResult sendResult = producer.send(msg, selectedQueue[0]);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+
+        final MessageQueue messageQueue = sendResult.getMessageQueue();
+        final long queueOffset = sendResult.getQueueOffset();
+
+        final PullResult[] pullResult = {null};
+        await().atMost(Duration.ofSeconds(60)).until(() -> {
+            pullResult[0] = pullConsumer.pull(messageQueue, "*", queueOffset, 1);
+            return pullResult[0].getPullStatus() == PullStatus.FOUND;
+        });
+
+        await().atMost(Duration.ofSeconds(60)).until(() -> {
+            DefaultMessageStore messageStore = (DefaultMessageStore) master3With3Replicas.getMessageStore();
+            return messageStore.getHaService().inSyncSlaveNums(messageStore.getMaxPhyOffset()) == 2;
+        });
+
+        InnerSalveBrokerController slaveBroker = null;
+        for (InnerSalveBrokerController slave : brokerContainer1.getSlaveBrokers()) {
+            if (slave.getBrokerConfig().getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
+                slaveBroker = slave;
+            }
+        }
+
+        assertThat(slaveBroker).isNotNull();
+
+        MessageExt backMessage = pullResult[0].getMsgFoundList().get(0);
+
+        // Message will be sent to the master broker(master1With3Replicas) beside a slave broker of master3With3Replicas
+        backMessage.setStoreHost(new InetSocketAddress(slaveBroker.getBrokerConfig().getBrokerIP1(), slaveBroker.getBrokerConfig().getListenPort()));
+        pullConsumer.sendMessageBack(backMessage, 0);
+
+        String retryTopic = MixAll.getRetryTopic(pullConsumer.getConsumerGroup());
+        // Retry topic only has one queue by default
+        MessageQueue newMsgQueue = new MessageQueue(retryTopic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        await().atMost(Duration.ofSeconds(60)).until(() -> {
+            pullResult[0] = pullConsumer.pull(newMsgQueue, "*", 0, 1);
+            return pullResult[0].getPullStatus() == PullStatus.FOUND;
+        });
+
+        List<MessageExt> msgFoundList = pullResult[0].getMsgFoundList();
+        assertThat(msgFoundList.size()).isEqualTo(1);
+        assertThat(new String(msgFoundList.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET)).isEqualTo(MESSAGE_STRING);
+
+        awaitUntilSlaveOK();
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/PushMultipleReplicasIT.java b/test/src/test/java/org/apache/rocketmq/test/container/PushMultipleReplicasIT.java
new file mode 100644
index 0000000..abe1d5e
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/PushMultipleReplicasIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class PushMultipleReplicasIT extends ContainerIntegrationTestBase {
+    private static DefaultMQProducer producer;
+
+    private static final String TOPIC = PushMultipleReplicasIT.class.getSimpleName() + "_TOPIC";
+    private static final String REDIRECT_TOPIC = PushMultipleReplicasIT.class.getSimpleName() + "_REDIRECT_TOPIC";
+    private static final String CONSUMER_GROUP = PushMultipleReplicasIT.class.getSimpleName() + "_Consumer";
+    private static final int MESSAGE_COUNT = 32;
+
+    public PushMultipleReplicasIT() throws UnsupportedEncodingException {
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        createTopicTo(master1With3Replicas, TOPIC,1, 1);
+        producer = createProducer(PushMultipleReplicasIT.class.getSimpleName() + "_PRODUCER");
+        producer.setSendMsgTimeout(15 * 1000);
+        producer.start();
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            producer.send(new Message(TOPIC, Integer.toString(i).getBytes()));
+        }
+
+        createTopicTo(master3With3Replicas, REDIRECT_TOPIC, 1, 1);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producer.shutdown();
+    }
+
+    @Test
+    public void consumeMessageFromSlave_PushConsumer() throws MQClientException {
+        // Wait topic synchronization
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            InnerSalveBrokerController slaveBroker = brokerContainer2.getSlaveBrokers().iterator().next();
+           return slaveBroker.getTopicConfigManager().selectTopicConfig(TOPIC) != null;
+        });
+        isolateBroker(master1With3Replicas);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUMER_GROUP);
+        pushConsumer.subscribe(TOPIC, "*");
+        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            receivedMsgCount.addAndGet(msgs.size());
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+        await().atMost(Duration.ofMinutes(5)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
+            Map<Integer, Long> slaveOffsetTable = null;
+            for (InnerSalveBrokerController slave : brokerContainer2.getSlaveBrokers()) {
+                if (slave.getBrokerConfig().getBrokerName().equals(master1With3Replicas.getBrokerConfig().getBrokerName())) {
+                    slaveOffsetTable = slave.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, TOPIC);
+                }
+            }
+
+            if (slaveOffsetTable != null) {
+                long totalOffset = 0;
+                for (final Long offset : slaveOffsetTable.values()) {
+                    totalOffset += offset;
+                }
+
+                return totalOffset >= MESSAGE_COUNT;
+            }
+            return false;
+        });
+
+        pushConsumer.shutdown();
+        cancelIsolatedBroker(master1With3Replicas);
+
+        awaitUntilSlaveOK();
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/RebalanceLockOnSlaveIT.java b/test/src/test/java/org/apache/rocketmq/test/container/RebalanceLockOnSlaveIT.java
new file mode 100644
index 0000000..41345b1
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/RebalanceLockOnSlaveIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Test lock on slave when acting master enabled
+ */
+public class RebalanceLockOnSlaveIT extends ContainerIntegrationTestBase {
+    private static final String THREE_REPLICA_CONSUMER_GROUP = "SyncConsumerOffsetIT_ConsumerThreeReplica";
+
+    private static DefaultMQProducer mqProducer;
+    private static DefaultMQPushConsumer mqConsumerThreeReplica1;
+    private static DefaultMQPushConsumer mqConsumerThreeReplica2;
+    private static DefaultMQPushConsumer mqConsumerThreeReplica3;
+
+    public RebalanceLockOnSlaveIT() {
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        mqProducer = createProducer("SyncConsumerOffsetIT_Producer");
+        mqProducer.start();
+
+        mqConsumerThreeReplica1 = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
+        mqConsumerThreeReplica1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        mqConsumerThreeReplica1.subscribe(THREE_REPLICAS_TOPIC, "*");
+
+        mqConsumerThreeReplica2 = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
+        mqConsumerThreeReplica2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        mqConsumerThreeReplica2.subscribe(THREE_REPLICAS_TOPIC, "*");
+
+        mqConsumerThreeReplica3 = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
+        mqConsumerThreeReplica3.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        mqConsumerThreeReplica3.subscribe(THREE_REPLICAS_TOPIC, "*");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        if (mqProducer != null) {
+            mqProducer.shutdown();
+        }
+    }
+
+    @Test
+    public void lockFromSlave() throws Exception {
+        awaitUntilSlaveOK();
+
+        mqConsumerThreeReplica3.registerMessageListener((MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
+        mqConsumerThreeReplica3.start();
+
+        final Set<MessageQueue> mqSet = mqConsumerThreeReplica3.fetchSubscribeMessageQueues(THREE_REPLICAS_TOPIC);
+
+        assertThat(targetTopicMqCount(mqSet, THREE_REPLICAS_TOPIC)).isEqualTo(24);
+
+        for (MessageQueue mq : mqSet) {
+            await().atMost(Duration.ofSeconds(60)).until(() -> mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getRebalanceImpl().lock(mq));
+        }
+
+        isolateBroker(master3With3Replicas);
+
+        mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+        FindBrokerResult result = mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(
+            master3With3Replicas.getBrokerConfig().getBrokerName(), MixAll.MASTER_ID, true);
+        assertThat(result).isNotNull();
+
+        for (MessageQueue mq : mqSet) {
+            if (mq.getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
+                await().atMost(Duration.ofSeconds(60)).until(() -> mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getRebalanceImpl().lock(mq));
+            }
+        }
+
+        removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
+        assertThat(brokerContainer1.getSlaveBrokers().size()).isEqualTo(1);
+
+        mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+
+        for (MessageQueue mq : mqSet) {
+            if (mq.getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
+                await().atMost(Duration.ofSeconds(60)).until(() -> !mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getRebalanceImpl().lock(mq));
+            }
+        }
+
+        cancelIsolatedBroker(master3With3Replicas);
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+        awaitUntilSlaveOK();
+
+        mqConsumerThreeReplica3.shutdown();
+        await().atMost(100, TimeUnit.SECONDS).until(() -> mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY);
+    }
+
+    @Ignore
+    @Test
+    public void multiConsumerLockFromSlave() throws MQClientException, InterruptedException {
+        awaitUntilSlaveOK();
+
+        mqConsumerThreeReplica1.registerMessageListener((MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
+        mqConsumerThreeReplica1.start();
+
+        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().doRebalance();
+        Set<MessageQueue> mqSet1 = filterMessageQueue(mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(), THREE_REPLICAS_TOPIC);
+
+        assertThat(mqSet1.size()).isEqualTo(24);
+
+        isolateBroker(master3With3Replicas);
+
+        System.out.printf("%s isolated%n", master3With3Replicas.getBrokerConfig().getCanonicalName());
+
+        Thread.sleep(5000);
+
+        mqConsumerThreeReplica2.registerMessageListener((MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
+        mqConsumerThreeReplica2.start();
+
+        Thread.sleep(5000);
+
+        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+        mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+
+        assertThat(mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(
+            master3With3Replicas.getBrokerConfig().getBrokerName(), MixAll.MASTER_ID, true)).isNotNull();
+
+        mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(
+            master3With3Replicas.getBrokerConfig().getBrokerName(), MixAll.MASTER_ID, true);
+        assertThat(mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(
+            master3With3Replicas.getBrokerConfig().getBrokerName(), MixAll.MASTER_ID, true)).isNotNull();
+
+        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().doRebalance();
+        mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().doRebalance();
+
+        Set<MessageQueue> mqSet2 = filterMessageQueue(mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(), THREE_REPLICAS_TOPIC);
+
+        mqSet1 = filterMessageQueue(mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(), THREE_REPLICAS_TOPIC);
+
+        List<MessageQueue> mqList = new ArrayList<>();
+
+        for (MessageQueue mq : mqSet2) {
+            if (mq.getTopic().equals(THREE_REPLICAS_TOPIC)) {
+                mqList.add(mq);
+            }
+        }
+
+        for (MessageQueue mq : mqSet1) {
+            if (mq.getTopic().equals(THREE_REPLICAS_TOPIC)) {
+                mqList.add(mq);
+            }
+        }
+
+        await().atMost(Duration.ofSeconds(30)).until(() -> mqList.size() == 24);
+
+        cancelIsolatedBroker(master3With3Replicas);
+        awaitUntilSlaveOK();
+
+        mqConsumerThreeReplica1.shutdown();
+        mqConsumerThreeReplica2.shutdown();
+
+        await().atMost(100, TimeUnit.SECONDS).until(() ->
+            mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY &&
+                mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY
+        );
+    }
+
+    private static int targetTopicMqCount(Set<MessageQueue> mqSet, String topic) {
+        int count = 0;
+        for (MessageQueue mq : mqSet) {
+            if (mq.getTopic().equals(topic)) {
+                count++;
+            }
+        }
+        return count;
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java b/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
new file mode 100644
index 0000000..c2850f3
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+//The test is correct, but it takes too much time, so it is ignored for the time being
+@Ignore
+public class ScheduleSlaveActingMasterIT extends ContainerIntegrationTestBase {
+
+    private static final String CONSUME_GROUP = ScheduleSlaveActingMasterIT.class.getSimpleName() + "_Consumer";
+    private final static int MESSAGE_COUNT = 32;
+    private final static Random random = new Random();
+    private static DefaultMQProducer producer;
+    private final static String MESSAGE_STRING = RandomStringUtils.random(1024);
+    private static byte[] MESSAGE_BODY;
+
+    static {
+        try {
+            MESSAGE_BODY = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+        } catch (UnsupportedEncodingException e) {
+            e.printStackTrace();
+        }
+    }
+
+    void createTopic(String topic) {
+        createTopicTo(master1With3Replicas, topic, 1, 1);
+        createTopicTo(master2With3Replicas, topic, 1, 1);
+        createTopicTo(master3With3Replicas, topic, 1, 1);
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        producer = createProducer(ScheduleSlaveActingMasterIT.class.getSimpleName() + "_PRODUCER");
+        producer.setSendMsgTimeout(5000);
+        producer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producer.shutdown();
+    }
+
+    @Test
+    public void testLocalActing_delayMsg() throws Exception {
+        awaitUntilSlaveOK();
+        String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
+        createTopic(topic);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            long period = System.currentTimeMillis() - msgs.get(0).getBornTimestamp();
+            if (Math.abs(period - 30000) <= 4000) {
+                inTimeMsgCount.addAndGet(msgs.size());
+            }
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf(x + "%n"));
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            msg.setDelayTimeLevel(4);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendSuccess++;
+            }
+        }
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT);
+        System.out.printf("send success%n");
+
+        isolateBroker(master1With3Replicas);
+        brokerContainer1.removeBroker(new BrokerIdentity(
+            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master1With3Replicas.getBrokerConfig().getBrokerName(),
+            master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+        System.out.printf("Remove master1%n");
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95);
+
+        System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
+
+        pushConsumer.shutdown();
+
+        //Add back master
+        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+        master1With3Replicas.start();
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Add back master1%n");
+
+        awaitUntilSlaveOK();
+        // sleep a while to recover
+        Thread.sleep(30000);
+    }
+
+    @Test
+    public void testRemoteActing_delayMsg() throws Exception {
+        awaitUntilSlaveOK();
+
+        String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
+        createTopic(topic);
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+        AtomicInteger master3MsgCount = new AtomicInteger(0);
+
+        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            msg.setDelayTimeLevel(4);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendSuccess++;
+            }
+        }
+        final int finalSendSuccess = sendSuccess;
+        await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT);
+        long sendCompleteTimeStamp = System.currentTimeMillis();
+        System.out.printf("send success%n");
+
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            long period = System.currentTimeMillis() - sendCompleteTimeStamp;
+            // Remote Acting lead to born timestamp, msgId changed, it need to polish.
+            if (Math.abs(period - 30000) <= 4000) {
+                inTimeMsgCount.addAndGet(msgs.size());
+            }
+            if (msgs.get(0).getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
+                master3MsgCount.addAndGet(msgs.size());
+            }
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf("cost " + period + " " + x + "%n"));
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        isolateBroker(master1With3Replicas);
+        BrokerIdentity master1BrokerIdentity = new BrokerIdentity(
+            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master1With3Replicas.getBrokerConfig().getBrokerName(),
+            master1With3Replicas.getBrokerConfig().getBrokerId());
+
+        brokerContainer1.removeBroker(master1BrokerIdentity);
+        System.out.printf("Remove master1%n");
+
+        isolateBroker(master2With3Replicas);
+        BrokerIdentity master2BrokerIdentity = new BrokerIdentity(
+            master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master2With3Replicas.getBrokerConfig().getBrokerName(),
+            master2With3Replicas.getBrokerConfig().getBrokerId());
+        brokerContainer2.removeBroker(master2BrokerIdentity);
+        System.out.printf("Remove master2%n");
+
+        await().atMost(Duration.ofMinutes(2)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT && master3MsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95);
+
+        System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
+
+        pushConsumer.shutdown();
+
+        //Add back master
+        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+        master1With3Replicas.start();
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Add back master1%n");
+
+        //Add back master
+        master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
+        master2With3Replicas.start();
+        cancelIsolatedBroker(master2With3Replicas);
+        System.out.printf("Add back master2%n");
+
+        awaitUntilSlaveOK();
+        // sleep a while to recover
+        Thread.sleep(30000);
+    }
+
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java b/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
new file mode 100644
index 0000000..6e5c9a9
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class ScheduledMessageIT extends ContainerIntegrationTestBase {
+    private static DefaultMQProducer producer;
+
+    private static final String CONSUME_GROUP = ScheduledMessageIT.class.getSimpleName() + "_Consumer";
+    private static final String MESSAGE_STRING = RandomStringUtils.random(1024);
+    private static byte[] MESSAGE_BODY;
+
+    static {
+        try {
+            MESSAGE_BODY = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+        } catch (UnsupportedEncodingException ignored) {
+        }
+    }
+
+    private static final String TOPIC_PREFIX = ScheduledMessageIT.class.getSimpleName() + "_TOPIC";
+    private static Random random = new Random();
+    private static final int MESSAGE_COUNT = 128;
+
+    public ScheduledMessageIT() throws UnsupportedEncodingException {
+    }
+
+    void createTopic(String topic) {
+        createTopicTo(master1With3Replicas, topic, 1, 1);
+        createTopicTo(master2With3Replicas, topic, 1, 1);
+        createTopicTo(master3With3Replicas, topic, 1, 1);
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        producer = createProducer(ScheduledMessageIT.class.getSimpleName() + "_PRODUCER");
+        producer.setSendMsgTimeout(5000);
+        producer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producer.shutdown();
+    }
+
+    @Ignore
+    @Test
+    public void consumeScheduledMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        String topic = TOPIC_PREFIX + random.nextInt(65535);
+        createTopic(topic);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP + random.nextInt(65535));
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            long period = System.currentTimeMillis() - msgs.get(0).getBornTimestamp();
+            if (Math.abs(period - 5000) <= 1000) {
+                inTimeMsgCount.addAndGet(msgs.size());
+            }
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf(receivedMsgCount.get()+" cost " + period + " " + x + "%n"));
+
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            msg.setDelayTimeLevel(2);
+            producer.send(msg);
+        }
+
+        await().atMost(Duration.ofSeconds(MESSAGE_COUNT * 2)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95);
+
+        System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
+
+        pushConsumer.shutdown();
+    }
+
+    @Test
+    public void consumeScheduledMsgFromSlave() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        String topic = TOPIC_PREFIX + random.nextInt(65535);
+        createTopic(topic);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP + random.nextInt(65535));
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            receivedMsgCount.addAndGet(msgs.size());
+            msgs.forEach(x -> System.out.printf(x + "%n"));
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, String.valueOf(i).getBytes());
+            msg.setDelayTimeLevel(2);
+            producer.send(msg);
+        }
+
+        isolateBroker(master1With3Replicas);
+
+        producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+        assertThat(producer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(topic)).isNull();
+
+        pushConsumer.start();
+
+        await().atMost(Duration.ofSeconds(MESSAGE_COUNT * 2)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+
+        pushConsumer.shutdown();
+        cancelIsolatedBroker(master1With3Replicas);
+
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
+    }
+
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/SendMultipleReplicasIT.java b/test/src/test/java/org/apache/rocketmq/test/container/SendMultipleReplicasIT.java
new file mode 100644
index 0000000..ea35781
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/SendMultipleReplicasIT.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class SendMultipleReplicasIT extends ContainerIntegrationTestBase {
+    private static DefaultMQProducer mqProducer;
+
+    private final byte[] MESSAGE_BODY = ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+    public SendMultipleReplicasIT() throws UnsupportedEncodingException {
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        mqProducer = createProducer("SendMultipleReplicasMessageIT_Producer");
+        mqProducer.setSendMsgTimeout(15 * 1000);
+        mqProducer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        if (mqProducer != null) {
+            mqProducer.shutdown();
+        }
+    }
+
+    @Test
+    public void sendMessageToBrokerGroup() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        awaitUntilSlaveOK();
+
+        // Send message to broker group with three replicas
+        Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+        SendResult sendResult = mqProducer.send(msg);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+    }
+
+    @Test
+    public void sendMessage_Auto_Replicas_Success() throws Exception {
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2
+                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+        // Broker with 3 replicas configured as 3-2-1 auto replicas mode
+        Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+        SendResult sendResult = mqProducer.send(msg);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+
+        // Remove two slave broker
+        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
+        removeSlaveBroker(2, brokerContainer3, master1With3Replicas);
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() ->
+                ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 0
+                    && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 1);
+
+        master1With3Replicas.getMessageStoreConfig().setEnableAutoInSyncReplicas(true);
+        List<MessageQueue> mqList = mqProducer.getDefaultMQProducerImpl().fetchPublishMessageQueues(THREE_REPLICAS_TOPIC);
+        MessageQueue targetMq = null;
+        for (MessageQueue mq : mqList) {
+            if (mq.getBrokerName().equals(master1With3Replicas.getBrokerConfig().getBrokerName())) {
+                targetMq = mq;
+            }
+        }
+
+        assertThat(targetMq).isNotNull();
+        // Although this broker group only has one slave broker, send will be success in auto mode.
+        msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+        sendResult = mqProducer.send(msg, targetMq);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+
+        // Recover the cluster state
+        createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+        createAndAddSlave(2, brokerContainer3, master1With3Replicas);
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2
+                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+    }
+
+    @Test
+    public void sendMessage_Auto_Replicas_Failed()
+        throws Exception {
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2
+                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+        // Broker with 3 replicas configured as 3-2-1 auto replicas mode
+        // Remove two slave broker
+        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
+        removeSlaveBroker(2, brokerContainer3, master1With3Replicas);
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() ->
+                ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 0
+                    && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 1);
+
+        // Disable the auto mode
+        master1With3Replicas.getMessageStoreConfig().setEnableAutoInSyncReplicas(false);
+
+        List<MessageQueue> mqList = mqProducer.getDefaultMQProducerImpl().fetchPublishMessageQueues(THREE_REPLICAS_TOPIC);
+        MessageQueue targetMq = null;
+        for (MessageQueue mq : mqList) {
+            if (mq.getBrokerName().equals(master1With3Replicas.getBrokerConfig().getBrokerName())) {
+                targetMq = mq;
+            }
+        }
+
+        assertThat(targetMq).isNotNull();
+
+        Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+        boolean exceptionCaught = false;
+        try {
+            mqProducer.send(msg, targetMq);
+        } catch (MQBrokerException e) {
+            exceptionCaught = true;
+        }
+
+        assertThat(exceptionCaught).isTrue();
+        // Recover the cluster state
+        createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+        createAndAddSlave(2, brokerContainer3, master1With3Replicas);
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2
+                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/SlaveBrokerIT.java b/test/src/test/java/org/apache/rocketmq/test/container/SlaveBrokerIT.java
new file mode 100644
index 0000000..3e6cd8d
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/SlaveBrokerIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class SlaveBrokerIT extends ContainerIntegrationTestBase {
+    @Test
+    public void reAddSlaveBroker() throws Exception {
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+
+            if (clusterInfo.getClusterAddrTable().get(master1With3Replicas.getBrokerConfig().getBrokerClusterName()).size() != 3) {
+                return false;
+            }
+
+            if (clusterInfo.getBrokerAddrTable().get(master1With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size() != 3) {
+                return false;
+            }
+
+            if (clusterInfo.getBrokerAddrTable().get(master2With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size() != 3) {
+                return false;
+            }
+
+            if (clusterInfo.getBrokerAddrTable().get(master3With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size() != 3) {
+                return false;
+            }
+
+            return true;
+        });
+
+        // Remove one replicas from each broker group
+        removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
+        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
+        removeSlaveBroker(1, brokerContainer3, master2With3Replicas);
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            // Test cluster info again
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            assertThat(clusterInfo.getBrokerAddrTable().get(master1With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size())
+                .isEqualTo(2);
+
+            assertThat(clusterInfo.getBrokerAddrTable().get(master2With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size())
+                .isEqualTo(2);
+
+            assertThat(clusterInfo.getBrokerAddrTable().get(master3With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size())
+                .isEqualTo(2);
+            return true;
+        });
+
+        // ReAdd the slave broker
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+        createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+        createAndAddSlave(1, brokerContainer3, master2With3Replicas);
+
+        // Trigger a register action
+        //for (final SlaveBrokerController slaveBrokerController : brokerContainer1.getSlaveBrokers()) {
+        //    slaveBrokerController.registerBrokerAll(false, false, true);
+        //}
+        //
+        //for (final SlaveBrokerController slaveBrokerController : brokerContainer2.getSlaveBrokers()) {
+        //    slaveBrokerController.registerBrokerAll(false, false, true);
+        //}
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+
+            return clusterInfo.getBrokerAddrTable()
+                .get(master1With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size() == 3
+                && clusterInfo.getBrokerAddrTable()
+                .get(master2With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size() == 3
+                && clusterInfo.getBrokerAddrTable()
+                .get(master2With3Replicas.getBrokerConfig().getBrokerName()).getBrokerAddrs().size() == 3;
+        });
+    }
+
+    @Test
+    public void reAddSlaveBroker_ConnectionCheck() throws Exception {
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master3With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
+
+        removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master3With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
+
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master3With3Replicas.getMessageStore()).getHaService().inSyncSlaveNums(0) == 2);
+
+        Thread.sleep(1000 * 101);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/SyncConsumerOffsetIT.java b/test/src/test/java/org/apache/rocketmq/test/container/SyncConsumerOffsetIT.java
new file mode 100644
index 0000000..7b243f5
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/SyncConsumerOffsetIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.container.BrokerContainer;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class SyncConsumerOffsetIT extends ContainerIntegrationTestBase {
+    private static final String THREE_REPLICA_CONSUMER_GROUP = "SyncConsumerOffsetIT_ConsumerThreeReplica";
+    private static final String TEST_SYNC_TOPIC = SyncConsumerOffsetIT.class.getSimpleName() + "_topic";
+
+    private static DefaultMQProducer mqProducer;
+    private static DefaultMQPushConsumer mqConsumerThreeReplica;
+
+    private final byte[] MESSAGE_BODY = ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+    public SyncConsumerOffsetIT() throws UnsupportedEncodingException {
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        createTopicTo(master3With3Replicas, TEST_SYNC_TOPIC);
+
+        mqProducer = createProducer("SyncConsumerOffsetIT_Producer");
+        mqProducer.setSendMsgTimeout(15 * 1000);
+        mqProducer.start();
+
+        mqConsumerThreeReplica = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
+        mqConsumerThreeReplica.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        mqConsumerThreeReplica.subscribe(TEST_SYNC_TOPIC, "*");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        mqProducer.shutdown();
+        mqConsumerThreeReplica.shutdown();
+    }
+
+    @Test
+    public void syncConsumerOffsetWith3Replicas() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        syncConsumeOffsetInner(TEST_SYNC_TOPIC, mqConsumerThreeReplica,
+            master3With3Replicas, Arrays.asList(brokerContainer1, brokerContainer2));
+    }
+
+    private void syncConsumeOffsetInner(String topic, DefaultMQPushConsumer consumer, BrokerController master,
+        List<BrokerContainer> slaveContainers) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        awaitUntilSlaveOK();
+        String group = THREE_REPLICA_CONSUMER_GROUP;
+
+        int msgCount = 100;
+        for (int i = 0; i < msgCount; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            SendResult sendResult = mqProducer.send(msg);
+            assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(msgCount);
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            countDownLatch.countDown();
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        consumer.start();
+        boolean ok = countDownLatch.await(100, TimeUnit.SECONDS);
+        assertThat(ok).isEqualTo(true);
+        System.out.printf("consume complete%n");
+
+        final Set<MessageQueue> mqSet = filterMessageQueue(consumer.fetchSubscribeMessageQueues(topic), topic);
+
+        await().atMost(120, TimeUnit.SECONDS).until(() -> {
+            Map<Integer, Long> consumerOffsetMap = new HashMap<>();
+            long offsetTotal = 0L;
+            for (MessageQueue mq : mqSet) {
+                long queueOffset = master.getConsumerOffsetManager().queryOffset(group, topic, mq.getQueueId());
+                if (queueOffset < 0) {
+                    continue;
+                }
+                offsetTotal += queueOffset;
+                consumerOffsetMap.put(mq.getQueueId(), queueOffset);
+            }
+
+            if (offsetTotal < 100) {
+                return false;
+            }
+            boolean syncOk = true;
+
+            for (BrokerContainer brokerContainer : slaveContainers) {
+                for (InnerSalveBrokerController slave : brokerContainer.getSlaveBrokers()) {
+                    if (!slave.getBrokerConfig().getBrokerName().equals(master.getBrokerConfig().getBrokerName())) {
+                        continue;
+                    }
+                    for (MessageQueue mq : mqSet) {
+                        long slaveOffset = slave.getConsumerOffsetManager().queryOffset(group, topic, mq.getQueueId());
+                        boolean check = slaveOffset == consumerOffsetMap.get(mq.getQueueId());
+                        syncOk &= check;
+                    }
+                }
+            }
+
+            return syncOk;
+        });
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index b3d2d47..a6e3e75 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -19,11 +19,7 @@ package org.apache.rocketmq.test.statictopic;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.apache.log4j.Logger;
-import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -36,22 +32,20 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
 import org.apache.rocketmq.test.util.MQAdminTestUtils;
 import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.apache.rocketmq.test.util.TestUtils;
 import org.apache.rocketmq.test.util.VerifyUtils;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.MQAdminUtils;
-import org.apache.rocketmq.tools.command.MQAdminStartup;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.FixMethodOrder;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -210,8 +204,8 @@ public class StaticTopicIT extends BaseConf {
 
     private void consumeMessagesAndCheck(RMQNormalProducer producer, RMQNormalConsumer consumer, String topic, int queueNum, int msgEachQueue, int startGen, int genNum) {
         consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
-//        System.out.println("produce:" + producer.getAllMsgBody().size());
-//        System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());
+        /*System.out.println("produce:" + producer.getAllMsgBody().size());
+        System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/
 
         assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
                 consumer.getListener().getAllMsgBody()))
@@ -284,7 +278,6 @@ public class StaticTopicIT extends BaseConf {
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
             consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
         }
-
         //remapping the static topic
         {
             Set<String> targetBrokers = ImmutableSet.of(broker2Name);
@@ -344,6 +337,8 @@ public class StaticTopicIT extends BaseConf {
             Thread.sleep(500);
             sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, (i + 1) * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
         }
+
+        TestUtils.waitForSeconds(20);
         consumeStats = defaultMQAdminExt.examineConsumeStats(group);
 
         messageQueues = producer.getMessageQueue();