You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/11/22 01:34:37 UTC

[rocketmq] branch develop updated: [ISSUE #5471] Fix AutoSwitchRoleIntegrationTest still experience random failures (#5475)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ee37e3a55 [ISSUE #5471] Fix AutoSwitchRoleIntegrationTest still experience random failures (#5475)
ee37e3a55 is described below

commit ee37e3a55fe21e0720cb10291bea65392626c775
Author: rongtong <ji...@163.com>
AuthorDate: Tue Nov 22 09:34:30 2022 +0800

    [ISSUE #5471] Fix AutoSwitchRoleIntegrationTest still experience random failures (#5475)
    
    * Make AutoSwitchRoleIntegrationTest more stable
    
    * Pass the checkstyle
    
    * Pass the checkstyle
    
    * Fix compatibility issues
    
    * Simple optimization
    
    * Remove useless import
    
    * Pass the checkstyle
    
    * Pass the checkstyle
    
    * Modify DEFAULT_FILE_SIZE
    
    * Modify DEFAULT_FILE_SIZE
    
    * test
    
    * Remove useless import
    
    * Add more output for debugging
    
    * Use a new directory every time a broker is started
    
    * Use a new directory every time a broker is started
    
    * Remove console output
    
    * Remove console output
---
 .../store/ha/autoswitch/AutoSwitchHATest.java      |  25 ++-
 .../test/autoswitchrole/AutoSwitchRoleBase.java    |  76 +++++----
 .../AutoSwitchRoleIntegrationTest.java             | 174 ++++++++++-----------
 3 files changed, 126 insertions(+), 149 deletions(-)

diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index d74f1f3f2..92e9b625b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -21,12 +21,12 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
@@ -186,20 +186,13 @@ public class AutoSwitchHATest {
         return flag;
     }
 
-    private void checkMessage(final DefaultMessageStore messageStore, int totalMsgs, int startOffset) {
-        for (int i = 0; i < totalMsgs; i++) {
-            final int index = i;
-            Boolean exist = await().atMost(Duration.ofSeconds(20)).until(() -> {
-                GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset + index, 1024 * 1024, null);
-                if (result == null) {
-                    return false;
-                }
-                boolean equals = GetMessageStatus.FOUND.equals(result.getStatus());
-                result.release();
-                return equals;
-            }, item -> item);
-            assertTrue(exist);
-        }
+    private void checkMessage(final DefaultMessageStore messageStore, int totalNums, int startOffset) {
+        await().atMost(30, TimeUnit.SECONDS)
+            .until(() -> {
+                GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset, 1024, null);
+//                System.out.printf(result + "%n");
+                return result != null && result.getStatus() == GetMessageStatus.FOUND && result.getMessageCount() >= totalNums;
+            });
     }
 
     @Test
@@ -324,7 +317,7 @@ public class AutoSwitchHATest {
 
         // Step2: add new broker3, link to broker1
         messageStore3.getHaService().changeToSlave("", 1, 3L);
-        messageStore3.getHaService().updateHaMasterAddress("127.0.0.1:10912");
+        messageStore3.getHaService().updateHaMasterAddress(store1HaAddress);
         checkMessage(messageStore3, 10, 0);
     }
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index 222226cc3..6e230bbe1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -28,10 +28,8 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.ControllerConfig;
@@ -40,30 +38,30 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 public class AutoSwitchRoleBase {
 
-    private final String storePathRootParentDir = System.getProperty("user.home") + File.separator +
+    protected static final String STORE_PATH_ROOT_PARENT_DIR = System.getProperty("user.home") + File.separator +
         UUID.randomUUID().toString().replace("-", "");
-    private static final AtomicInteger PORT_COUNTER = new AtomicInteger(35000);
-    private final String storePathRootDir = storePathRootParentDir + File.separator + "store";
+    private static final String STORE_PATH_ROOT_DIR = STORE_PATH_ROOT_PARENT_DIR + File.separator + "store";
     private static final String STORE_MESSAGE = "Once, there was a chance for me!";
     private static final byte[] MESSAGE_BODY = STORE_MESSAGE.getBytes();
-    private final AtomicInteger queueId = new AtomicInteger(0);
-    protected List<BrokerController> brokerList;
-    private SocketAddress bornHost;
-    private SocketAddress storeHost;
-    private static Integer no = 0;
-
-    protected void initialize() {
-        this.brokerList = new ArrayList<>();
+    protected static List<BrokerController> brokerList;
+    private static SocketAddress bornHost;
+    private static SocketAddress storeHost;
+    private static Integer number = 0;
+
+    protected static void initialize() {
+        brokerList = new ArrayList<>();
         try {
             storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
             bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
@@ -88,20 +86,21 @@ public class AutoSwitchRoleBase {
                 break;
             }
         } catch (Exception ignored) {
-            if (no > 200) {
+            if (number > 200) {
                 throw new IOException("This server's open ports are temporarily full!");
             }
-            no++;
+            number++;
             port = nextPort(minPort, maxPort);
         }
-        no = 0;
+        number = 0;
         return port;
     }
 
-    public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort,
+    public BrokerController startBroker(String namesrvAddress, String controllerAddress, String brokerName,
+        int brokerId, int haPort,
         int brokerListenPort,
         int nettyListenPort, BrokerRole expectedRole, int mappedFileSize) throws Exception {
-        final MessageStoreConfig storeConfig = buildMessageStoreConfig("broker" + brokerId, haPort, mappedFileSize);
+        final MessageStoreConfig storeConfig = buildMessageStoreConfig(brokerName + "#" + brokerId, haPort, mappedFileSize);
         storeConfig.setHaMaxTimeSlaveNotCatchup(3 * 1000);
         final BrokerConfig brokerConfig = new BrokerConfig();
         brokerConfig.setListenPort(brokerListenPort);
@@ -109,6 +108,7 @@ public class AutoSwitchRoleBase {
         brokerConfig.setControllerAddr(controllerAddress);
         brokerConfig.setSyncBrokerMetadataPeriod(2 * 1000);
         brokerConfig.setCheckSyncStateSetPeriod(2 * 1000);
+        brokerConfig.setBrokerName(brokerName);
         brokerConfig.setEnableControllerMode(true);
 
         final NettyServerConfig nettyServerConfig = new NettyServerConfig();
@@ -128,15 +128,15 @@ public class AutoSwitchRoleBase {
         return brokerController;
     }
 
-    protected MessageStoreConfig buildMessageStoreConfig(final String brokerName, final int haPort,
+    protected MessageStoreConfig buildMessageStoreConfig(final String brokerDir, final int haPort,
         final int mappedFileSize) {
         MessageStoreConfig storeConfig = new MessageStoreConfig();
         storeConfig.setHaSendHeartbeatInterval(1000);
         storeConfig.setBrokerRole(BrokerRole.SLAVE);
         storeConfig.setHaListenPort(haPort);
-        storeConfig.setStorePathRootDir(storePathRootDir + File.separator + brokerName);
-        storeConfig.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + File.separator + "commitlog");
-        storeConfig.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + File.separator + "EpochFileCache");
+        storeConfig.setStorePathRootDir(STORE_PATH_ROOT_DIR + File.separator + brokerDir);
+        storeConfig.setStorePathCommitLog(STORE_PATH_ROOT_DIR + File.separator + brokerDir + File.separator + "commitlog");
+        storeConfig.setStorePathEpochFile(STORE_PATH_ROOT_DIR + File.separator + brokerDir + File.separator + "EpochFileCache");
         storeConfig.setTotalReplicas(3);
         storeConfig.setInSyncReplicas(2);
 
@@ -149,24 +149,23 @@ public class AutoSwitchRoleBase {
         return storeConfig;
     }
 
-    protected ControllerConfig buildControllerConfig(final String id, final String peers) {
+    protected static ControllerConfig buildControllerConfig(final String id, final String peers) {
         final ControllerConfig config = new ControllerConfig();
         config.setControllerDLegerGroup("group1");
         config.setControllerDLegerPeers(peers);
         config.setControllerDLegerSelfId(id);
         config.setMappedFileSize(1024 * 1024);
-        config.setControllerStorePath(storePathRootDir + File.separator + "namesrv" + id + File.separator + "DLedgerController");
+        config.setControllerStorePath(STORE_PATH_ROOT_DIR + File.separator + "namesrv" + id + File.separator + "DLedgerController");
         return config;
     }
 
-    protected MessageExtBrokerInner buildMessage() {
+    protected MessageExtBrokerInner buildMessage(String topic) {
         MessageExtBrokerInner msg = new MessageExtBrokerInner();
-        msg.setTopic("FooBar");
+        msg.setTopic(topic);
         msg.setTags("TAG1");
         msg.setBody(MESSAGE_BODY);
         msg.setKeys(String.valueOf(System.currentTimeMillis()));
-        int queueTotal = 1;
-        msg.setQueueId(Math.abs(queueId.getAndIncrement()) % queueTotal);
+        msg.setQueueId(0);
         msg.setSysFlag(0);
         msg.setBornTimestamp(System.currentTimeMillis());
         msg.setStoreHost(storeHost);
@@ -175,25 +174,22 @@ public class AutoSwitchRoleBase {
         return msg;
     }
 
-    protected void putMessage(MessageStore messageStore) throws InterruptedException {
+    protected void putMessage(MessageStore messageStore, String topic) {
         // Put message on master
         for (int i = 0; i < 10; i++) {
-            messageStore.putMessage(buildMessage());
+            assertSame(messageStore.putMessage(buildMessage(topic)).getPutMessageStatus(), PutMessageStatus.PUT_OK);
         }
-        Thread.sleep(1000);
     }
 
-    protected void checkMessage(final MessageStore messageStore, int totalMsgs, int startOffset) {
-        await().atMost(60, TimeUnit.SECONDS)
+    protected void checkMessage(final MessageStore messageStore, String topic, int totalNums, int startOffset) {
+        await().atMost(30, TimeUnit.SECONDS)
             .until(() -> {
-                GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset, 1024, null);
-                return result != null && result.getStatus() == GetMessageStatus.FOUND && result.getMessageCount() == totalMsgs;
+                GetMessageResult result = messageStore.getMessage("GROUP_A", topic, 0, startOffset, 1024, null);
+//                System.out.printf(result + "%n");
+//                System.out.printf("maxPhyOffset=" + messageStore.getMaxPhyOffset() + "%n");
+//                System.out.printf("confirmOffset=" + messageStore.getConfirmOffset() + "%n");
+                return result != null && result.getStatus() == GetMessageStatus.FOUND && result.getMessageCount() >= totalNums;
             });
     }
 
-    protected void destroy() {
-        File file = new File(storePathRootParentDir);
-        UtilAll.deleteFile(file);
-    }
-
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 58949b084..d145fc516 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -17,23 +17,22 @@
 
 package org.apache.rocketmq.test.autoswitchrole;
 
-import com.google.common.collect.ImmutableList;
 import java.io.File;
-import java.time.Duration;
-import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.controller.ReplicasManager;
-import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.controller.ControllerManager;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.store.MappedFileQueue;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -41,30 +40,29 @@ import org.apache.rocketmq.store.ha.HAClient;
 import org.apache.rocketmq.store.ha.HAConnectionState;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.logfile.MappedFile;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
 
-    private final int defaultFileSize = 1024 * 1024;
-    private ControllerConfig controllerConfig;
-    private NamesrvController namesrvController;
-    private ControllerManager controllerManager;
-    private String namesrvAddress;
-    private String controllerAddress;
+    private static final int DEFAULT_FILE_SIZE = 1024 * 1024;
+    private static NamesrvController namesrvController;
+    private static ControllerManager controllerManager;
+    private static String nameserverAddress;
+    private static String controllerAddress;
     private BrokerController brokerController1;
     private BrokerController brokerController2;
-    protected List<BrokerController> brokerControllerList;
+    private Random random = new Random();
 
+    @BeforeClass
+    public static void init() throws Exception {
+        initialize();
 
-    public void init(int mappedFileSize) throws Exception {
-        super.initialize();
-
-        // Startup namesrv
         int controllerPort = nextPort();
         final String peers = String.format("n0-localhost:%d", controllerPort);
 
@@ -72,33 +70,33 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         int namesrvPort = nextPort();
         serverConfig.setListenPort(namesrvPort);
 
-        this.controllerConfig = buildControllerConfig("n0", peers);
-        this.namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig());
+        ControllerConfig controllerConfig = buildControllerConfig("n0", peers);
+        namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig());
         assertTrue(namesrvController.initialize());
         namesrvController.start();
 
-        this.controllerManager = new ControllerManager(controllerConfig, new NettyServerConfig(), new NettyClientConfig());
+        controllerManager = new ControllerManager(controllerConfig, new NettyServerConfig(), new NettyClientConfig());
         assertTrue(controllerManager.initialize());
         controllerManager.start();
 
-        this.namesrvAddress = "127.0.0.1:" + namesrvPort + ";";
-        this.controllerAddress = "127.0.0.1:" + controllerPort + ";";
-
-        this.brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
-        this.brokerController2 = startBroker(this.namesrvAddress, this.controllerAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
-        this.brokerControllerList = ImmutableList.of(brokerController1, brokerController2);
+        nameserverAddress = "127.0.0.1:" + namesrvPort + ";";
+        controllerAddress = "127.0.0.1:" + controllerPort + ";";
+    }
 
+    public void initBroker(int mappedFileSize, String brokerName) throws Exception {
 
+        this.brokerController1 = startBroker(nameserverAddress, controllerAddress, brokerName, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
+        this.brokerController2 = startBroker(nameserverAddress, controllerAddress, brokerName, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
         // Wait slave connecting to master
         assertTrue(waitSlaveReady(this.brokerController2.getMessageStore()));
+        Thread.sleep(1000);
     }
 
-    public void mockData() throws Exception {
+    public void mockData(String topic) throws Exception {
         final MessageStore messageStore = brokerController1.getMessageStore();
-        putMessage(messageStore);
-        Thread.sleep(3000);
+        putMessage(messageStore, topic);
         // Check slave message
-        checkMessage(brokerController2.getMessageStore(), 10, 0);
+        checkMessage(brokerController2.getMessageStore(), topic, 10, 0);
     }
 
     public boolean waitSlaveReady(MessageStore messageStore) throws InterruptedException {
@@ -117,16 +115,17 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
 
     @Test
     public void testCheckSyncStateSet() throws Exception {
-        init(defaultFileSize);
-        awaitDispatchMs(6);
-        mockData();
+        String topic = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+        String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+        initBroker(DEFAULT_FILE_SIZE, brokerName);
+
+        mockData(topic);
 
         // Check sync state set
         final ReplicasManager replicasManager = brokerController1.getReplicasManager();
         SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
         assertEquals(2, syncStateSet.getSyncStateSet().size());
 
-
         // Shutdown controller2
         ScheduledExecutorService singleThread = Executors.newSingleThreadScheduledExecutor();
         while (!singleThread.awaitTermination(6 * 1000, TimeUnit.MILLISECONDS)) {
@@ -135,18 +134,20 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         }
 
         syncStateSet = replicasManager.getSyncStateSet();
-        shutdown();
+        shutdownAndClearBroker();
         assertEquals(1, syncStateSet.getSyncStateSet().size());
     }
 
     @Test
     public void testChangeMaster() throws Exception {
-        init(defaultFileSize);
-        mockData();
+        String topic = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+        String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+        initBroker(DEFAULT_FILE_SIZE, brokerName);
+        mockData(topic);
 
         // Let master shutdown
         brokerController1.shutdown();
-        this.brokerList.remove(this.brokerController1);
+        brokerList.remove(this.brokerController1);
         Thread.sleep(6000);
 
         // The slave should change to master
@@ -154,7 +155,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
 
         // Restart old master, it should be slave
-        brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
+        brokerController1 = startBroker(nameserverAddress, controllerAddress, brokerName, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, DEFAULT_FILE_SIZE);
         waitSlaveReady(brokerController1.getMessageStore());
 
         assertFalse(brokerController1.getReplicasManager().isMasterState());
@@ -162,60 +163,58 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
 
         // Put another batch messages
         final MessageStore messageStore = brokerController2.getMessageStore();
-        putMessage(messageStore);
+        putMessage(messageStore, topic);
 
-        Thread.sleep(3000);
-
-        // Check slave message
-        checkMessage(brokerController1.getMessageStore(), 20, 0);
-        shutdown();
+        //Check slave message
+        checkMessage(brokerController1.getMessageStore(), topic, 20, 0);
+        shutdownAndClearBroker();
     }
 
     @Test
     public void testAddBroker() throws Exception {
-        init(defaultFileSize);
-        mockData();
+        String topic = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+        String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
+        initBroker(DEFAULT_FILE_SIZE, brokerName);
+        mockData(topic);
 
-        BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, defaultFileSize);
+        BrokerController broker3 = startBroker(nameserverAddress, controllerAddress, brokerName, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, DEFAULT_FILE_SIZE);
         waitSlaveReady(broker3.getMessageStore());
-        Thread.sleep(3000);
 
-        checkMessage(broker3.getMessageStore(), 10, 0);
+        checkMessage(broker3.getMessageStore(), topic, 10, 0);
 
-        putMessage(this.brokerController1.getMessageStore());
-        Thread.sleep(3000);
-        checkMessage(broker3.getMessageStore(), 20, 0);
-        shutdown();
+        putMessage(this.brokerController1.getMessageStore(), topic);
+        checkMessage(broker3.getMessageStore(), topic, 20, 0);
+        shutdownAndClearBroker();
     }
 
     @Test
     public void testTruncateEpochLogAndChangeMaster() throws Exception {
+        shutdownAndClearBroker();
+        String topic = "FooBar";
+        String brokerName = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + random.nextInt(65535);
         // Noted that 10 msg 's total size = 1570, and if init the mappedFileSize = 1700, one file only be used to store 10 msg.
-        init(1700);
+        initBroker(1700, brokerName);
         // Step1: Put message
-        putMessage(this.brokerController1.getMessageStore());
-        Thread.sleep(3000);
-        checkMessage(this.brokerController2.getMessageStore(), 10, 0);
+        putMessage(this.brokerController1.getMessageStore(), topic);
+        checkMessage(this.brokerController2.getMessageStore(), topic, 10, 0);
 
         // Step2: shutdown broker1, broker2 as master
         brokerController1.shutdown();
-        this.brokerList.remove(brokerController1);
+        brokerList.remove(brokerController1);
         Thread.sleep(5000);
 
         assertTrue(brokerController2.getReplicasManager().isMasterState());
         assertEquals(brokerController2.getReplicasManager().getMasterEpoch(), 2);
 
         // Step3: add broker3
-        BrokerController broker3 = startBroker(this.namesrvAddress, this.controllerAddress, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
+        BrokerController broker3 = startBroker(nameserverAddress, controllerAddress, brokerName, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
         waitSlaveReady(broker3.getMessageStore());
-        Thread.sleep(6000);
-        checkMessage(broker3.getMessageStore(), 10, 0);
+        checkMessage(broker3.getMessageStore(), topic, 10, 0);
 
         // Step4: put another batch message
         // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
-        putMessage(this.brokerController2.getMessageStore());
-        Thread.sleep(2000);
-        checkMessage(broker3.getMessageStore(), 20, 0);
+        putMessage(this.brokerController2.getMessageStore(), topic);
+        checkMessage(broker3.getMessageStore(), topic, 20, 0);
 
         // Step5: Check file position, each epoch will be stored on one file(Because fileSize = 1700, which equal to 10 msg size);
         // So epoch1 was stored in firstFile, epoch2 was stored in second file, the lastFile was empty.
@@ -231,44 +230,33 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
 
         final AutoSwitchHAService haService = (AutoSwitchHAService) this.brokerController2.getMessageStore().getHaService();
         haService.truncateEpochFilePrefix(1570);
-        checkMessage(broker2MessageStore, 10, 10);
+        checkMessage(broker2MessageStore, topic, 10, 10);
 
         // Step6, start broker4, link to broker2, it should sync msg from epoch2(offset = 1700).
-        BrokerController broker4 = startBroker(this.namesrvAddress, this.controllerAddress, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
+        BrokerController broker4 = startBroker(nameserverAddress, controllerAddress, brokerName, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
         waitSlaveReady(broker4.getMessageStore());
-        Thread.sleep(6000);
-        checkMessage(broker4.getMessageStore(), 10, 10);
-        shutdown();
+        checkMessage(broker4.getMessageStore(), topic, 10, 10);
+        shutdownAndClearBroker();
     }
 
-    public void shutdown() throws InterruptedException {
-        for (BrokerController controller : this.brokerList) {
+    public void shutdownAndClearBroker() throws InterruptedException {
+        for (BrokerController controller : brokerList) {
             controller.shutdown();
             UtilAll.deleteFile(new File(controller.getMessageStoreConfig().getStorePathRootDir()));
         }
-        if (this.namesrvController != null) {
-            this.namesrvController.shutdown();
-        }
-        super.destroy();
+        brokerList.clear();
     }
 
-    public boolean awaitDispatchMs(long timeMs) throws Exception {
-        await().atMost(Duration.ofSeconds(timeMs)).until(
-            () -> {
-                boolean allOk = true;
-                for (BrokerController brokerController: brokerControllerList) {
-                    if (brokerController.getMessageStore() == null) {
-                        allOk = false;
-                        break;
-                    }
-                }
-                if (allOk) {
-                    return true;
-                }
-                return false;
-            }
-        );
-        return false;
+    @AfterClass
+    public static void destroy() {
+        if (namesrvController != null) {
+            namesrvController.shutdown();
+        }
+        if (controllerManager != null) {
+            controllerManager.shutdown();
+        }
+        File file = new File(STORE_PATH_ROOT_PARENT_DIR);
+        UtilAll.deleteFile(file);
     }
 
 }