You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/14 06:52:27 UTC
[rocketmq] 06/07: feature(test):[RIP-31]Add ITs for BrokerContainer (#3980)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-tmp
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 00d2d1c40c622d267b20af1c73bfd37ff74f62fd
Author: rongtong <ji...@163.com>
AuthorDate: Mon Mar 14 14:07:18 2022 +0800
feature(test):[RIP-31]Add ITs for BrokerContainer (#3980)
---
test/pom.xml | 4 +
.../rocketmq/test/util/MQAdminTestUtils.java | 2 +-
.../org/apache/rocketmq/test/base/BaseConf.java | 26 +-
.../test/client/producer/batch/BatchSendIT.java | 2 +-
.../client/producer/order/OrderMsgRebalanceIT.java | 2 +-
.../test/container/AddAndRemoveBrokerIT.java | 81 +++
.../rocketmq/test/container/BrokerFailoverIT.java | 89 +++
.../test/container/BrokerMemberGroupIT.java | 69 +++
.../container/ContainerIntegrationTestBase.java | 666 +++++++++++++++++++++
.../test/container/GetMaxOffsetFromSlaveIT.java | 98 +++
.../test/container/GetMetadataReverseIT.java | 231 +++++++
.../test/container/PullMultipleReplicasIT.java | 199 ++++++
.../test/container/PushMultipleReplicasIT.java | 112 ++++
.../test/container/RebalanceLockOnSlaveIT.java | 208 +++++++
.../container/ScheduleSlaveActingMasterIT.java | 220 +++++++
.../test/container/ScheduledMessageIT.java | 152 +++++
.../test/container/SendMultipleReplicasIT.java | 157 +++++
.../rocketmq/test/container/SlaveBrokerIT.java | 115 ++++
.../test/container/SyncConsumerOffsetIT.java | 146 +++++
.../rocketmq/test/statictopic/StaticTopicIT.java | 15 +-
20 files changed, 2568 insertions(+), 26 deletions(-)
diff --git a/test/pom.xml b/test/pom.xml
index 2bdab7c..3cf4f2d 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -41,6 +41,10 @@
<artifactId>rocketmq-namesrv</artifactId>
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-container</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>0.30</version>
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 20149d4..7f6a2b6 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -140,7 +140,7 @@ public class MQAdminTestUtils {
if (clusterInfo == null) {
return false;
} else {
- HashMap<String, BrokerData> brokers = clusterInfo.getBrokerAddrTable();
+ Map<String, BrokerData> brokers = clusterInfo.getBrokerAddrTable();
for (String brokerName : brokers.keySet()) {
HashMap<Long, String> brokerIps = brokers.get(brokerName).getBrokerAddrs();
for (long brokerId : brokerIps.keySet()) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index c523fd9..8420fdd 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -96,17 +96,17 @@ public class BaseConf {
}
// This method can't be placed in the static block of BaseConf, which seems to lead to a strange dead lock.
- public static void waitBrokerRegistered(final String nsAddr, final String clusterName,
- final int expectedBrokerNum) {
+ public static void waitBrokerRegistered(final String nsAddr, final String clusterName, final int expectedBrokerNum) {
final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
mqAdminExt.setNamesrvAddr(nsAddr);
try {
mqAdminExt.start();
+ Thread.sleep(10000);
await().atMost(30, TimeUnit.SECONDS).until(() -> {
List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas();
return brokerDatas.size() == expectedBrokerNum;
});
- for (BrokerController brokerController : brokerControllerList) {
+ for (BrokerController brokerController: brokerControllerList) {
brokerController.getBrokerOuterAPI().refreshMetadata();
}
} catch (Exception e) {
@@ -144,6 +144,7 @@ public class BaseConf {
return mqAdminExt;
}
+
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
return getProducer(nsAddr, topic, false);
}
@@ -157,8 +158,7 @@ public class BaseConf {
return producer;
}
- public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic,
- TransactionListener transactionListener) {
+ public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) {
RMQTransactionalProducer producer = new RMQTransactionalProducer(nsAddr, topic, false, transactionListener);
if (debug) {
producer.setDebug();
@@ -168,9 +168,9 @@ public class BaseConf {
}
public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup,
- String instanceName) {
+ String instanceName) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup,
- instanceName);
+ instanceName);
if (debug) {
producer.setDebug();
}
@@ -188,31 +188,31 @@ public class BaseConf {
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
- AbstractListener listener) {
+ AbstractListener listener) {
return getConsumer(nsAddr, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
- AbstractListener listener, boolean useTLS) {
+ AbstractListener listener, boolean useTLS) {
String consumerGroup = initConsumerGroup();
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
- String subExpression, AbstractListener listener) {
+ String subExpression, AbstractListener listener) {
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
- String subExpression, AbstractListener listener, boolean useTLS) {
+ String subExpression, AbstractListener listener, boolean useTLS) {
RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
- topic, subExpression, listener, useTLS);
+ topic, subExpression, listener, useTLS);
if (debug) {
consumer.setDebug();
}
mqClients.add(consumer);
log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup,
- topic, subExpression));
+ topic, subExpression));
return consumer;
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
index 3a649ed..283dcbe 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
@@ -137,6 +137,7 @@ public class BatchSendIT extends BaseConf {
Thread.sleep(300);
{
DefaultMQPullConsumer defaultMQPullConsumer = ConsumerFactory.getRMQPullConsumer(nsAddr, "group");
+ System.out.println(defaultMQPullConsumer.maxOffset(messageQueue));
PullResult pullResult = defaultMQPullConsumer.pullBlockIfNotFound(messageQueue, "*", 5, batchCount * batchNum);
Assert.assertEquals(PullStatus.FOUND, pullResult.getPullStatus());
@@ -181,7 +182,6 @@ public class BatchSendIT extends BaseConf {
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
MessageQueue messageQueue = producer.fetchPublishMessageQueues(batchTopic).iterator().next();
-
int batchCount = 10;
int batchNum = 10;
for (int i = 0; i < batchCount; i++) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
index bae5397..eff70a0 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java
@@ -81,7 +81,7 @@ public class OrderMsgRebalanceIT extends BaseConf {
}
@Test
- public void testFourConsuemrBalance() {
+ public void testFourConsumerBalance() {
int msgSize = 20;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/AddAndRemoveBrokerIT.java b/test/src/test/java/org/apache/rocketmq/test/container/AddAndRemoveBrokerIT.java
new file mode 100644
index 0000000..c19a787
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/AddAndRemoveBrokerIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import org.apache.rocketmq.container.BrokerContainer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AddAndRemoveBrokerIT extends ContainerIntegrationTestBase {
+ private static BrokerContainer brokerContainer4;
+
+ @BeforeClass
+ public static void beforeClass() {
+ brokerContainer4 = createAndStartBrokerContainer(nsAddr);
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ brokerContainer4.shutdown();
+ }
+
+ @Test
+ public void addBrokerTest()
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException {
+ String remark = null;
+ int code = 0;
+ try {
+ defaultMQAdminExt.addBrokerToContainer(brokerContainer4.getBrokerContainerAddr(), "");
+ } catch (MQBrokerException e) {
+ code = e.getResponseCode();
+ remark = e.getErrorMessage();
+ }
+ assertThat(code).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ assertThat(remark).isEqualTo("addBroker properties empty");
+ }
+
+ @Test
+ public void removeBrokerTest()
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException{
+
+ boolean exceptionCaught = false;
+
+ try {
+ defaultMQAdminExt.removeBrokerFromContainer(brokerContainer1.getBrokerContainerAddr(),
+ master3With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master3With3Replicas.getBrokerConfig().getBrokerName(), 1);
+ } catch (MQBrokerException e) {
+ exceptionCaught = true;
+ }
+
+ assertThat(exceptionCaught).isFalse();
+ assertThat(brokerContainer1.getSlaveBrokers().size()).isEqualTo(1);
+
+ createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+ awaitUntilSlaveOK();
+ }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/BrokerFailoverIT.java b/test/src/test/java/org/apache/rocketmq/test/container/BrokerFailoverIT.java
new file mode 100644
index 0000000..b4e4baf
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/BrokerFailoverIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.time.Duration;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class BrokerFailoverIT extends ContainerIntegrationTestBase {
+
+ @Test
+ public void testBrokerFailoverWithoutCompatible() {
+ changeCompatibleMode(false);
+ awaitUntilSlaveOK();
+ testBrokerFailover(false);
+ }
+
+ @Test
+ public void testBrokerFailoverWithCompatible() {
+ changeCompatibleMode(true);
+ awaitUntilSlaveOK();
+ testBrokerFailover(true);
+ }
+
+ private void testBrokerFailover(boolean compatibleMode) {
+ await().atMost(Duration.ofSeconds(10)).until(() ->
+ master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3
+ && master2With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3
+ && master3With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+
+ InnerSalveBrokerController targetSlave = getSlaveFromContainerByName(brokerContainer2, master1With3Replicas.getBrokerConfig().getBrokerName());
+
+ assertThat(targetSlave).isNotNull();
+
+ brokerContainer1.registerClientRPCHook(new RPCHook() {
+ @Override
+ public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
+ if (request.getCode() == (compatibleMode ? RequestCode.QUERY_DATA_VERSION : RequestCode.BROKER_HEARTBEAT)) {
+ request.setCode(-1);
+ }
+ }
+
+ @Override
+ public void doAfterResponse(String remoteAddr, RemotingCommand request,
+ RemotingCommand response) {
+
+ }
+
+ @Override
+ public void doAfterRpcFailure(String remoteAddr, RemotingCommand request, Boolean remoteTimeout) {
+
+ }
+ });
+
+ InnerSalveBrokerController finalTargetSlave = targetSlave;
+ await().atMost(Duration.ofSeconds(60)).until(() ->
+ finalTargetSlave.getMessageStore().getAliveReplicaNumInGroup() == 2
+ && master2With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 2
+ && master3With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 2);
+
+ brokerContainer1.clearClientRPCHook();
+
+ await().atMost(Duration.ofSeconds(60)).until(() ->
+ master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3
+ && master2With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3
+ && master3With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+ }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/BrokerMemberGroupIT.java b/test/src/test/java/org/apache/rocketmq/test/container/BrokerMemberGroupIT.java
new file mode 100644
index 0000000..ec9b6b1
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/BrokerMemberGroupIT.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.time.Duration;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class BrokerMemberGroupIT extends ContainerIntegrationTestBase {
+ @Test
+ public void testSyncBrokerMemberGroup() throws Exception {
+ await().atMost(Duration.ofSeconds(5)).until(() -> {
+ final BrokerConfig brokerConfig = master1With3Replicas.getBrokerConfig();
+ final BrokerMemberGroup memberGroup = master1With3Replicas.getBrokerOuterAPI()
+ .syncBrokerMemberGroup(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName());
+
+ return memberGroup.getBrokerAddrs().size() == 3;
+ });
+
+ await().atMost(Duration.ofSeconds(5)).until(() -> {
+ final BrokerConfig brokerConfig = master3With3Replicas.getBrokerConfig();
+ final BrokerMemberGroup memberGroup = master3With3Replicas.getBrokerOuterAPI()
+ .syncBrokerMemberGroup(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName());
+
+ return memberGroup.getBrokerAddrs().size() == 3;
+ });
+
+ removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
+ removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
+
+ await().atMost(Duration.ofSeconds(5)).until(() -> {
+ final BrokerConfig brokerConfig = master1With3Replicas.getBrokerConfig();
+ final BrokerMemberGroup memberGroup = master1With3Replicas.getBrokerOuterAPI()
+ .syncBrokerMemberGroup(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName());
+
+ return memberGroup.getBrokerAddrs().size() == 2 && memberGroup.getBrokerAddrs().get(1L) == null;
+ });
+
+ await().atMost(Duration.ofSeconds(5)).until(() -> {
+ final BrokerConfig brokerConfig = master3With3Replicas.getBrokerConfig();
+ final BrokerMemberGroup memberGroup = master3With3Replicas.getBrokerOuterAPI()
+ .syncBrokerMemberGroup(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName());
+ return memberGroup.getBrokerAddrs().size() == 2 && memberGroup.getBrokerAddrs().get(1L) == null;
+ });
+
+ createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+ createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+
+ awaitUntilSlaveOK();
+ }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
new file mode 100644
index 0000000..fab4c46
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/ContainerIntegrationTestBase.java
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.rocketmq.container.BrokerContainer;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.container.BrokerContainerConfig;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+
+/**
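+ * ContainerIntegrationTestBase sets up a RocketMQ HA cluster of three broker containers hosting three broker groups:
+ * <li>each broker group has three replicas: one master and two slaves</li>
+ * <li>the three replicas of a group are placed in different containers, one replica per container</li>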
+ */
+public class ContainerIntegrationTestBase {
+ private static final AtomicBoolean CLUSTER_SET_UP = new AtomicBoolean(false);
+ private static final List<File> TMP_FILE_LIST = new ArrayList<>();
+ private static final Random RANDOM = new Random();
+ protected static String nsAddr;
+
+ protected static final String THREE_REPLICAS_TOPIC = "SEND_MESSAGE_TEST_TOPIC_THREE_REPLICAS";
+
+ protected static final List<BrokerContainer> brokerContainerList = new ArrayList<>();
+ protected static final List<NamesrvController> namesrvControllers = new ArrayList<>();
+
+ protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
+ protected static final int COMMIT_LOG_SIZE = 128 * 1024;
+ protected static final int INDEX_NUM = 1000;
+ protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
+
+ protected static BrokerContainer brokerContainer1;
+ protected static BrokerContainer brokerContainer2;
+ protected static BrokerContainer brokerContainer3;
+ protected static BrokerController master1With3Replicas;
+ protected static BrokerController master2With3Replicas;
+ protected static BrokerController master3With3Replicas;
+ protected static NamesrvController namesrvController;
+
+ protected static DefaultMQAdminExt defaultMQAdminExt;
+
+ private final static InternalLogger LOG = InternalLoggerFactory.getLogger(ContainerIntegrationTestBase.class);
+ private static ConcurrentMap<BrokerConfig, MessageStoreConfig> slaveStoreConfigCache = new ConcurrentHashMap<>();
+
+ protected static ConcurrentMap<BrokerConfigLite, BrokerController> isolatedBrokers = new ConcurrentHashMap<>();
+ private static final Set<Integer> PORTS_IN_USE = new HashSet<>();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ if (CLUSTER_SET_UP.compareAndSet(false, true)) {
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+ System.setProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.99");
+ System.setProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.99");
+
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ JoranConfigurator configurator = new JoranConfigurator();
+ configurator.setContext(lc);
+ lc.reset();
+ //https://logback.qos.ch/manual/configuration.html
+ lc.setPackagingDataEnabled(false);
+
+ configurator.doConfigure("../distribution/conf/logback_broker.xml");
+ configurator.doConfigure("../distribution/conf/logback_namesrv.xml");
+
+ setUpCluster();
+ setUpTopic();
+ registerCleaner();
+
+ System.out.printf("cluster setup complete%n");
+ }
+ }
+
+ private static void setUpTopic() {
+ createTopic(THREE_REPLICAS_TOPIC);
+ }
+
+ private static void createTopic(String topic) {
+ createTopicTo(master1With3Replicas, topic);
+ createTopicTo(master2With3Replicas, topic);
+ createTopicTo(master3With3Replicas, topic);
+ }
+
+ private static void setUpCluster() throws Exception {
+ namesrvController = createAndStartNamesrv();
+ nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
+ System.out.printf("namesrv addr: %s%n", nsAddr);
+
+ /*
+ * BrokerContainer1 | BrokerContainer2 | BrokerContainer3
+ *
+ * master1With3Replicas(m) master2With3Replicas(m) master3With3Replicas(m)
+ * master3With3Replicas(s0) master1With3Replicas(s0) master2With3Replicas(s0)
+ * master2With3Replicas(s1) master3With3Replicas(s1) master1With3Replicas(s1)
+ */
+
+ brokerContainer1 = createAndStartBrokerContainer(nsAddr);
+ brokerContainer2 = createAndStartBrokerContainer(nsAddr);
+ brokerContainer3 = createAndStartBrokerContainer(nsAddr);
+ // Create three broker groups, each containing three replicas (one master and two slaves)
+ master1With3Replicas = createAndAddMaster(brokerContainer1, new BrokerGroupConfig(), BROKER_INDEX.getAndIncrement());
+ master2With3Replicas = createAndAddMaster(brokerContainer2, new BrokerGroupConfig(), BROKER_INDEX.getAndIncrement());
+ master3With3Replicas = createAndAddMaster(brokerContainer3, new BrokerGroupConfig(), BROKER_INDEX.getAndIncrement());
+
+ createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+ createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+ createAndAddSlave(1, brokerContainer3, master2With3Replicas);
+
+ createAndAddSlave(2, brokerContainer1, master2With3Replicas);
+ createAndAddSlave(2, brokerContainer2, master3With3Replicas);
+ createAndAddSlave(2, brokerContainer3, master1With3Replicas);
+
+ awaitUntilSlaveOK();
+
+ defaultMQAdminExt = new DefaultMQAdminExt("HATest_Admin_Group");
+ defaultMQAdminExt.setNamesrvAddr(nsAddr);
+ defaultMQAdminExt.start();
+ }
+
+ protected static void createTopicTo(BrokerController masterBroker, String topicName, int rqn, int wqn) {
+ try {
+ TopicConfig topicConfig = new TopicConfig(topicName, rqn, wqn, 6, 0);
+ defaultMQAdminExt.createAndUpdateTopicConfig(masterBroker.getBrokerAddr(), topicConfig);
+
+ triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer1);
+ triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer2);
+ triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer3);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("Create topic to broker failed", e);
+ }
+ }
+
+ protected static void createGroup(BrokerController masterBroker, String groupName) {
+ try {
+ SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+ config.setGroupName(groupName);
+
+ masterBroker.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);
+
+ triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer1);
+ triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer2);
+ triggerSlaveSync(masterBroker.getBrokerConfig().getBrokerName(), brokerContainer3);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("Create group to broker failed", e);
+ }
+ }
+
+ private static void triggerSlaveSync(String brokerName, BrokerContainer brokerContainer) {
+ for (InnerSalveBrokerController slaveBroker : brokerContainer.getSlaveBrokers()) {
+ if (slaveBroker.getBrokerConfig().getBrokerName().equals(brokerName)) {
+ slaveBroker.getSlaveSynchronize().syncAll();
+ slaveBroker.registerBrokerAll(true, false, true);
+ }
+ }
+ }
+
+ protected static void createTopicTo(BrokerController brokerController, String topicName) {
+ createTopicTo(brokerController, topicName, 8, 8);
+ }
+
+ private static void registerCleaner() {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (CLUSTER_SET_UP.compareAndSet(true, false)) {
+ System.out.printf("clean up%n");
+ defaultMQAdminExt.shutdown();
+
+ for (final BrokerContainer brokerContainer : brokerContainerList) {
+ brokerContainer.shutdown();
+ for (BrokerController brokerController : brokerContainer.getBrokerControllers()) {
+ brokerController.getMessageStore().destroy();
+ }
+ }
+
+ for (final NamesrvController namesrvController : namesrvControllers) {
+ namesrvController.shutdown();
+ }
+
+ for (final File file : TMP_FILE_LIST) {
+ UtilAll.deleteFile(file);
+ }
+ }
+ }));
+ }
+
+ private static File createBaseDir(String prefix) {
+ final File file;
+ try {
+ file = Files.createTempDirectory(prefix).toFile();
+ TMP_FILE_LIST.add(file);
+ System.out.printf("create file at %s%n", file.getAbsolutePath());
+ return file;
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't create tmp folder", e);
+ }
+ }
+
+ /**
+ * Creates, initializes and starts a name server whose config files live under a fresh
+ * temporary directory, then registers it in {@code namesrvControllers}.
+ * <p>
+ * After start-up the REGISTER_BROKER processor is replaced by a wrapper that refuses
+ * registrations coming from brokers listed in {@code isolatedBrokers} and forwards
+ * everything else to the server's default processor.
+ * <p>
+ * If initialization or start throws, the JVM is terminated with exit status 1.
+ *
+ * @return the started name server controller
+ */
+ public static NamesrvController createAndStartNamesrv() {
+ String baseDir = createBaseDir("test-cluster-namesrv").getAbsolutePath();
+ NamesrvConfig namesrvConfig = new NamesrvConfig();
+ NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
+ namesrvConfig.setKvConfigPath(baseDir + File.separator + "namesrv" + File.separator + "kvConfig.json");
+ namesrvConfig.setConfigStorePath(baseDir + File.separator + "namesrv" + File.separator + "namesrv.properties");
+ namesrvConfig.setSupportActingMaster(true);
+ // NOTE(review): presumably milliseconds, i.e. a 1s scan interval - confirm against NamesrvConfig
+ namesrvConfig.setScanNotActiveBrokerInterval(1000);
+
+ // Random, not-yet-handed-out port in [10000, 19999]
+ nameServerNettyServerConfig.setListenPort(generatePort(10000, 10000));
+ NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+ try {
+ Assert.assertTrue(namesrvController.initialize());
+ LOG.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
+ namesrvController.start();
+ } catch (Exception e) {
+ LOG.info("Name Server start failed");
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ // Intercept broker registration so that tests can keep selected brokers out of the route table
+ namesrvController.getRemotingServer().registerProcessor(RequestCode.REGISTER_BROKER, new NettyRequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
+ final RemotingCommand request) throws Exception {
+ final RegisterBrokerRequestHeader requestHeader = request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+ // Same key shape as the one isolateBroker() stores, so equals/hashCode line up
+ final BrokerConfigLite liteConfig = new BrokerConfigLite(requestHeader.getClusterName(),
+ requestHeader.getBrokerName(),
+ requestHeader.getBrokerAddr(),
+ requestHeader.getBrokerId());
+ if (isolatedBrokers.containsKey(liteConfig)) {
+ // return response with SYSTEM_ERROR
+ return RemotingCommand.createResponseCommand(null);
+ }
+ // Not isolated: delegate to the processor the name server would normally use
+ return namesrvController.getRemotingServer().getDefaultProcessorPair().getObject1().processRequest(ctx, request);
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+ }, null);
+
+ namesrvControllers.add(namesrvController);
+ return namesrvController;
+
+ }
+
+ /**
+  * Creates, initializes and starts a broker container that talks to the given name server,
+  * then records it in {@code brokerContainerList}.
+  * <p>
+  * The container listens on a random port in [20000, 29999]. If initialization or start
+  * throws, the JVM is terminated with exit status 1.
+  *
+  * @param nsAddr name server address the container is configured with
+  * @return the started broker container
+  */
+ public static BrokerContainer createAndStartBrokerContainer(String nsAddr) {
+     BrokerContainerConfig containerConfig = new BrokerContainerConfig();
+     containerConfig.setNamesrvAddr(nsAddr);
+     containerConfig.setCompatibleWithOldNameSrv(false);
+
+     NettyServerConfig serverConfig = new NettyServerConfig();
+     serverConfig.setListenPort(generatePort(20000, 10000));
+
+     BrokerContainer container = new BrokerContainer(containerConfig, serverConfig, new NettyClientConfig());
+     try {
+         Assert.assertTrue(container.initialize());
+         LOG.info("Broker container Start, listen on {}.", serverConfig.getListenPort());
+         container.start();
+     } catch (Exception e) {
+         LOG.info("Broker container start failed", e);
+         e.printStackTrace();
+         System.exit(1);
+     }
+     brokerContainerList.add(container);
+     return container;
+ }
+
+ /**
+  * Draws a random port from {@code [base, base + range)} that has not been handed out before.
+  * <p>
+  * Both the chosen port and {@code port - 2} are recorded in {@code PORTS_IN_USE}, and a
+  * candidate is rejected when either of the two is already recorded.
+  * NOTE(review): reserving {@code port - 2} is presumably for a companion listener bound
+  * two below the main port - confirm against the broker's port layout.
+  *
+  * @param base  lowest port that may be returned
+  * @param range number of consecutive ports to choose from
+  * @return the reserved port
+  */
+ private static int generatePort(int base, int range) {
+     int candidate;
+     do {
+         candidate = base + RANDOM.nextInt(range);
+     } while (PORTS_IN_USE.contains(candidate) || PORTS_IN_USE.contains(candidate - 2));
+
+     PORTS_IN_USE.add(candidate);
+     PORTS_IN_USE.add(candidate - 2);
+     return candidate;
+ }
+
+ /**
+ * Configures a broker with id 0 and role SYNC_MASTER, adds it to the given container and starts it.
+ * <p>
+ * The broker name is {@code BROKER_NAME_PREFIX + brokerIndex}; replica and feature switches are
+ * copied from {@code brokerGroupConfig}. The store lives in a fresh temporary directory. If adding
+ * or starting the broker throws, the JVM is terminated with exit status 1.
+ *
+ * @param brokerContainer container that will host the broker
+ * @param brokerGroupConfig replica counts and feature switches applied to the broker and its store
+ * @param brokerIndex suffix appended to the broker name prefix
+ * @return the started broker controller
+ */
+ public static BrokerController createAndAddMaster(BrokerContainer brokerContainer,
+ BrokerGroupConfig brokerGroupConfig, int brokerIndex) throws Exception {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ MessageStoreConfig storeConfig = new MessageStoreConfig();
+ brokerConfig.setBrokerName(BROKER_NAME_PREFIX + brokerIndex);
+ brokerConfig.setBrokerIP1("127.0.0.1");
+ brokerConfig.setBrokerIP2("127.0.0.1");
+ brokerConfig.setBrokerId(0);
+ brokerConfig.setEnablePropertyFilter(true);
+ brokerConfig.setEnableSlaveActingMaster(brokerGroupConfig.enableSlaveActingMaster);
+ brokerConfig.setEnableRemoteEscape(brokerGroupConfig.enableRemoteEscape);
+ brokerConfig.setSlaveReadEnable(brokerGroupConfig.slaveReadEnable);
+ brokerConfig.setLockInStrictMode(true);
+ brokerConfig.setConsumerOffsetUpdateVersionStep(10);
+ brokerConfig.setDelayOffsetUpdateVersionStep(10);
+ // Random unused port in the 10000 ports starting at the container's own listen port
+ brokerConfig.setListenPort(generatePort(brokerContainer.getRemotingServer().localListenPort(), 10000));
+
+ String baseDir = createBaseDir(brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId()).getAbsolutePath();
+ storeConfig.setStorePathRootDir(baseDir);
+ storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+ // Random unused HA port in [30000, 39999]
+ storeConfig.setHaListenPort(generatePort(30000, 10000));
+ storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
+ storeConfig.setMaxIndexNum(INDEX_NUM);
+ storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
+ storeConfig.setTotalReplicas(brokerGroupConfig.totalReplicas);
+ storeConfig.setInSyncReplicas(brokerGroupConfig.inSyncReplicas);
+ storeConfig.setMinInSyncReplicas(brokerGroupConfig.minReplicas);
+ storeConfig.setEnableAutoInSyncReplicas(brokerGroupConfig.autoReplicas);
+ storeConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
+ storeConfig.setSyncFlushTimeout(10 * 1000);
+
+ System.out.printf("start master %s with port %d-%d%n", brokerConfig.getCanonicalName(), brokerConfig.getListenPort(), storeConfig.getHaListenPort());
+ BrokerController brokerController = null;
+ try {
+ brokerController = brokerContainer.addBroker(brokerConfig, storeConfig);
+ Assert.assertNotNull(brokerController);
+ brokerController.start();
+ // Record the broker's config files alongside the temp directories in TMP_FILE_LIST
+ TMP_FILE_LIST.add(new File(brokerController.getTopicConfigManager().configFilePath()));
+ TMP_FILE_LIST.add(new File(brokerController.getSubscriptionGroupManager().configFilePath()));
+ LOG.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+ } catch (Exception e) {
+ LOG.info("Broker start failed", e);
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ return brokerController;
+ }
+
+ protected static DefaultMQProducer createProducer(String producerGroup) {
+ DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+ producer.setInstanceName(UUID.randomUUID().toString());
+ producer.setNamesrvAddr(nsAddr);
+ return producer;
+ }
+
+ protected static TransactionMQProducer createTransactionProducer(String producerGroup,
+ TransactionCheckListener transactionCheckListener) {
+ TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
+ producer.setInstanceName(UUID.randomUUID().toString());
+ producer.setNamesrvAddr(nsAddr);
+ producer.setTransactionCheckListener(transactionCheckListener);
+ return producer;
+ }
+
+ protected static DefaultMQPullConsumer createPullConsumer(String consumerGroup) {
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
+ consumer.setInstanceName(UUID.randomUUID().toString());
+ consumer.setNamesrvAddr(nsAddr);
+ return consumer;
+ }
+
+ protected static DefaultMQPushConsumer createPushConsumer(String consumerGroup) {
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
+ consumer.setInstanceName(UUID.randomUUID().toString());
+ consumer.setNamesrvAddr(nsAddr);
+ return consumer;
+ }
+
+ /**
+  * Configures a slave for the master's broker group, adds it to the given container and starts it.
+  * <p>
+  * The slave copies broker name and cluster name from {@code master}. Its store config is looked
+  * up in {@code slaveStoreConfigCache} first; only when nothing is cached is a new store directory
+  * and HA port allocated, and the result is cached for later calls.
+  * NOTE(review): the cache is keyed by a freshly built {@code BrokerConfig}, so a hit relies on
+  * that type implementing value-based equals/hashCode - confirm.
+  *
+  * @param slaveBrokerId   broker id assigned to the slave
+  * @param brokerContainer container that will host the slave
+  * @param master          master whose group the slave joins
+  * @throws RuntimeException when adding or starting the slave throws
+  */
+ protected static void createAndAddSlave(int slaveBrokerId, BrokerContainer brokerContainer,
+     BrokerController master) {
+     BrokerConfig slaveBrokerConfig = new BrokerConfig();
+     slaveBrokerConfig.setBrokerName(master.getBrokerConfig().getBrokerName());
+     slaveBrokerConfig.setBrokerId(slaveBrokerId);
+     slaveBrokerConfig.setBrokerClusterName(master.getBrokerConfig().getBrokerClusterName());
+
+     slaveBrokerConfig.setBrokerIP1("127.0.0.1");
+     slaveBrokerConfig.setBrokerIP2("127.0.0.1");
+     slaveBrokerConfig.setEnablePropertyFilter(true);
+     slaveBrokerConfig.setSlaveReadEnable(true);
+     slaveBrokerConfig.setEnableSlaveActingMaster(true);
+     slaveBrokerConfig.setEnableRemoteEscape(true);
+     slaveBrokerConfig.setLockInStrictMode(true);
+     // Random unused port in the 10000 ports starting at the container's own listen port
+     slaveBrokerConfig.setListenPort(generatePort(brokerContainer.getRemotingServer().localListenPort(), 10000));
+     slaveBrokerConfig.setConsumerOffsetUpdateVersionStep(10);
+     slaveBrokerConfig.setDelayOffsetUpdateVersionStep(10);
+
+     MessageStoreConfig storeConfig = slaveStoreConfigCache.get(slaveBrokerConfig);
+
+     if (storeConfig == null) {
+         storeConfig = new MessageStoreConfig();
+         String baseDir = createBaseDir(slaveBrokerConfig.getBrokerName() + "_" + slaveBrokerConfig.getBrokerId()).getAbsolutePath();
+         storeConfig.setStorePathRootDir(baseDir);
+         storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
+         storeConfig.setHaListenPort(generatePort(master.getMessageStoreConfig().getHaListenPort(), 10000));
+         storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
+         storeConfig.setMaxIndexNum(INDEX_NUM);
+         storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
+         storeConfig.setTotalReplicas(master.getMessageStoreConfig().getTotalReplicas());
+         storeConfig.setInSyncReplicas(master.getMessageStoreConfig().getInSyncReplicas());
+         storeConfig.setMinInSyncReplicas(master.getMessageStoreConfig().getMinInSyncReplicas());
+         storeConfig.setBrokerRole(BrokerRole.SLAVE);
+         slaveStoreConfigCache.put(slaveBrokerConfig, storeConfig);
+     }
+
+     System.out.printf("start slave %s with port %d-%d%n", slaveBrokerConfig.getCanonicalName(), slaveBrokerConfig.getListenPort(), storeConfig.getHaListenPort());
+
+     try {
+         BrokerController brokerController = brokerContainer.addBroker(slaveBrokerConfig, storeConfig);
+         // Check the controller that was just created; the container itself was already
+         // dereferenced above, so asserting on it could never fail.
+         Assert.assertNotNull(brokerController);
+         brokerController.start();
+         TMP_FILE_LIST.add(new File(brokerController.getTopicConfigManager().configFilePath()));
+         TMP_FILE_LIST.add(new File(brokerController.getSubscriptionGroupManager().configFilePath()));
+         LOG.info("Add slave name:{} addr:{}", slaveBrokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+     } catch (Exception e) {
+         e.printStackTrace();
+         throw new RuntimeException("Couldn't add slave broker", e);
+     }
+ }
+
+ protected static void removeSlaveBroker(int slaveBrokerId, BrokerContainer brokerContainer,
+ BrokerController master) throws Exception {
+ BrokerIdentity brokerIdentity = new BrokerIdentity(master.getBrokerConfig().getBrokerClusterName(),
+ master.getBrokerConfig().getBrokerName(), slaveBrokerId);
+
+ brokerContainer.removeBroker(brokerIdentity);
+ }
+
+ /**
+  * Blocks until each of the three 3-replica masters reports two HA connections, three alive
+  * replicas in its group and every HA connection in state TRANSFER, then sleeps two more seconds.
+  * <p>
+  * Each master is given at most 100 seconds; the masters are checked one after another.
+  * If the final sleep is interrupted, the thread's interrupt flag is restored.
+  */
+ protected static void awaitUntilSlaveOK() {
+     awaitReplicasInSync(master1With3Replicas);
+     awaitReplicasInSync(master2With3Replicas);
+     awaitReplicasInSync(master3With3Replicas);
+
+     try {
+         Thread.sleep(2000);
+     } catch (InterruptedException e) {
+         // Keep the interruption visible to callers instead of silently dropping it
+         Thread.currentThread().interrupt();
+         e.printStackTrace();
+     }
+ }
+
+ /**
+  * Waits up to 100 seconds until {@code master} has two HA connections, three alive replicas
+  * in its group and all HA connections in state TRANSFER.
+  */
+ private static void awaitReplicasInSync(final BrokerController master) {
+     await().atMost(100, TimeUnit.SECONDS)
+         .until(() -> {
+             boolean isOk = master.getMessageStore().getHaService().getConnectionCount().get() == 2
+                 && master.getMessageStore().getAliveReplicaNumInGroup() == 3;
+             for (HAConnection haConnection : master.getMessageStore().getHaService().getConnectionList()) {
+                 isOk &= haConnection.getCurrentState().equals(HAConnectionState.TRANSFER);
+             }
+             return isOk;
+         });
+ }
+
+ /**
+  * Cuts the given broker off from the name server: it is recorded in {@code isolatedBrokers}
+  * and then unregistered from the name server's route info manager right away.
+  *
+  * @param brokerController broker to isolate
+  */
+ protected static void isolateBroker(BrokerController brokerController) {
+     final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+     final BrokerConfigLite isolationKey = new BrokerConfigLite(
+         brokerConfig.getBrokerClusterName(),
+         brokerConfig.getBrokerName(),
+         brokerController.getBrokerAddr(),
+         brokerConfig.getBrokerId());
+
+     // From now on this broker is listed as isolated
+     isolatedBrokers.putIfAbsent(isolationKey, brokerController);
+
+     // Drop the existing registration immediately
+     namesrvController.getRouteInfoManager().unregisterBroker(
+         isolationKey.getClusterName(),
+         isolationKey.getBrokerAddr(),
+         isolationKey.getBrokerName(),
+         isolationKey.getBrokerId());
+ }
+
+ /**
+  * Reverses {@code isolateBroker}: removes the broker from {@code isolatedBrokers}, asks it to
+  * register with the name server again and waits up to one minute until its broker id shows up
+  * in the name server's member group for that broker name.
+  *
+  * @param brokerController broker whose isolation is lifted
+  */
+ protected static void cancelIsolatedBroker(BrokerController brokerController) {
+     final BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+     final BrokerConfigLite isolationKey = new BrokerConfigLite(
+         brokerConfig.getBrokerClusterName(),
+         brokerConfig.getBrokerName(),
+         brokerController.getBrokerAddr(),
+         brokerConfig.getBrokerId());
+
+     isolatedBrokers.remove(isolationKey);
+     brokerController.registerBrokerAll(true, false, true);
+
+     await().atMost(Duration.ofMinutes(1)).until(() -> namesrvController.getRouteInfoManager()
+         .getBrokerMemberGroup(isolationKey.getClusterName(), isolationKey.getBrokerName())
+         .getBrokerAddrs()
+         .containsKey(isolationKey.getBrokerId()));
+ }
+
+ protected static InnerSalveBrokerController getSlaveFromContainerByName(BrokerContainer brokerContainer,
+ String brokerName) {
+ InnerSalveBrokerController targetSlave = null;
+ for (InnerSalveBrokerController slave : brokerContainer.getSlaveBrokers()) {
+ if (slave.getBrokerConfig().getBrokerName().equals(brokerName)) {
+ targetSlave = slave;
+ }
+ }
+
+ return targetSlave;
+ }
+
+ protected static void changeCompatibleMode(boolean compatibleMode) {
+ brokerContainer1.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
+ brokerContainer2.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
+ brokerContainer3.getBrokerContainerConfig().setCompatibleWithOldNameSrv(compatibleMode);
+ }
+
+ protected static Set<MessageQueue> filterMessageQueue(Set<MessageQueue> mqSet, String topic) {
+ Set<MessageQueue> targetMqSet = new HashSet<>();
+ if (topic != null) {
+ for (MessageQueue mq : mqSet) {
+ if (mq.getTopic().equals(topic)) {
+ targetMqSet.add(mq);
+ }
+ }
+ }
+
+ return targetMqSet;
+ }
+
+ /**
+ * Replica counts and feature switches that createAndAddMaster copies onto a master's
+ * broker config and message store config. The no-arg constructor keeps the defaults below.
+ */
+ public static class BrokerGroupConfig {
+ // Passed to MessageStoreConfig#setTotalReplicas
+ int totalReplicas = 3;
+ // Passed to MessageStoreConfig#setMinInSyncReplicas
+ int minReplicas = 1;
+ // Passed to MessageStoreConfig#setInSyncReplicas
+ int inSyncReplicas = 2;
+ // Passed to MessageStoreConfig#setEnableAutoInSyncReplicas
+ boolean autoReplicas = true;
+ // Passed to BrokerConfig#setEnableSlaveActingMaster
+ boolean enableSlaveActingMaster = true;
+ // Passed to BrokerConfig#setEnableRemoteEscape; not settable through the constructors
+ boolean enableRemoteEscape = true;
+ // Passed to BrokerConfig#setSlaveReadEnable
+ boolean slaveReadEnable = true;
+
+ /** Creates a config carrying the default values declared on the fields. */
+ public BrokerGroupConfig() {
+ }
+
+ /**
+ * Creates a config with explicit values; {@code enableRemoteEscape} keeps its default of true.
+ */
+ public BrokerGroupConfig(final int totalReplicas, final int minReplicas, final int inSyncReplicas,
+ final boolean autoReplicas, boolean enableSlaveActingMaster, boolean slaveReadEnable) {
+ this.totalReplicas = totalReplicas;
+ this.minReplicas = minReplicas;
+ this.inSyncReplicas = inSyncReplicas;
+ this.autoReplicas = autoReplicas;
+ this.enableSlaveActingMaster = enableSlaveActingMaster;
+ this.slaveReadEnable = slaveReadEnable;
+ }
+ }
+
+ /**
+  * Value object identifying a broker by cluster name, broker name, address and broker id.
+  * Equality and hash code cover all four fields, which lets instances serve as map keys
+  * (see {@code isolatedBrokers}).
+  */
+ static class BrokerConfigLite {
+     private final String clusterName;
+     private final String brokerName;
+     private final String brokerAddr;
+     private final long brokerId;
+
+     public BrokerConfigLite(final String clusterName, final String brokerName, final String brokerAddr,
+         final long brokerId) {
+         this.clusterName = clusterName;
+         this.brokerName = brokerName;
+         this.brokerAddr = brokerAddr;
+         this.brokerId = brokerId;
+     }
+
+     public String getClusterName() {
+         return this.clusterName;
+     }
+
+     public String getBrokerName() {
+         return this.brokerName;
+     }
+
+     public String getBrokerAddr() {
+         return this.brokerAddr;
+     }
+
+     public long getBrokerId() {
+         return this.brokerId;
+     }
+
+     /** Two instances are equal when they are of the same class and all four fields match. */
+     @Override
+     public boolean equals(final Object o) {
+         if (this == o) {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass()) {
+             return false;
+         }
+
+         final BrokerConfigLite other = (BrokerConfigLite) o;
+         final EqualsBuilder comparison = new EqualsBuilder();
+         comparison.append(clusterName, other.clusterName);
+         comparison.append(brokerName, other.brokerName);
+         comparison.append(brokerAddr, other.brokerAddr);
+         comparison.append(brokerId, other.brokerId);
+         return comparison.isEquals();
+     }
+
+     /** Hash over the same four fields, in the same order, that {@link #equals(Object)} compares. */
+     @Override
+     public int hashCode() {
+         final HashCodeBuilder hash = new HashCodeBuilder(17, 37);
+         hash.append(clusterName);
+         hash.append(brokerName);
+         hash.append(brokerAddr);
+         hash.append(brokerId);
+         return hash.toHashCode();
+     }
+ }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/GetMaxOffsetFromSlaveIT.java b/test/src/test/java/org/apache/rocketmq/test/container/GetMaxOffsetFromSlaveIT.java
new file mode 100644
index 0000000..0b55d77
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/GetMaxOffsetFromSlaveIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Checks that max-offset queries addressed to a broker name keep returning the same values
+ * after that group's master has been isolated from the name server.
+ * NOTE(review): the queries are presumably answered by a slave acting as master - confirm.
+ */
+public class GetMaxOffsetFromSlaveIT extends ContainerIntegrationTestBase {
+    private static DefaultMQProducer mqProducer;
+
+    private final byte[] MESSAGE_BODY = ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+    public GetMaxOffsetFromSlaveIT() throws UnsupportedEncodingException {
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws MQClientException {
+        mqProducer = createProducer(GetMaxOffsetFromSlaveIT.class.getSimpleName() + "_Producer");
+        mqProducer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        if (mqProducer != null) {
+            mqProducer.shutdown();
+        }
+    }
+
+    /**
+     * Sends 100 messages, records the max offset of every queue on master 3's broker name,
+     * isolates master 3 and verifies the same offsets are still reported. The isolation is
+     * always lifted again, even when an assertion fails, so the shared cluster stays usable
+     * for other tests.
+     */
+    @Test
+    public void testGetMaxOffsetFromSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        awaitUntilSlaveOK();
+        mqProducer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+
+        for (int i = 0; i < 100; i++) {
+            Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+            SendResult sendResult = mqProducer.send(msg, 10000);
+            assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        }
+
+        // Baseline: max offset per queue id while the master is still registered
+        Map<Integer, Long> maxOffsetMap = new HashMap<>();
+        TopicPublishInfo publishInfo = mqProducer.getDefaultMQProducerImpl().getTopicPublishInfoTable().get(THREE_REPLICAS_TOPIC);
+        assertThat(publishInfo).isNotNull();
+        for (MessageQueue mq : publishInfo.getMessageQueueList()) {
+            maxOffsetMap.put(mq.getQueueId(), mqProducer.getDefaultMQProducerImpl().
+                maxOffset(new MessageQueue(THREE_REPLICAS_TOPIC, master3With3Replicas.getBrokerConfig().getBrokerName(), mq.getQueueId())));
+        }
+
+        isolateBroker(master3With3Replicas);
+        try {
+            mqProducer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+            assertThat(mqProducer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(
+                master3With3Replicas.getBrokerConfig().getBrokerName())).isNotNull();
+
+            for (MessageQueue mq : publishInfo.getMessageQueueList()) {
+                assertThat(mqProducer.getDefaultMQProducerImpl().maxOffset(
+                    new MessageQueue(THREE_REPLICAS_TOPIC, master3With3Replicas.getBrokerConfig().getBrokerName(), mq.getQueueId())))
+                    .isEqualTo(maxOffsetMap.get(mq.getQueueId()));
+            }
+        } finally {
+            // Never leave the master isolated: the cluster is static and shared across tests
+            cancelIsolatedBroker(master3With3Replicas);
+        }
+
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master3With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
new file mode 100644
index 0000000..7138f42
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/GetMetadataReverseIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+@Ignore
+public class GetMetadataReverseIT extends ContainerIntegrationTestBase {
+
+ private static DefaultMQProducer producer;
+
+ private static final String CONSUMER_GROUP = GetMetadataReverseIT.class.getSimpleName() + "_Consumer";
+
+ private static final int MESSAGE_COUNT = 32;
+
+ private final static Random random = new Random();
+
+ public GetMetadataReverseIT() throws UnsupportedEncodingException {
+
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Throwable {
+ producer = createProducer(PushMultipleReplicasIT.class.getSimpleName() + "_PRODUCER");
+ producer.setSendMsgTimeout(15 * 1000);
+ producer.start();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ producer.shutdown();
+ }
+
+ @Test
+ public void testGetMetadataReverse_consumerOffset() throws Exception {
+ String topic = GetMetadataReverseIT.class.getSimpleName() + "_consumerOffset" + random.nextInt(65535);
+ createTopicTo(master1With3Replicas, topic, 1, 1);
+ // Wait topic synchronization
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ InnerSalveBrokerController slaveBroker = brokerContainer2.getSlaveBrokers().iterator().next();
+ return slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
+ });
+
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, Integer.toString(i).getBytes());
+ SendResult sendResult = producer.send(msg);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ sendSuccess++;
+ }
+ }
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT);
+ System.out.printf("send success%n");
+
+ isolateBroker(master1With3Replicas);
+ brokerContainer1.removeBroker(new BrokerIdentity(
+ master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master1With3Replicas.getBrokerConfig().getBrokerName(),
+ master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+ System.out.printf("Remove master%n");
+
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUMER_GROUP);
+ pushConsumer.subscribe(topic, "*");
+ pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ AtomicInteger receivedMsgCount = new AtomicInteger(0);
+ pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+ receivedMsgCount.addAndGet(msgs.size());
+ msgs.forEach(x -> System.out.printf(x + "%n"));
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+ await().atMost(Duration.ofMinutes(3)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
+ Map<Integer, Long> slaveOffsetTable = null;
+ for (InnerSalveBrokerController slave : brokerContainer2.getSlaveBrokers()) {
+ if (slave.getBrokerConfig().getBrokerName().equals(master1With3Replicas.getBrokerConfig().getBrokerName())) {
+ slaveOffsetTable = slave.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
+ }
+ }
+
+ if (slaveOffsetTable != null) {
+ long totalOffset = 0;
+ for (final Long offset : slaveOffsetTable.values()) {
+ totalOffset += offset;
+ }
+
+ return totalOffset >= MESSAGE_COUNT;
+ }
+ return false;
+ });
+
+ //Add back master
+ master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+ master1With3Replicas.start();
+ cancelIsolatedBroker(master1With3Replicas);
+ System.out.printf("Add back master%n");
+
+ awaitUntilSlaveOK();
+
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ Map<Integer, Long> offsetTable = master1With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
+ long totalOffset = 0;
+ if (offsetTable != null) {
+ for (final Long offset : offsetTable.values()) {
+ totalOffset += offset;
+ }
+ }
+ return totalOffset >= MESSAGE_COUNT;
+ });
+
+ pushConsumer.shutdown();
+ }
+
+ @Test
+ public void testGetMetadataReverse_delayOffset() throws Exception {
+ String topic = GetMetadataReverseIT.class.getSimpleName() + "_delayOffset" + random.nextInt(65535);
+ createTopicTo(master1With3Replicas, topic, 1, 1);
+ createTopicTo(master2With3Replicas, topic, 1, 1);
+ createTopicTo(master3With3Replicas, topic, 1, 1);
+ // Wait topic synchronization
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ InnerSalveBrokerController slaveBroker = brokerContainer2.getSlaveBrokers().iterator().next();
+ return slaveBroker.getTopicConfigManager().selectTopicConfig(topic) != null;
+ });
+ int delayLevel = 4;
+
+ DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUMER_GROUP);
+ pushConsumer.subscribe(topic, "*");
+ AtomicInteger receivedMsgCount = new AtomicInteger(0);
+ pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+ receivedMsgCount.addAndGet(msgs.size());
+ msgs.forEach(x -> System.out.printf(x + "%n"));
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+
+ MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+ int sendSuccess = 0;
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message msg = new Message(topic, Integer.toString(i).getBytes());
+ msg.setDelayTimeLevel(delayLevel);
+ SendResult sendResult = producer.send(msg, messageQueue);
+ if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+ sendSuccess++;
+ }
+ }
+ final int finalSendSuccess = sendSuccess;
+ await().atMost(Duration.ofMinutes(1)).until(() -> finalSendSuccess >= MESSAGE_COUNT);
+ System.out.printf("send success%n");
+
+ isolateBroker(master1With3Replicas);
+ brokerContainer1.removeBroker(new BrokerIdentity(
+ master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+ master1With3Replicas.getBrokerConfig().getBrokerName(),
+ master1With3Replicas.getBrokerConfig().getBrokerId()));
+
+ System.out.printf("Remove master%n");
+
+ await().atMost(Duration.ofMinutes(1)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
+ Map<Integer, Long> OffsetTable = master2With3Replicas.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, topic);
+ if (OffsetTable != null) {
+ long totalOffset = 0;
+ for (final Long offset : OffsetTable.values()) {
+ totalOffset += offset;
+ }
+ return totalOffset >= MESSAGE_COUNT;
+
+ }
+ return false;
+ });
+
+ //Add back master
+ master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+ master1With3Replicas.start();
+ cancelIsolatedBroker(master1With3Replicas);
+ System.out.printf("Add back master%n");
+
+ awaitUntilSlaveOK();
+
+ await().atMost(Duration.ofMinutes(1)).until(() -> {
+ Map<Integer, Long> offsetTable = master1With3Replicas.getScheduleMessageService().getOffsetTable();
+ System.out.println("" + offsetTable.get(delayLevel));
+ return offsetTable.get(delayLevel) >= MESSAGE_COUNT;
+ });
+
+ pushConsumer.shutdown();
+ }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/PullMultipleReplicasIT.java b/test/src/test/java/org/apache/rocketmq/test/container/PullMultipleReplicasIT.java
new file mode 100644
index 0000000..0181645
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/PullMultipleReplicasIT.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Integration test for pulling messages from, and sending messages back via, the replicas of
+ * the three-replica broker groups provided by {@link ContainerIntegrationTestBase}.
+ *
+ * <p>The producer and the pull consumer are static and shared by every test in this class, so
+ * a test must restore any consumer state it changes, even when it fails.
+ */
+public class PullMultipleReplicasIT extends ContainerIntegrationTestBase {
+    private static DefaultMQPullConsumer pullConsumer;
+    private static DefaultMQProducer producer;
+    // Client instance behind pullConsumer, read reflectively in beforeClass().
+    private static MQClientInstance mqClientInstance;
+
+    // Random payload generated per test instance, and its encoded form used as the message body.
+    private final String MESSAGE_STRING = RandomStringUtils.random(1024);
+    private final byte[] MESSAGE_BODY = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+    public PullMultipleReplicasIT() throws UnsupportedEncodingException {
+    }
+
+    /**
+     * Starts the shared pull consumer and producer and captures the consumer's client instance.
+     */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+
+        pullConsumer = createPullConsumer(PullMultipleReplicasIT.class.getSimpleName() + "_Consumer");
+        pullConsumer.start();
+
+        // The client factory is a private field of the consumer implementation, hence reflection.
+        Field field = DefaultMQPullConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        mqClientInstance = (MQClientInstance) field.get(pullConsumer.getDefaultMQPullConsumerImpl());
+
+        producer = createProducer(PullMultipleReplicasIT.class.getSimpleName() + "_Producer");
+        producer.setSendMsgTimeout(15 * 1000);
+        producer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        producer.shutdown();
+        pullConsumer.shutdown();
+    }
+
+    /**
+     * Sends one message and pulls it three times: first without forcing a broker id, then with
+     * the pull forced onto broker id 1 and broker id 2 (the slave replicas).
+     */
+    @Test
+    public void testPullMessageFromSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
+        awaitUntilSlaveOK();
+
+        Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+        SendResult sendResult = producer.send(msg);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+
+        final MessageQueue messageQueue = sendResult.getMessageQueue();
+        final long queueOffset = sendResult.getQueueOffset();
+        final Duration pullTimeout = Duration.ofSeconds(5);
+
+        try {
+            // First pull: no broker id is forced for this queue yet.
+            assertSingleExpectedMessage(awaitFound(messageQueue, queueOffset, pullTimeout));
+
+            // Pull the same message again with the pull forced onto broker id 1, then broker id 2.
+            for (long brokerId = 1; brokerId <= 2; brokerId++) {
+                pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, brokerId);
+                assertSingleExpectedMessage(awaitFound(messageQueue, queueOffset, pullTimeout));
+            }
+        } finally {
+            // The consumer is shared: always point it back at broker id 0, even if an
+            // assertion above failed, so the other tests are not left pulling from a slave.
+            pullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().updatePullFromWhichNode(messageQueue, 0);
+        }
+    }
+
+    /**
+     * Sends a message to master3With3Replicas, rewrites its store host to the address of that
+     * group's slave hosted in brokerContainer1, sends it back, and expects the retried message
+     * to show up in the retry topic queue of master1With3Replicas.
+     */
+    @Test
+    public void testSendMessageBackToSlave() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
+        awaitUntilSlaveOK();
+
+        String clusterTopic = "TOPIC_ON_BROKER2_AND_BROKER3_FOR_MESSAGE_BACK";
+        createTopicTo(master1With3Replicas, clusterTopic);
+        createTopicTo(master3With3Replicas, clusterTopic);
+
+        Message msg = new Message(clusterTopic, MESSAGE_BODY);
+        // Note: this changes the timeout of the producer shared with the other tests.
+        producer.setSendMsgTimeout(10 * 1000);
+
+        // Wait until the route exposes a queue that lives on master3With3Replicas.
+        final String master3Name = master3With3Replicas.getBrokerConfig().getBrokerName();
+        final MessageQueue[] selectedQueue = new MessageQueue[1];
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            for (final MessageQueue queue : producer.fetchPublishMessageQueues(clusterTopic)) {
+                if (queue.getBrokerName().equals(master3Name)) {
+                    selectedQueue[0] = queue;
+                }
+            }
+            return selectedQueue[0] != null;
+        });
+
+        SendResult sendResult = producer.send(msg, selectedQueue[0]);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+
+        final MessageQueue messageQueue = sendResult.getMessageQueue();
+        final long queueOffset = sendResult.getQueueOffset();
+
+        final PullResult originalPull = awaitFound(messageQueue, queueOffset, Duration.ofSeconds(60));
+
+        // Wait until both slaves of master3 have caught up with its latest physical offset.
+        await().atMost(Duration.ofSeconds(60)).until(() -> {
+            DefaultMessageStore messageStore = (DefaultMessageStore) master3With3Replicas.getMessageStore();
+            return messageStore.getHaService().inSyncSlaveNums(messageStore.getMaxPhyOffset()) == 2;
+        });
+
+        // Find the slave of master3 that is hosted in brokerContainer1.
+        InnerSalveBrokerController slaveBroker = null;
+        for (InnerSalveBrokerController slave : brokerContainer1.getSlaveBrokers()) {
+            if (slave.getBrokerConfig().getBrokerName().equals(master3Name)) {
+                slaveBroker = slave;
+            }
+        }
+
+        assertThat(slaveBroker).isNotNull();
+
+        MessageExt backMessage = originalPull.getMsgFoundList().get(0);
+
+        // Address the send-back to that slave; the retried message is then expected on the
+        // master that shares its container (master1With3Replicas), as checked below.
+        backMessage.setStoreHost(new InetSocketAddress(slaveBroker.getBrokerConfig().getBrokerIP1(), slaveBroker.getBrokerConfig().getListenPort()));
+        pullConsumer.sendMessageBack(backMessage, 0);
+
+        String retryTopic = MixAll.getRetryTopic(pullConsumer.getConsumerGroup());
+        // Retry topic only has one queue by default
+        MessageQueue newMsgQueue = new MessageQueue(retryTopic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        assertSingleExpectedMessage(awaitFound(newMsgQueue, 0, Duration.ofSeconds(60)));
+
+        awaitUntilSlaveOK();
+    }
+
+    /**
+     * Polls the shared pull consumer until exactly-one-message pull at {@code offset} of
+     * {@code queue} reports {@link PullStatus#FOUND}, or fails once {@code timeout} elapses.
+     *
+     * @return the pull result that reported FOUND
+     */
+    private static PullResult awaitFound(MessageQueue queue, long offset, Duration timeout) {
+        final PullResult[] holder = {null};
+        await().atMost(timeout).until(() -> {
+            holder[0] = pullConsumer.pull(queue, "*", offset, 1);
+            return holder[0].getPullStatus() == PullStatus.FOUND;
+        });
+        return holder[0];
+    }
+
+    /**
+     * Asserts that {@code result} carries exactly one message whose body decodes to the payload
+     * generated for this test instance.
+     */
+    private void assertSingleExpectedMessage(PullResult result) throws UnsupportedEncodingException {
+        List<MessageExt> msgFoundList = result.getMsgFoundList();
+        assertThat(msgFoundList.size()).isEqualTo(1);
+        assertThat(new String(msgFoundList.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET)).isEqualTo(MESSAGE_STRING);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/PushMultipleReplicasIT.java b/test/src/test/java/org/apache/rocketmq/test/container/PushMultipleReplicasIT.java
new file mode 100644
index 0000000..abe1d5e
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/PushMultipleReplicasIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Integration test checking that a push consumer can consume a topic from a slave replica,
+ * and commit its offsets there, while the topic's master is isolated.
+ */
+public class PushMultipleReplicasIT extends ContainerIntegrationTestBase {
+    private static DefaultMQProducer producer;
+
+    private static final String TOPIC = PushMultipleReplicasIT.class.getSimpleName() + "_TOPIC";
+    private static final String REDIRECT_TOPIC = PushMultipleReplicasIT.class.getSimpleName() + "_REDIRECT_TOPIC";
+    private static final String CONSUMER_GROUP = PushMultipleReplicasIT.class.getSimpleName() + "_Consumer";
+    private static final int MESSAGE_COUNT = 32;
+
+    public PushMultipleReplicasIT() throws UnsupportedEncodingException {
+    }
+
+    /**
+     * Creates the test topics and pre-loads {@link #MESSAGE_COUNT} messages into {@link #TOPIC},
+     * which exists only on master1With3Replicas.
+     */
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        createTopicTo(master1With3Replicas, TOPIC, 1, 1);
+        producer = createProducer(PushMultipleReplicasIT.class.getSimpleName() + "_PRODUCER");
+        producer.setSendMsgTimeout(15 * 1000);
+        producer.start();
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            // Explicit charset so the body does not depend on the platform default.
+            producer.send(new Message(TOPIC, Integer.toString(i).getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+        }
+
+        createTopicTo(master3With3Replicas, REDIRECT_TOPIC, 1, 1);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producer.shutdown();
+    }
+
+    /**
+     * Isolates master1With3Replicas, consumes every pre-loaded message, and waits until the
+     * slave hosted in brokerContainer2 reports committed offsets covering all of them.
+     */
+    @Test
+    public void consumeMessageFromSlave_PushConsumer() throws MQClientException {
+        // Wait topic synchronization on the slave that actually replicates master1; the
+        // container may also host slaves of other masters, which never receive this topic.
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            InnerSalveBrokerController slaveBroker = findSlaveOfMaster1InContainer2();
+            return slaveBroker != null && slaveBroker.getTopicConfigManager().selectTopicConfig(TOPIC) != null;
+        });
+
+        isolateBroker(master1With3Replicas);
+        try {
+            consumeAllFromSlave();
+        } finally {
+            // The cluster is shared with other tests: undo the isolation even on failure.
+            cancelIsolatedBroker(master1With3Replicas);
+        }
+
+        awaitUntilSlaveOK();
+    }
+
+    /**
+     * Runs a push consumer until it has received every message and the slave's offset table
+     * for {@link #CONSUMER_GROUP} sums to at least {@link #MESSAGE_COUNT}. The consumer is
+     * shut down on every exit path.
+     */
+    private void consumeAllFromSlave() throws MQClientException {
+        final DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUMER_GROUP);
+        pushConsumer.subscribe(TOPIC, "*");
+        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        final AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            receivedMsgCount.addAndGet(msgs.size());
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+        try {
+            await().atMost(Duration.ofMinutes(5)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+
+            await().atMost(Duration.ofMinutes(1)).until(() -> {
+                // Push the locally tracked offsets to the broker before inspecting them.
+                pushConsumer.getDefaultMQPushConsumerImpl().persistConsumerOffset();
+                InnerSalveBrokerController slaveBroker = findSlaveOfMaster1InContainer2();
+                Map<Integer, Long> slaveOffsetTable = slaveBroker == null
+                    ? null
+                    : slaveBroker.getConsumerOffsetManager().queryOffset(CONSUMER_GROUP, TOPIC);
+                if (slaveOffsetTable == null) {
+                    return false;
+                }
+
+                long totalOffset = 0;
+                for (final Long offset : slaveOffsetTable.values()) {
+                    totalOffset += offset;
+                }
+                return totalOffset >= MESSAGE_COUNT;
+            });
+        } finally {
+            pushConsumer.shutdown();
+        }
+    }
+
+    /**
+     * Returns the slave in brokerContainer2 whose broker name matches master1With3Replicas,
+     * or {@code null} when the container currently hosts no such slave. If several match,
+     * the last one iterated wins.
+     */
+    private static InnerSalveBrokerController findSlaveOfMaster1InContainer2() {
+        final String masterName = master1With3Replicas.getBrokerConfig().getBrokerName();
+        InnerSalveBrokerController match = null;
+        for (InnerSalveBrokerController slave : brokerContainer2.getSlaveBrokers()) {
+            if (slave.getBrokerConfig().getBrokerName().equals(masterName)) {
+                match = slave;
+            }
+        }
+        return match;
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/RebalanceLockOnSlaveIT.java b/test/src/test/java/org/apache/rocketmq/test/container/RebalanceLockOnSlaveIT.java
new file mode 100644
index 0000000..41345b1
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/RebalanceLockOnSlaveIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Test lock on slave when acting master enabled.
+ *
+ * <p>The tests isolate master3With3Replicas and check whether an orderly push consumer can
+ * still lock that broker group's message queues.
+ */
+public class RebalanceLockOnSlaveIT extends ContainerIntegrationTestBase {
+    // NOTE(review): group and producer names carry a "SyncConsumerOffsetIT" prefix, which looks
+    // like a copy/paste from another test - confirm before renaming.
+    private static final String THREE_REPLICA_CONSUMER_GROUP = "SyncConsumerOffsetIT_ConsumerThreeReplica";
+
+    private static DefaultMQProducer mqProducer;
+    private static DefaultMQPushConsumer mqConsumerThreeReplica1;
+    private static DefaultMQPushConsumer mqConsumerThreeReplica2;
+    private static DefaultMQPushConsumer mqConsumerThreeReplica3;
+
+    public RebalanceLockOnSlaveIT() {
+    }
+
+    /**
+     * Starts the producer and prepares (but does not start) three consumers of the same group.
+     */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        mqProducer = createProducer("SyncConsumerOffsetIT_Producer");
+        mqProducer.start();
+
+        mqConsumerThreeReplica1 = newThreeReplicaConsumer();
+        mqConsumerThreeReplica2 = newThreeReplicaConsumer();
+        mqConsumerThreeReplica3 = newThreeReplicaConsumer();
+    }
+
+    /**
+     * Shuts down the producer and every consumer, so that a test failing halfway does not
+     * leave running clients behind for the following test classes.
+     */
+    @AfterClass
+    public static void afterClass() {
+        if (mqProducer != null) {
+            mqProducer.shutdown();
+        }
+        shutdownQuietlyIfPresent(mqConsumerThreeReplica1);
+        shutdownQuietlyIfPresent(mqConsumerThreeReplica2);
+        shutdownQuietlyIfPresent(mqConsumerThreeReplica3);
+    }
+
+    /**
+     * Locks all queues, isolates master3 and expects its queues to stay lockable, then removes
+     * slave 1 of master3 from brokerContainer1 and expects locking those queues to fail.
+     */
+    @Test
+    public void lockFromSlave() throws Exception {
+        awaitUntilSlaveOK();
+
+        mqConsumerThreeReplica3.registerMessageListener((MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
+        mqConsumerThreeReplica3.start();
+
+        final Set<MessageQueue> mqSet = mqConsumerThreeReplica3.fetchSubscribeMessageQueues(THREE_REPLICAS_TOPIC);
+
+        assertThat(targetTopicMqCount(mqSet, THREE_REPLICAS_TOPIC)).isEqualTo(24);
+
+        // With the whole cluster healthy every queue must be lockable.
+        for (MessageQueue mq : mqSet) {
+            await().atMost(Duration.ofSeconds(60)).until(() -> tryLock(mqConsumerThreeReplica3, mq));
+        }
+
+        isolateBroker(master3With3Replicas);
+
+        // Even with the master isolated, a lookup for MASTER_ID must still resolve an address.
+        refreshRoute(mqConsumerThreeReplica3);
+        FindBrokerResult result = findMaster3AsSeenBy(mqConsumerThreeReplica3);
+        assertThat(result).isNotNull();
+
+        awaitLockOnMaster3Queues(mqSet, true);
+
+        removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
+        assertThat(brokerContainer1.getSlaveBrokers().size()).isEqualTo(1);
+
+        refreshRoute(mqConsumerThreeReplica3);
+
+        // With slave 1 gone as well, locking the queues of master3 is expected to fail.
+        awaitLockOnMaster3Queues(mqSet, false);
+
+        cancelIsolatedBroker(master3With3Replicas);
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+        awaitUntilSlaveOK();
+
+        mqConsumerThreeReplica3.shutdown();
+        await().atMost(100, TimeUnit.SECONDS).until(() -> mqConsumerThreeReplica3.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY);
+    }
+
+    /**
+     * Starts a second consumer while master3 is isolated and expects the two consumers to
+     * hold all 24 queues of the topic between them after rebalancing.
+     */
+    @Ignore
+    @Test
+    public void multiConsumerLockFromSlave() throws MQClientException, InterruptedException {
+        awaitUntilSlaveOK();
+
+        mqConsumerThreeReplica1.registerMessageListener((MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
+        mqConsumerThreeReplica1.start();
+
+        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().doRebalance();
+        Set<MessageQueue> mqSet1 = filterMessageQueue(mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(), THREE_REPLICAS_TOPIC);
+
+        // A single consumer owns every queue of the topic.
+        assertThat(mqSet1.size()).isEqualTo(24);
+
+        isolateBroker(master3With3Replicas);
+
+        System.out.printf("%s isolated%n", master3With3Replicas.getBrokerConfig().getCanonicalName());
+
+        Thread.sleep(5000);
+
+        mqConsumerThreeReplica2.registerMessageListener((MessageListenerOrderly) (msgs, context) -> ConsumeOrderlyStatus.SUCCESS);
+        mqConsumerThreeReplica2.start();
+
+        Thread.sleep(5000);
+
+        refreshRoute(mqConsumerThreeReplica1);
+        refreshRoute(mqConsumerThreeReplica2);
+
+        assertThat(findMaster3AsSeenBy(mqConsumerThreeReplica1)).isNotNull();
+        assertThat(findMaster3AsSeenBy(mqConsumerThreeReplica2)).isNotNull();
+
+        mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().doRebalance();
+        mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().doRebalance();
+
+        // Re-read both assignments on every poll; a snapshot taken once could never change
+        // while waiting, which would make the await meaningless.
+        await().atMost(Duration.ofSeconds(30)).until(() -> {
+            List<MessageQueue> mqList = new ArrayList<>(assignedQueues(mqConsumerThreeReplica2));
+            mqList.addAll(assignedQueues(mqConsumerThreeReplica1));
+            return mqList.size() == 24;
+        });
+
+        cancelIsolatedBroker(master3With3Replicas);
+        awaitUntilSlaveOK();
+
+        mqConsumerThreeReplica1.shutdown();
+        mqConsumerThreeReplica2.shutdown();
+
+        await().atMost(100, TimeUnit.SECONDS).until(() ->
+            mqConsumerThreeReplica1.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY &&
+                mqConsumerThreeReplica2.getDefaultMQPushConsumerImpl().getServiceState() == ServiceState.SHUTDOWN_ALREADY
+        );
+    }
+
+    /**
+     * Creates an unstarted push consumer of the shared group, subscribed to the three-replica
+     * topic and configured to consume from the last offset.
+     */
+    private static DefaultMQPushConsumer newThreeReplicaConsumer() throws Exception {
+        DefaultMQPushConsumer consumer = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        consumer.subscribe(THREE_REPLICAS_TOPIC, "*");
+        return consumer;
+    }
+
+    /** Shuts the consumer down when it was created. */
+    private static void shutdownQuietlyIfPresent(DefaultMQPushConsumer consumer) {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+    }
+
+    /** Tries to lock {@code mq} on behalf of {@code consumer} and reports whether it succeeded. */
+    private static boolean tryLock(DefaultMQPushConsumer consumer, MessageQueue mq) {
+        return consumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().lock(mq);
+    }
+
+    /** Makes the consumer's client re-fetch the route of the three-replica topic. */
+    private static void refreshRoute(DefaultMQPushConsumer consumer) {
+        consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(THREE_REPLICAS_TOPIC);
+    }
+
+    /** Looks up, through the consumer's client, the address serving master3's group as MASTER_ID. */
+    private FindBrokerResult findMaster3AsSeenBy(DefaultMQPushConsumer consumer) {
+        return consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().findBrokerAddressInSubscribe(
+            master3With3Replicas.getBrokerConfig().getBrokerName(), MixAll.MASTER_ID, true);
+    }
+
+    /**
+     * Waits, for every queue of master3's broker group in {@code mqSet}, until a lock attempt
+     * by consumer 3 yields {@code expectedLocked}.
+     */
+    private void awaitLockOnMaster3Queues(Set<MessageQueue> mqSet, boolean expectedLocked) {
+        final String master3Name = master3With3Replicas.getBrokerConfig().getBrokerName();
+        for (MessageQueue mq : mqSet) {
+            if (mq.getBrokerName().equals(master3Name)) {
+                await().atMost(Duration.ofSeconds(60)).until(() -> tryLock(mqConsumerThreeReplica3, mq) == expectedLocked);
+            }
+        }
+    }
+
+    /** Returns the queues of the three-replica topic currently assigned to {@code consumer}. */
+    private List<MessageQueue> assignedQueues(DefaultMQPushConsumer consumer) {
+        Set<MessageQueue> filtered = filterMessageQueue(consumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable().keySet(), THREE_REPLICAS_TOPIC);
+        List<MessageQueue> assigned = new ArrayList<>();
+        for (MessageQueue mq : filtered) {
+            if (mq.getTopic().equals(THREE_REPLICAS_TOPIC)) {
+                assigned.add(mq);
+            }
+        }
+        return assigned;
+    }
+
+    /** Counts the queues in {@code mqSet} that belong to {@code topic}. */
+    private static int targetTopicMqCount(Set<MessageQueue> mqSet, String topic) {
+        int count = 0;
+        for (MessageQueue mq : mqSet) {
+            if (mq.getTopic().equals(topic)) {
+                count++;
+            }
+        }
+        return count;
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java b/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
new file mode 100644
index 0000000..c2850f3
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/ScheduleSlaveActingMasterIT.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.BrokerIdentity;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+//The test is correct, but it takes too much time, so it is ignored for the time being
+/**
+ * Integration test checking that delayed messages stored on master1With3Replicas are still
+ * delivered, and delivered close to their due time, after that master (and in the remote
+ * case master2With3Replicas too) has been removed from its broker container.
+ */
+@Ignore
+public class ScheduleSlaveActingMasterIT extends ContainerIntegrationTestBase {
+
+    private static final String CONSUME_GROUP = ScheduleSlaveActingMasterIT.class.getSimpleName() + "_Consumer";
+    private final static int MESSAGE_COUNT = 32;
+    // Delay level put on every message; the listeners below expect it to mean about 30 s.
+    private final static int DELAY_LEVEL = 4;
+    private final static long EXPECTED_DELAY_MILLIS = 30000;
+    // A delivery counts as "in time" when it is at most this far from the expected delay.
+    private final static long DELAY_TOLERANCE_MILLIS = 4000;
+    private final static Random random = new Random();
+    private static DefaultMQProducer producer;
+    private final static String MESSAGE_STRING = RandomStringUtils.random(1024);
+    private final static byte[] MESSAGE_BODY;
+
+    static {
+        try {
+            MESSAGE_BODY = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+        } catch (UnsupportedEncodingException e) {
+            // Fail fast instead of leaving the body null and failing later at send time.
+            throw new IllegalStateException("Unsupported charset " + RemotingHelper.DEFAULT_CHARSET, e);
+        }
+    }
+
+    /** Creates {@code topic} with one read and one write queue on each of the three masters. */
+    void createTopic(String topic) {
+        createTopicTo(master1With3Replicas, topic, 1, 1);
+        createTopicTo(master2With3Replicas, topic, 1, 1);
+        createTopicTo(master3With3Replicas, topic, 1, 1);
+    }
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        producer = createProducer(ScheduleSlaveActingMasterIT.class.getSimpleName() + "_PRODUCER");
+        producer.setSendMsgTimeout(5000);
+        producer.start();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producer.shutdown();
+    }
+
+    /**
+     * Sends delayed messages to master1, removes master1, and expects all messages to arrive
+     * with at least 95% of them within the tolerance of the expected delay.
+     */
+    @Test
+    public void testLocalActing_delayMsg() throws Exception {
+        awaitUntilSlaveOK();
+        String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
+        createTopic(topic);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            // Delivery latency is measured from the born timestamp of the first message of the batch.
+            long period = System.currentTimeMillis() - msgs.get(0).getBornTimestamp();
+            if (Math.abs(period - EXPECTED_DELAY_MILLIS) <= DELAY_TOLERANCE_MILLIS) {
+                inTimeMsgCount.addAndGet(msgs.size());
+            }
+            receivedMsgCount.addAndGet(msgs.size());
+            // Never use message text as the format string: a '%' in it would make printf throw.
+            msgs.forEach(x -> System.out.printf("%s%n", x));
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        sendDelayedMessages(topic);
+        System.out.printf("send success%n");
+
+        removeMaster1();
+
+        try {
+            await().atMost(Duration.ofMinutes(1)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95);
+
+            System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
+        } finally {
+            // The cluster is shared by all tests: restore it even when the wait above fails.
+            try {
+                pushConsumer.shutdown();
+            } finally {
+                restoreMaster1();
+            }
+        }
+
+        awaitUntilSlaveOK();
+        // sleep a while to recover
+        Thread.sleep(30000);
+    }
+
+    /**
+     * Sends delayed messages to master1, removes both master1 and master2, and expects all
+     * messages to be delivered under master3's broker name, at least 95% of them in time.
+     */
+    @Test
+    public void testRemoteActing_delayMsg() throws Exception {
+        awaitUntilSlaveOK();
+
+        String topic = ScheduleSlaveActingMasterIT.class.getSimpleName() + random.nextInt(65535);
+        createTopic(topic);
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+        AtomicInteger master3MsgCount = new AtomicInteger(0);
+
+        sendDelayedMessages(topic);
+        final long sendCompleteTimeStamp = System.currentTimeMillis();
+        System.out.printf("send success%n");
+
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP);
+        pushConsumer.subscribe(topic, "*");
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            long period = System.currentTimeMillis() - sendCompleteTimeStamp;
+            // Remote Acting lead to born timestamp, msgId changed, it need to polish.
+            if (Math.abs(period - EXPECTED_DELAY_MILLIS) <= DELAY_TOLERANCE_MILLIS) {
+                inTimeMsgCount.addAndGet(msgs.size());
+            }
+            if (msgs.get(0).getBrokerName().equals(master3With3Replicas.getBrokerConfig().getBrokerName())) {
+                master3MsgCount.addAndGet(msgs.size());
+            }
+            receivedMsgCount.addAndGet(msgs.size());
+            // Fixed format string: message text may contain '%'.
+            msgs.forEach(x -> System.out.printf("cost %d %s%n", period, x));
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        pushConsumer.start();
+
+        removeMaster1();
+        removeMaster2();
+
+        try {
+            await().atMost(Duration.ofMinutes(2)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT && master3MsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95);
+
+            System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
+        } finally {
+            // Restore both masters even when the wait above fails, master1 first.
+            try {
+                pushConsumer.shutdown();
+            } finally {
+                try {
+                    restoreMaster1();
+                } finally {
+                    restoreMaster2();
+                }
+            }
+        }
+
+        awaitUntilSlaveOK();
+        // sleep a while to recover
+        Thread.sleep(30000);
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} delayed messages to queue 0 of {@code topic} on master1 and
+     * fails immediately unless every send reported SEND_OK.
+     */
+    private void sendDelayedMessages(String topic) throws Exception {
+        MessageQueue messageQueue = new MessageQueue(topic, master1With3Replicas.getBrokerConfig().getBrokerName(), 0);
+        int sendSuccess = 0;
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            msg.setDelayTimeLevel(DELAY_LEVEL);
+            SendResult sendResult = producer.send(msg, messageQueue);
+            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                sendSuccess++;
+            }
+        }
+        // The count is final at this point, so polling for it to change would only waste time.
+        if (sendSuccess < MESSAGE_COUNT) {
+            throw new AssertionError("Only " + sendSuccess + " of " + MESSAGE_COUNT + " delayed messages were sent with SEND_OK");
+        }
+    }
+
+    /** Isolates master1 and removes it from brokerContainer1. */
+    private void removeMaster1() throws Exception {
+        isolateBroker(master1With3Replicas);
+        brokerContainer1.removeBroker(new BrokerIdentity(
+            master1With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master1With3Replicas.getBrokerConfig().getBrokerName(),
+            master1With3Replicas.getBrokerConfig().getBrokerId()));
+        System.out.printf("Remove master1%n");
+    }
+
+    /** Isolates master2 and removes it from brokerContainer2. */
+    private void removeMaster2() throws Exception {
+        isolateBroker(master2With3Replicas);
+        brokerContainer2.removeBroker(new BrokerIdentity(
+            master2With3Replicas.getBrokerConfig().getBrokerClusterName(),
+            master2With3Replicas.getBrokerConfig().getBrokerName(),
+            master2With3Replicas.getBrokerConfig().getBrokerId()));
+        System.out.printf("Remove master2%n");
+    }
+
+    /** Adds master1 back to brokerContainer1 with its previous configuration and starts it. */
+    private void restoreMaster1() throws Exception {
+        //Add back master
+        master1With3Replicas = brokerContainer1.addBroker(master1With3Replicas.getBrokerConfig(), master1With3Replicas.getMessageStoreConfig());
+        master1With3Replicas.start();
+        cancelIsolatedBroker(master1With3Replicas);
+        System.out.printf("Add back master1%n");
+    }
+
+    /** Adds master2 back to brokerContainer2 with its previous configuration and starts it. */
+    private void restoreMaster2() throws Exception {
+        //Add back master
+        master2With3Replicas = brokerContainer2.addBroker(master2With3Replicas.getBrokerConfig(), master2With3Replicas.getMessageStoreConfig());
+        master2With3Replicas.start();
+        cancelIsolatedBroker(master2With3Replicas);
+        System.out.printf("Add back master2%n");
+    }
+
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java b/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
new file mode 100644
index 0000000..6e5c9a9
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/ScheduledMessageIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class ScheduledMessageIT extends ContainerIntegrationTestBase {
+ private static DefaultMQProducer producer;
+
+ private static final String CONSUME_GROUP = ScheduledMessageIT.class.getSimpleName() + "_Consumer";
+ private static final String MESSAGE_STRING = RandomStringUtils.random(1024);
+ private static byte[] MESSAGE_BODY;
+
+    // Encodes the shared payload once for the whole class.
+    static {
+        try {
+            MESSAGE_BODY = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+        } catch (UnsupportedEncodingException e) {
+            // Abort class initialization instead of silently leaving MESSAGE_BODY null,
+            // which would only surface later as an unrelated failure when a message is built.
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+ private static final String TOPIC_PREFIX = ScheduledMessageIT.class.getSimpleName() + "_TOPIC";
+ private static Random random = new Random();
+ private static final int MESSAGE_COUNT = 128;
+
+ public ScheduledMessageIT() throws UnsupportedEncodingException {
+ }
+
+    /**
+     * Creates {@code topic} on each of the three master brokers, one
+     * {@code createTopicTo} call per master.
+     * NOTE(review): the two trailing {@code 1} arguments are presumably queue counts;
+     * confirm against {@code createTopicTo}.
+     *
+     * @param topic name of the topic to create
+     */
+    void createTopic(String topic) {
+        createTopicTo(master1With3Replicas, topic, 1, 1);
+        createTopicTo(master2With3Replicas, topic, 1, 1);
+        createTopicTo(master3With3Replicas, topic, 1, 1);
+    }
+
+    /** Creates and starts the producer shared by every test in this class. */
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        final String producerGroup = ScheduledMessageIT.class.getSimpleName() + "_PRODUCER";
+        producer = createProducer(producerGroup);
+        producer.setSendMsgTimeout(5000);
+        producer.start();
+    }
+
+    /**
+     * Shuts down the shared producer.
+     * The null guard keeps a failure in {@code beforeClass} from being masked by a
+     * NullPointerException raised during teardown.
+     */
+    @AfterClass
+    public static void afterClass() throws Exception {
+        if (producer != null) {
+            producer.shutdown();
+        }
+    }
+
+    /**
+     * Sends {@code MESSAGE_COUNT} messages with delay level 2 and waits until all of them
+     * have been received and at least 95% arrived within 5000 +/- 1000 ms of their born
+     * timestamp. The latency of a batch is measured on its first message only.
+     */
+    @Ignore
+    @Test
+    public void consumeScheduledMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        String topic = TOPIC_PREFIX + random.nextInt(65535);
+        createTopic(topic);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP + random.nextInt(65535));
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        AtomicInteger inTimeMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            long period = System.currentTimeMillis() - msgs.get(0).getBornTimestamp();
+            if (Math.abs(period - 5000) <= 1000) {
+                inTimeMsgCount.addAndGet(msgs.size());
+            }
+            int received = receivedMsgCount.addAndGet(msgs.size());
+            // Pass runtime values as format arguments: a '%' inside the message text must
+            // never be interpreted as a format specifier.
+            msgs.forEach(x -> System.out.printf("%d cost %d %s%n", received, period, x));
+
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+
+        try {
+            pushConsumer.start();
+
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                Message msg = new Message(topic, MESSAGE_BODY);
+                msg.setDelayTimeLevel(2);
+                producer.send(msg);
+            }
+
+            await().atMost(Duration.ofSeconds(MESSAGE_COUNT * 2)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT && inTimeMsgCount.get() >= MESSAGE_COUNT * 0.95);
+
+            System.out.printf("consumer received %d msg, %d in time%n", receivedMsgCount.get(), inTimeMsgCount.get());
+        } finally {
+            // Always release the consumer, even when sending or the await fails.
+            pushConsumer.shutdown();
+        }
+    }
+
+    /**
+     * Sends {@code MESSAGE_COUNT} delayed messages, isolates master1, and then verifies that
+     * a push consumer started after the isolation still receives every message. The
+     * isolation is cancelled afterwards and the test waits until master1 reports two HA
+     * connections again.
+     */
+    @Test
+    public void consumeScheduledMsgFromSlave() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        String topic = TOPIC_PREFIX + random.nextInt(65535);
+        createTopic(topic);
+        DefaultMQPushConsumer pushConsumer = createPushConsumer(CONSUME_GROUP + random.nextInt(65535));
+        pushConsumer.subscribe(topic, "*");
+        AtomicInteger receivedMsgCount = new AtomicInteger(0);
+        pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            receivedMsgCount.addAndGet(msgs.size());
+            // Print through an explicit format so message content is never parsed as a format string.
+            msgs.forEach(x -> System.out.printf("%s%n", x));
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message msg = new Message(topic, String.valueOf(i).getBytes());
+            msg.setDelayTimeLevel(2);
+            producer.send(msg);
+        }
+
+        isolateBroker(master1With3Replicas);
+        try {
+            producer.getDefaultMQProducerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
+            // NOTE(review): findBrokerAddressInPublish looks like it expects a broker name; a
+            // topic is passed here, which may make this assertion vacuous -- confirm.
+            assertThat(producer.getDefaultMQProducerImpl().getmQClientFactory().findBrokerAddressInPublish(topic)).isNull();
+
+            pushConsumer.start();
+
+            await().atMost(Duration.ofSeconds(MESSAGE_COUNT * 2)).until(() -> receivedMsgCount.get() >= MESSAGE_COUNT);
+        } finally {
+            // Undo the isolation and release the consumer even when an assertion or the
+            // await fails; the brokers are shared static state used by other tests.
+            try {
+                pushConsumer.shutdown();
+            } finally {
+                cancelIsolatedBroker(master1With3Replicas);
+            }
+        }
+
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
+    }
+
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/SendMultipleReplicasIT.java b/test/src/test/java/org/apache/rocketmq/test/container/SendMultipleReplicasIT.java
new file mode 100644
index 0000000..ea35781
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/SendMultipleReplicasIT.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class SendMultipleReplicasIT extends ContainerIntegrationTestBase {
+ private static DefaultMQProducer mqProducer;
+
+ private final byte[] MESSAGE_BODY = ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+ public SendMultipleReplicasIT() throws UnsupportedEncodingException {
+ }
+
+    /** Creates and starts the producer shared by every test in this class. */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        final String producerGroup = "SendMultipleReplicasMessageIT_Producer";
+        mqProducer = createProducer(producerGroup);
+        mqProducer.setSendMsgTimeout(15 * 1000);
+        mqProducer.start();
+    }
+
+    /** Shuts down the shared producer if it was created. */
+    @AfterClass
+    public static void afterClass() {
+        if (mqProducer == null) {
+            return;
+        }
+        mqProducer.shutdown();
+    }
+
+    /** Waits for the slaves to be ready, then checks that a send to the three-replica topic reports SEND_OK. */
+    @Test
+    public void sendMessageToBrokerGroup() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        awaitUntilSlaveOK();
+
+        // Send message to broker group with three replicas
+        final Message message = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+        final SendStatus status = mqProducer.send(message).getSendStatus();
+        assertThat(status).isEqualTo(SendStatus.SEND_OK);
+    }
+
+    /**
+     * With auto in-sync replicas enabled, a send addressed to master1's group must still
+     * report SEND_OK after both of its slaves have been removed. The slaves are re-added
+     * in a {@code finally} block so a failed assertion does not leave the shared cluster
+     * degraded for subsequent tests.
+     */
+    @Test
+    public void sendMessage_Auto_Replicas_Success() throws Exception {
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2
+                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+        // Broker with 3 replicas configured as 3-2-1 auto replicas mode
+        Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+        SendResult sendResult = mqProducer.send(msg);
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+
+        // Remove two slave broker
+        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
+        removeSlaveBroker(2, brokerContainer3, master1With3Replicas);
+        try {
+            await().atMost(100, TimeUnit.SECONDS)
+                .until(() ->
+                    ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 0
+                        && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 1);
+
+            master1With3Replicas.getMessageStoreConfig().setEnableAutoInSyncReplicas(true);
+            List<MessageQueue> mqList = mqProducer.getDefaultMQProducerImpl().fetchPublishMessageQueues(THREE_REPLICAS_TOPIC);
+            MessageQueue targetMq = null;
+            for (MessageQueue mq : mqList) {
+                if (mq.getBrokerName().equals(master1With3Replicas.getBrokerConfig().getBrokerName())) {
+                    targetMq = mq;
+                }
+            }
+
+            assertThat(targetMq).isNotNull();
+            // The group now consists of the master alone (no slave left); the send still
+            // succeeds because auto in-sync replicas mode is enabled.
+            msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+            sendResult = mqProducer.send(msg, targetMq);
+            assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        } finally {
+            // Recover the cluster state, also when an assertion above failed.
+            createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+            createAndAddSlave(2, brokerContainer3, master1With3Replicas);
+        }
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2
+                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+    }
+
+    /**
+     * With auto in-sync replicas disabled, a send addressed to master1's group must fail
+     * with {@link MQBrokerException} once both of its slaves have been removed.
+     * The previous auto in-sync setting and the removed slaves are restored in a
+     * {@code finally} block, because the master and its configuration are shared static
+     * state that other tests rely on.
+     */
+    @Test
+    public void sendMessage_Auto_Replicas_Failed()
+        throws Exception {
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2
+                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+        // Remember the current mode so it can be put back after the test.
+        final boolean autoInSyncBefore = master1With3Replicas.getMessageStoreConfig().isEnableAutoInSyncReplicas();
+
+        // Broker with 3 replicas configured as 3-2-1 auto replicas mode
+        // Remove two slave broker
+        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
+        removeSlaveBroker(2, brokerContainer3, master1With3Replicas);
+        try {
+            await().atMost(100, TimeUnit.SECONDS)
+                .until(() ->
+                    ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 0
+                        && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 1);
+
+            // Disable the auto mode
+            master1With3Replicas.getMessageStoreConfig().setEnableAutoInSyncReplicas(false);
+
+            List<MessageQueue> mqList = mqProducer.getDefaultMQProducerImpl().fetchPublishMessageQueues(THREE_REPLICAS_TOPIC);
+            MessageQueue targetMq = null;
+            for (MessageQueue mq : mqList) {
+                if (mq.getBrokerName().equals(master1With3Replicas.getBrokerConfig().getBrokerName())) {
+                    targetMq = mq;
+                }
+            }
+
+            assertThat(targetMq).isNotNull();
+
+            Message msg = new Message(THREE_REPLICAS_TOPIC, MESSAGE_BODY);
+            boolean exceptionCaught = false;
+            try {
+                mqProducer.send(msg, targetMq);
+            } catch (MQBrokerException e) {
+                exceptionCaught = true;
+            }
+
+            assertThat(exceptionCaught).isTrue();
+        } finally {
+            // Recover the configuration and the cluster state, also when an assertion failed.
+            master1With3Replicas.getMessageStoreConfig().setEnableAutoInSyncReplicas(autoInSyncBefore);
+            createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+            createAndAddSlave(2, brokerContainer3, master1With3Replicas);
+        }
+        await().atMost(100, TimeUnit.SECONDS)
+            .until(() -> ((DefaultMessageStore) master1With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2
+                && master1With3Replicas.getMessageStore().getAliveReplicaNumInGroup() == 3);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/SlaveBrokerIT.java b/test/src/test/java/org/apache/rocketmq/test/container/SlaveBrokerIT.java
new file mode 100644
index 0000000..3e6cd8d
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/SlaveBrokerIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class SlaveBrokerIT extends ContainerIntegrationTestBase {
+    /**
+     * Removes one slave from each of the three broker groups, waits until the name server
+     * reports two addresses per group, re-adds the slaves and waits until every group is
+     * back to three addresses.
+     */
+    @Test
+    public void reAddSlaveBroker() throws Exception {
+        final String clusterName = master1With3Replicas.getBrokerConfig().getBrokerClusterName();
+        final String group1 = master1With3Replicas.getBrokerConfig().getBrokerName();
+        final String group2 = master2With3Replicas.getBrokerConfig().getBrokerName();
+        final String group3 = master3With3Replicas.getBrokerConfig().getBrokerName();
+
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            return clusterInfo.getClusterAddrTable().get(clusterName).size() == 3
+                && brokerAddrCount(clusterInfo, group1) == 3
+                && brokerAddrCount(clusterInfo, group2) == 3
+                && brokerAddrCount(clusterInfo, group3) == 3;
+        });
+
+        // Remove one replicas from each broker group
+        removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
+        removeSlaveBroker(1, brokerContainer2, master1With3Replicas);
+        removeSlaveBroker(1, brokerContainer3, master2With3Replicas);
+
+        // Poll with a boolean condition: an assertion inside until() would throw on the
+        // first unmet check and abort the await instead of letting it retry.
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            return brokerAddrCount(clusterInfo, group1) == 2
+                && brokerAddrCount(clusterInfo, group2) == 2
+                && brokerAddrCount(clusterInfo, group3) == 2;
+        });
+
+        // ReAdd the slave broker
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+        createAndAddSlave(1, brokerContainer2, master1With3Replicas);
+        createAndAddSlave(1, brokerContainer3, master2With3Replicas);
+
+        // All three groups must be complete again (group 3 included).
+        await().atMost(Duration.ofMinutes(1)).until(() -> {
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            return brokerAddrCount(clusterInfo, group1) == 3
+                && brokerAddrCount(clusterInfo, group2) == 3
+                && brokerAddrCount(clusterInfo, group3) == 3;
+        });
+    }
+
+    /** Returns how many addresses {@code clusterInfo} lists for {@code brokerName}, or 0 if the group is absent. */
+    private static int brokerAddrCount(ClusterInfo clusterInfo, String brokerName) {
+        if (!clusterInfo.getBrokerAddrTable().containsKey(brokerName)) {
+            return 0;
+        }
+        return clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs().size();
+    }
+
+    /**
+     * Removes and immediately re-adds one slave of master3, then waits until master3
+     * reports two HA connections and two in-sync slaves again.
+     */
+    @Test
+    public void reAddSlaveBroker_ConnectionCheck() throws Exception {
+        final Duration limit = Duration.ofSeconds(100);
+
+        await().atMost(limit)
+            .until(() -> ((DefaultMessageStore) master3With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
+
+        removeSlaveBroker(1, brokerContainer1, master3With3Replicas);
+        createAndAddSlave(1, brokerContainer1, master3With3Replicas);
+
+        await().atMost(limit)
+            .until(() -> ((DefaultMessageStore) master3With3Replicas.getMessageStore()).getHaService().getConnectionCount().get() == 2);
+
+        await().atMost(limit)
+            .until(() -> ((DefaultMessageStore) master3With3Replicas.getMessageStore()).getHaService().inSyncSlaveNums(0) == 2);
+
+        // NOTE(review): nothing is asserted after this 101-second pause; its purpose is not
+        // visible here -- confirm whether it is still needed.
+        TimeUnit.SECONDS.sleep(101);
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/container/SyncConsumerOffsetIT.java b/test/src/test/java/org/apache/rocketmq/test/container/SyncConsumerOffsetIT.java
new file mode 100644
index 0000000..7b243f5
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/container/SyncConsumerOffsetIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.container;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.container.BrokerContainer;
+import org.apache.rocketmq.container.InnerSalveBrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class SyncConsumerOffsetIT extends ContainerIntegrationTestBase {
+ private static final String THREE_REPLICA_CONSUMER_GROUP = "SyncConsumerOffsetIT_ConsumerThreeReplica";
+ private static final String TEST_SYNC_TOPIC = SyncConsumerOffsetIT.class.getSimpleName() + "_topic";
+
+ private static DefaultMQProducer mqProducer;
+ private static DefaultMQPushConsumer mqConsumerThreeReplica;
+
+ private final byte[] MESSAGE_BODY = ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET);
+
+ public SyncConsumerOffsetIT() throws UnsupportedEncodingException {
+ }
+
+    /**
+     * Creates the test topic on master3, starts the shared producer, and prepares (but does
+     * not start) the push consumer subscribed to that topic.
+     */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        createTopicTo(master3With3Replicas, TEST_SYNC_TOPIC);
+
+        final String producerGroup = "SyncConsumerOffsetIT_Producer";
+        mqProducer = createProducer(producerGroup);
+        mqProducer.setSendMsgTimeout(15 * 1000);
+        mqProducer.start();
+
+        // The consumer is started later, inside the test, after its listener is registered.
+        mqConsumerThreeReplica = createPushConsumer(THREE_REPLICA_CONSUMER_GROUP);
+        mqConsumerThreeReplica.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        mqConsumerThreeReplica.subscribe(TEST_SYNC_TOPIC, "*");
+    }
+
+    /**
+     * Shuts down the shared producer and consumer.
+     * Each client is guarded against null so that a failure part-way through
+     * {@code beforeClass} is not masked by a NullPointerException during teardown.
+     */
+    @AfterClass
+    public static void afterClass() {
+        if (mqProducer != null) {
+            mqProducer.shutdown();
+        }
+        if (mqConsumerThreeReplica != null) {
+            mqConsumerThreeReplica.shutdown();
+        }
+    }
+
+    /** Runs the offset-sync check for master3, looking for its slaves in containers 1 and 2. */
+    @Test
+    public void syncConsumerOffsetWith3Replicas() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        final List<BrokerContainer> slaveContainers = Arrays.asList(brokerContainer1, brokerContainer2);
+        syncConsumeOffsetInner(TEST_SYNC_TOPIC, mqConsumerThreeReplica, master3With3Replicas, slaveContainers);
+    }
+
+    /**
+     * Sends messages to {@code topic}, consumes them with {@code consumer}, and then waits
+     * until every slave of {@code master} found in {@code slaveContainers} reports the same
+     * consumer offsets as the master.
+     *
+     * @param topic           topic to produce to and consume from
+     * @param consumer        push consumer already subscribed to {@code topic}; started here
+     * @param master          master broker whose committed offsets are the reference
+     * @param slaveContainers containers searched for slaves sharing the master's broker name
+     */
+    private void syncConsumeOffsetInner(String topic, DefaultMQPushConsumer consumer, BrokerController master,
+        List<BrokerContainer> slaveContainers) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        awaitUntilSlaveOK();
+        // Offsets are committed under the group of the consumer that is actually used.
+        final String group = consumer.getConsumerGroup();
+
+        final int msgCount = 100;
+        for (int i = 0; i < msgCount; i++) {
+            Message msg = new Message(topic, MESSAGE_BODY);
+            SendResult sendResult = mqProducer.send(msg);
+            assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        }
+
+        CountDownLatch countDownLatch = new CountDownLatch(msgCount);
+        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
+            // Count every message, not every batch, so the latch also completes when a
+            // batch carries more than one message.
+            msgs.forEach(msg -> countDownLatch.countDown());
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        });
+        consumer.start();
+        boolean ok = countDownLatch.await(100, TimeUnit.SECONDS);
+        assertThat(ok).isEqualTo(true);
+        System.out.printf("consume complete%n");
+
+        final Set<MessageQueue> mqSet = filterMessageQueue(consumer.fetchSubscribeMessageQueues(topic), topic);
+
+        await().atMost(120, TimeUnit.SECONDS).until(() -> {
+            // Offsets the master has recorded, keyed by queue id; queues without an offset
+            // (negative result) are left out.
+            Map<Integer, Long> masterOffsets = new HashMap<>();
+            long offsetTotal = 0L;
+            for (MessageQueue mq : mqSet) {
+                long queueOffset = master.getConsumerOffsetManager().queryOffset(group, topic, mq.getQueueId());
+                if (queueOffset < 0) {
+                    continue;
+                }
+                offsetTotal += queueOffset;
+                masterOffsets.put(mq.getQueueId(), queueOffset);
+            }
+
+            if (offsetTotal < msgCount) {
+                return false;
+            }
+
+            for (BrokerContainer brokerContainer : slaveContainers) {
+                for (InnerSalveBrokerController slave : brokerContainer.getSlaveBrokers()) {
+                    if (!slave.getBrokerConfig().getBrokerName().equals(master.getBrokerConfig().getBrokerName())) {
+                        continue;
+                    }
+                    // Compare only queues the master has an offset for; looking up a
+                    // skipped queue would yield null and fail on unboxing.
+                    for (Map.Entry<Integer, Long> entry : masterOffsets.entrySet()) {
+                        long slaveOffset = slave.getConsumerOffsetManager().queryOffset(group, topic, entry.getKey());
+                        if (slaveOffset != entry.getValue()) {
+                            return false;
+                        }
+                    }
+                }
+            }
+
+            return true;
+        });
+    }
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index b3d2d47..a6e3e75 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -19,11 +19,7 @@ package org.apache.rocketmq.test.statictopic;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
-import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -36,22 +32,20 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
-import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -210,8 +204,8 @@ public class StaticTopicIT extends BaseConf {
private void consumeMessagesAndCheck(RMQNormalProducer producer, RMQNormalConsumer consumer, String topic, int queueNum, int msgEachQueue, int startGen, int genNum) {
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
-// System.out.println("produce:" + producer.getAllMsgBody().size());
-// System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());
+ /*System.out.println("produce:" + producer.getAllMsgBody().size());
+ System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
@@ -284,7 +278,6 @@ public class StaticTopicIT extends BaseConf {
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
}
-
//remapping the static topic
{
Set<String> targetBrokers = ImmutableSet.of(broker2Name);
@@ -344,6 +337,8 @@ public class StaticTopicIT extends BaseConf {
Thread.sleep(500);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, (i + 1) * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
}
+
+ TestUtils.waitForSeconds(20);
consumeStats = defaultMQAdminExt.examineConsumeStats(group);
messageQueues = producer.getMessageQueue();