You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/22 01:34:37 UTC
[rocketmq] branch develop updated: [ISSUE #5471] Fix AutoSwitchRoleIntegrationTest still experience random failures (#5475)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ee37e3a55 [ISSUE #5471] Fix AutoSwitchRoleIntegrationTest still experience random failures (#5475)
ee37e3a55 is described below
commit ee37e3a55fe21e0720cb10291bea65392626c775
Author: rongtong <ji...@163.com>
AuthorDate: Tue Nov 22 09:34:30 2022 +0800
[ISSUE #5471] Fix AutoSwitchRoleIntegrationTest still experience random failures (#5475)
* Make AutoSwitchRoleIntegrationTest more stable
* Pass the checkstyle
* Pass the checkstyle
* Fix compatibility issues
* Simple optimization
* Remove useless import
* Pass the check style
* Pass the check style
* Modify DEFAULT_FILE_SIZE
* Modify DEFAULT_FILE_SIZE
* test
* Remove useless import
* Add more output to debug
* Use the new directory every time when start a broker
* Use the new directory every time when start a broker
* Remove console output
* Remove console output
---
.../store/ha/autoswitch/AutoSwitchHATest.java | 25 ++-
.../test/autoswitchrole/AutoSwitchRoleBase.java | 76 +++++----
.../AutoSwitchRoleIntegrationTest.java | 174 ++++++++++-----------
3 files changed, 126 insertions(+), 149 deletions(-)
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index d74f1f3f2..92e9b625b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -21,12 +21,12 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
@@ -186,20 +186,13 @@ public class AutoSwitchHATest {
return flag;
}
- private void checkMessage(final DefaultMessageStore messageStore, int totalMsgs, int startOffset) {
- for (int i = 0; i < totalMsgs; i++) {
- final int index = i;
- Boolean exist = await().atMost(Duration.ofSeconds(20)).until(() -> {
- GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset + index, 1024 * 1024, null);
- if (result == null) {
- return false;
- }
- boolean equals = GetMessageStatus.FOUND.equals(result.getStatus());
- result.release();
- return equals;
- }, item -> item);
- assertTrue(exist);
- }
+ private void checkMessage(final DefaultMessageStore messageStore, int totalNums, int startOffset) {
+ await().atMost(30, TimeUnit.SECONDS)
+ .until(() -> {
+ GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset, 1024, null);
+// System.out.printf(result + "%n");
+ return result != null && result.getStatus() == GetMessageStatus.FOUND && result.getMessageCount() >= totalNums;
+ });
}
@Test
@@ -324,7 +317,7 @@ public class AutoSwitchHATest {
// Step2: add new broker3, link to broker1
messageStore3.getHaService().changeToSlave("", 1, 3L);
- messageStore3.getHaService().updateHaMasterAddress("127.0.0.1:10912");
+ messageStore3.getHaService().updateHaMasterAddress(store1HaAddress);
checkMessage(messageStore3, 10, 0);
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index 222226cc3..6e230bbe1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -28,10 +28,8 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.ControllerConfig;
@@ -40,30 +38,30 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class AutoSwitchRoleBase {
- private final String storePathRootParentDir = System.getProperty("user.home") + File.separator +
+ protected static final String STORE_PATH_ROOT_PARENT_DIR = System.getProperty("user.home") + File.separator +
UUID.randomUUID().toString().replace("-", "");
- private static final AtomicInteger PORT_COUNTER = new AtomicInteger(35000);
- private final String storePathRootDir = storePathRootParentDir + File.separator + "store";
+ private static final String STORE_PATH_ROOT_DIR = STORE_PATH_ROOT_PARENT_DIR + File.separator + "store";
private static final String STORE_MESSAGE = "Once, there was a chance for me!";
private static final byte[] MESSAGE_BODY = STORE_MESSAGE.getBytes();
- private final AtomicInteger queueId = new AtomicInteger(0);
- protected List<BrokerController> brokerList;
- private SocketAddress bornHost;
- private SocketAddress storeHost;
- private static Integer no = 0;
-
- protected void initialize() {
- this.brokerList = new ArrayList<>();
+ protected static List<BrokerController> brokerList;
+ private static SocketAddress bornHost;
+ private static SocketAddress storeHost;
+ private static Integer number = 0;
+
+ protected static void initialize() {
+ brokerList = new ArrayList<>();
try {
storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
@@ -88,20 +86,21 @@ public class AutoSwitchRoleBase {
break;
}
} catch (Exception ignored) {
- if (no > 200) {
+ if (number > 200) {
throw new IOException("This server's open ports are temporarily full!");
}
- no++;
+ number++;
port = nextPort(minPort, maxPort);
}
- no = 0;
+ number = 0;
return port;
}
- public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort,
+ public BrokerController startBroker(String namesrvAddress, String controllerAddress, String brokerName,
+ int brokerId, int haPort,
int brokerListenPort,
int nettyListenPort, BrokerRole expectedRole, int mappedFileSize) throws Exception {
- final MessageStoreConfig storeConfig = buildMessageStoreConfig("broker" + brokerId, haPort, mappedFileSize);
+ final MessageStoreConfig storeConfig = buildMessageStoreConfig(brokerName + "#" + brokerId, haPort, mappedFileSize);
storeConfig.setHaMaxTimeSlaveNotCatchup(3 * 1000);
final BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setListenPort(brokerListenPort);
@@ -109,6 +108,7 @@ public class AutoSwitchRoleBase {
brokerConfig.setControllerAddr(controllerAddress);
brokerConfig.setSyncBrokerMetadataPeriod(2 * 1000);
brokerConfig.setCheckSyncStateSetPeriod(2 * 1000);
+ brokerConfig.setBrokerName(brokerName);
brokerConfig.setEnableControllerMode(true);
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
@@ -128,15 +128,15 @@ public class AutoSwitchRoleBase {
return brokerController;
}
- protected MessageStoreConfig buildMessageStoreConfig(final String brokerName, final int haPort,
+ protected MessageStoreConfig buildMessageStoreConfig(final String brokerDir, final int haPort,
final int mappedFileSize) {
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setHaSendHeartbeatInterval(1000);
storeConfig.setBrokerRole(BrokerRole.SLAVE);
storeConfig.setHaListenPort(haPort);
- storeConfig.setStorePathRootDir(storePathRootDir + File.separator + brokerName);
- storeConfig.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + File.separator + "commitlog");
- storeConfig.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + File.separator + "EpochFileCache");
+ storeConfig.setStorePathRootDir(STORE_PATH_ROOT_DIR + File.separator + brokerDir);
+ storeConfig.setStorePathCommitLog(STORE_PATH_ROOT_DIR + File.separator + brokerDir + File.separator + "commitlog");
+ storeConfig.setStorePathEpochFile(STORE_PATH_ROOT_DIR + File.separator + brokerDir + File.separator + "EpochFileCache");
storeConfig.setTotalReplicas(3);
storeConfig.setInSyncReplicas(2);
@@ -149,24 +149,23 @@ public class AutoSwitchRoleBase {
return storeConfig;
}
- protected ControllerConfig buildControllerConfig(final String id, final String peers) {
+ protected static ControllerConfig buildControllerConfig(final String id, final String peers) {
final ControllerConfig config = new ControllerConfig();
config.setControllerDLegerGroup("group1");
config.setControllerDLegerPeers(peers);
config.setControllerDLegerSelfId(id);
config.setMappedFileSize(1024 * 1024);
- config.setControllerStorePath(storePathRootDir + File.separator + "namesrv" + id + File.separator + "DLedgerController");
+ config.setControllerStorePath(STORE_PATH_ROOT_DIR + File.separator + "namesrv" + id + File.separator + "DLedgerController");
return config;
}
- protected MessageExtBrokerInner buildMessage() {
+ protected MessageExtBrokerInner buildMessage(String topic) {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
- msg.setTopic("FooBar");
+ msg.setTopic(topic);
msg.setTags("TAG1");
msg.setBody(MESSAGE_BODY);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
- int queueTotal = 1;
- msg.setQueueId(Math.abs(queueId.getAndIncrement()) % queueTotal);
+ msg.setQueueId(0);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(storeHost);
@@ -175,25 +174,22 @@ public class AutoSwitchRoleBase {
return msg;
}
- protected void putMessage(MessageStore messageStore) throws InterruptedException {
+ protected void putMessage(MessageStore messageStore, String topic) {
// Put message on master
for (int i = 0; i < 10; i++) {
- messageStore.putMessage(buildMessage());
+ assertSame(messageStore.putMessage(buildMessage(topic)).getPutMessageStatus(), PutMessageStatus.PUT_OK);
}
- Thread.sleep(1000);
}
- protected void checkMessage(final MessageStore messageStore, int totalMsgs, int startOffset) {
- await().atMost(60, TimeUnit.SECONDS)
+ protected void checkMessage(final MessageStore messageStore, String topic, int totalNums, int startOffset) {
+ await().atMost(30, TimeUnit.SECONDS)
.until(() -> {
- GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset, 1024, null);
- return result != null && result.getStatus() == GetMessageStatus.FOUND && result.getMessageCount() == totalMsgs;
+ GetMessageResult result = messageStore.getMessage("GROUP_A", topic, 0, startOffset, 1024, null);
+// System.out.printf(result + "%n");
+// System.out.printf("maxPhyOffset=" + messageStore.getMaxPhyOffset() + "%n");
+// System.out.printf("confirmOffset=" + messageStore.getConfirmOffset() + "%n");
+ return result != null && result.getStatus() == GetMessageStatus.FOUND && result.getMessageCount() >= totalNums;
});
}
- protected void destroy() {
- File file = new File(storePathRootParentDir);
- UtilAll.deleteFile(file);
- }
-
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 58949b084..d145fc516 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -17,23 +17,22 @@
package org.apache.rocketmq.test.autoswitchrole;
-import com.google.common.collect.ImmutableList;
import java.io.File;
-import java.time.Duration;
-import java.util.List;
+import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.controller.ReplicasManager;
-import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
@@ -41,30 +40,29 @@ import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnectionState;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
-import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
- private final int defaultFileSize = 1024 * 1024;
- private ControllerConfig controllerConfig;
- private NamesrvController namesrvController;
- private ControllerManager controllerManager;
- private String namesrvAddress;
- private String controllerAddress;
+ private static final int DEFAULT_FILE_SIZE = 1024 * 1024;
+ private static NamesrvController namesrvController;
+ private static ControllerManager controllerManager;
+ private static String nameserverAddress;
+ private static String controllerAddress;
private BrokerController brokerController1;
private BrokerController brokerController2;
- protected List<BrokerController> brokerControllerList;
+ private Random random = new Random();
+ @BeforeClass
+ public static void init() throws Exception {
+ initialize();
- public void init(int mappedFileSize) throws Exception {
- super.initialize();
-
- // Startup namesrv
int controllerPort = nextPort();
final String peers = String.format("n0-localhost:%d", controllerPort);
@@ -72,33 +70,33 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
int namesrvPort = nextPort();
serverConfig.setListenPort(namesrvPort);
- this.controllerConfig = buildControllerConfig("n0", peers);
- this.namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig());
+ ControllerConfig controllerConfig = buildControllerConfig("n0", peers);
+ namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig());
assertTrue(namesrvController.initialize());
namesrvController.start();
- this.controllerManager = new ControllerManager(controllerConfig, new NettyServerConfig(), new NettyClientConfig());
+ controllerManager = new ControllerManager(controllerConfig, new NettyServerConfig(), new NettyClientConfig());
assertTrue(controllerManager.initialize());
controllerManager.start();
- this.namesrvAddress = "127.0.0.1:" + namesrvPort + ";";
- this.controllerAddress = "127.0.0.1:" + controllerPort + ";";
-
- this.brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
- this.brokerController2 = startBroker(this.namesrvAddress, this.controllerAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
- this.brokerControllerList = ImmutableList.of(brokerController1, brokerController2);
+ nameserverAddress = "127.0.0.1:" + namesrvPort + ";";
+ controllerAddress = "127.0.0.1:" + controllerPort + ";";
+ }
+ public void initBroker(int mappedFileSize, String brokerName) throws Exception {
+ this.brokerController1 = startBroker(nameserverAddress, controllerAddress, brokerName, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
+ this.brokerController2 = startBroker(nameserverAddress, controllerAddress, brokerName, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
// Wait slave connecting to master
assertTrue(waitSlaveReady(this.brokerController2.getMessageStore()));
+ Thread.sleep(1000);
}
- public void mockData() throws Exception {
+ public void mockData(String topic) throws Exception {
final MessageStore messageStore = brokerController1.getMessageStore();
- putMessage(messageStore);
- Thread.sleep(3000);
+ putMessage(messageStore, topic);
// Check slave message
- checkMessage(brokerController2.getMessageStore(), 10, 0);
+ checkMessage(brokerController2.getMessageStore(), topic, 10, 0);
}
public boolean waitSlaveReady(MessageStore messageStore) throws InterruptedException {
@@ -117,16 +115,17 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
@Test
public void testCheckSyncStateSet() throws Exception {
- init(defaultFileSize);
- awaitDispatchMs(6);
- mockData();
+ String topic = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+ String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+ initBroker(DEFAULT_FILE_SIZE, brokerName);
+
+ mockData(topic);
// Check sync state set
final ReplicasManager replicasManager = brokerController1.getReplicasManager();
SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
assertEquals(2, syncStateSet.getSyncStateSet().size());
-
// Shutdown controller2
ScheduledExecutorService singleThread = Executors.newSingleThreadScheduledExecutor();
while (!singleThread.awaitTermination(6 * 1000, TimeUnit.MILLISECONDS)) {
@@ -135,18 +134,20 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
}
syncStateSet = replicasManager.getSyncStateSet();
- shutdown();
+ shutdownAndClearBroker();
assertEquals(1, syncStateSet.getSyncStateSet().size());
}
@Test
public void testChangeMaster() throws Exception {
- init(defaultFileSize);
- mockData();
+ String topic = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+ String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+ initBroker(DEFAULT_FILE_SIZE, brokerName);
+ mockData(topic);
// Let master shutdown
brokerController1.shutdown();
- this.brokerList.remove(this.brokerController1);
+ brokerList.remove(this.brokerController1);
Thread.sleep(6000);
// The slave should change to master
@@ -154,7 +155,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
// Restart old master, it should be slave
- brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
+ brokerController1 = startBroker(nameserverAddress, controllerAddress, brokerName, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, DEFAULT_FILE_SIZE);
waitSlaveReady(brokerController1.getMessageStore());
assertFalse(brokerController1.getReplicasManager().isMasterState());
@@ -162,60 +163,58 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
// Put another batch messages
final MessageStore messageStore = brokerController2.getMessageStore();
- putMessage(messageStore);
+ putMessage(messageStore, topic);
- Thread.sleep(3000);
-
- // Check slave message
- checkMessage(brokerController1.getMessageStore(), 20, 0);
- shutdown();
+ //Check slave message
+ checkMessage(brokerController1.getMessageStore(), topic, 20, 0);
+ shutdownAndClearBroker();
}
@Test
public void testAddBroker() throws Exception {
- init(defaultFileSize);
- mockData();
+ String topic = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+ String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+ initBroker(DEFAULT_FILE_SIZE, brokerName);
+ mockData(topic);
- BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
+ BrokerController broker3 = startBroker(nameserverAddress, controllerAddress, brokerName, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, DEFAULT_FILE_SIZE);
waitSlaveReady(broker3.getMessageStore());
- Thread.sleep(3000);
- checkMessage(broker3.getMessageStore(), 10, 0);
+ checkMessage(broker3.getMessageStore(), topic, 10, 0);
- putMessage(this.brokerController1.getMessageStore());
- Thread.sleep(3000);
- checkMessage(broker3.getMessageStore(), 20, 0);
- shutdown();
+ putMessage(this.brokerController1.getMessageStore(), topic);
+ checkMessage(broker3.getMessageStore(), topic, 20, 0);
+ shutdownAndClearBroker();
}
@Test
public void testTruncateEpochLogAndChangeMaster() throws Exception {
+ shutdownAndClearBroker();
+ String topic = "FooBar";
+ String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
// Noted that 10 msg 's total size = 1570, and if init the mappedFileSize = 1700, one file only be used to store 10 msg.
- init(1700);
+ initBroker(1700, brokerName);
// Step1: Put message
- putMessage(this.brokerController1.getMessageStore());
- Thread.sleep(3000);
- checkMessage(this.brokerController2.getMessageStore(), 10, 0);
+ putMessage(this.brokerController1.getMessageStore(), topic);
+ checkMessage(this.brokerController2.getMessageStore(), topic, 10, 0);
// Step2: shutdown broker1, broker2 as master
brokerController1.shutdown();
- this.brokerList.remove(brokerController1);
+ brokerList.remove(brokerController1);
Thread.sleep(5000);
assertTrue(brokerController2.getReplicasManager().isMasterState());
assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
// Step3: add broker3
- BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
+ BrokerController broker3 = startBroker(nameserverAddress, controllerAddress, brokerName, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
waitSlaveReady(broker3.getMessageStore());
- Thread.sleep(6000);
- checkMessage(broker3.getMessageStore(), 10, 0);
+ checkMessage(broker3.getMessageStore(), topic, 10, 0);
// Step4: put another batch message
// Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
- putMessage(this.brokerController2.getMessageStore());
- Thread.sleep(2000);
- checkMessage(broker3.getMessageStore(), 20, 0);
+ putMessage(this.brokerController2.getMessageStore(), topic);
+ checkMessage(broker3.getMessageStore(), topic, 20, 0);
// Step5: Check file position, each epoch will be stored on one file(Because fileSize = 1700, which equal to 10 msg size);
// So epoch1 was stored in firstFile, epoch2 was stored in second file, the lastFile was empty.
@@ -231,44 +230,33 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
final AutoSwitchHAService haService = (AutoSwitchHAService) this.brokerController2.getMessageStore().getHaService();
haService.truncateEpochFilePrefix(1570);
- checkMessage(broker2MessageStore, 10, 10);
+ checkMessage(broker2MessageStore, topic, 10, 10);
// Step6, start broker4, link to broker2, it should sync msg from epoch2(offset = 1700).
- BrokerController broker4 = startBroker(this.namesrvAddress, this.controllerAddress, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
+ BrokerController broker4 = startBroker(nameserverAddress, controllerAddress, brokerName, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
waitSlaveReady(broker4.getMessageStore());
- Thread.sleep(6000);
- checkMessage(broker4.getMessageStore(), 10, 10);
- shutdown();
+ checkMessage(broker4.getMessageStore(), topic, 10, 10);
+ shutdownAndClearBroker();
}
- public void shutdown() throws InterruptedException {
- for (BrokerController controller : this.brokerList) {
+ public void shutdownAndClearBroker() throws InterruptedException {
+ for (BrokerController controller : brokerList) {
controller.shutdown();
UtilAll.deleteFile(new File(controller.getMessageStoreConfig().getStorePathRootDir()));
}
- if (this.namesrvController != null) {
- this.namesrvController.shutdown();
- }
- super.destroy();
+ brokerList.clear();
}
- public boolean awaitDispatchMs(long timeMs) throws Exception {
- await().atMost(Duration.ofSeconds(timeMs)).until(
- () -> {
- boolean allOk = true;
- for (BrokerController brokerController: brokerControllerList) {
- if (brokerController.getMessageStore() == null) {
- allOk = false;
- break;
- }
- }
- if (allOk) {
- return true;
- }
- return false;
- }
- );
- return false;
+ @AfterClass
+ public static void destroy() {
+ if (namesrvController != null) {
+ namesrvController.shutdown();
+ }
+ if (controllerManager != null) {
+ controllerManager.shutdown();
+ }
+ File file = new File(STORE_PATH_ROOT_PARENT_DIR);
+ UtilAll.deleteFile(file);
}
}