You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/06 11:54:46 UTC
[rocketmq] branch develop updated: [ISSUE#4949] fix Address already in use (#5000)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ee1e542e1 [ISSUE#4949] fix Address already in use (#5000)
ee1e542e1 is described below
commit ee1e542e111d65b44d815baf78da85bb7c99c828
Author: windWheel <18...@qq.com>
AuthorDate: Tue Sep 6 19:54:34 2022 +0800
[ISSUE#4949] fix Address already in use (#5000)
* [flaky-test] fix Address already in use
* Re-apply for a port when the port is occupied
---
.../test/autoswitchrole/AutoSwitchRoleBase.java | 35 +++++++++++--
.../AutoSwitchRoleIntegrationTest.java | 57 +++++++++++++++++++---
2 files changed, 80 insertions(+), 12 deletions(-)
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index e6757909e..98e3d7732 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -18,8 +18,10 @@
package org.apache.rocketmq.test.autoswitchrole;
import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -59,7 +61,9 @@ public class AutoSwitchRoleBase {
protected List<BrokerController> brokerList;
private SocketAddress BornHost;
private SocketAddress StoreHost;
-
+ private static Integer No= 0;
+
+
protected void initialize() {
this.brokerList = new ArrayList<>();
try {
@@ -68,9 +72,32 @@ public class AutoSwitchRoleBase {
} catch (Exception ignored) {
}
}
-
- public int nextPort() {
- return PORT_COUNTER.addAndGet(10 + random.nextInt(10));
+
+ public static Integer nextPort() throws IOException {
+ return nextPort(1001,9999);
+ }
+
+ public static Integer nextPort(Integer minPort, Integer maxPort) throws IOException {
+ Random random = new Random();
+ int tempPort;
+ int port;
+ try{
+ while (true){
+ tempPort = random.nextInt(maxPort)%(maxPort-minPort+1) + minPort;
+ ServerSocket serverSocket = new ServerSocket(tempPort);
+ port = serverSocket.getLocalPort();
+ serverSocket.close();
+ break;
+ }
+ }catch (Exception ignored){
+ if (No>200){
+ throw new IOException("This server's open ports are temporarily full!");
+ }
+ No++;
+ port = nextPort(minPort,maxPort);
+ }
+ No = 0;
+ return port;
}
public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort, int brokerListenPort,
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index a3ffe2675..dfa1d849a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -18,6 +18,13 @@
package org.apache.rocketmq.test.autoswitchrole;
import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.common.UtilAll;
@@ -35,9 +42,11 @@ import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnectionState;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.test.base.BaseConf;
import org.junit.After;
import org.junit.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -52,7 +61,9 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
private String controllerAddress;
private BrokerController brokerController1;
private BrokerController brokerController2;
-
+ protected List<BrokerController> brokerControllerList;
+
+
public void init(int mappedFileSize) throws Exception {
super.initialize();
@@ -78,7 +89,9 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
this.brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
this.brokerController2 = startBroker(this.namesrvAddress, this.controllerAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
-
+ this.brokerControllerList = ImmutableList.of(brokerController1, brokerController2);
+
+
// Wait slave connecting to master
assertTrue(waitSlaveReady(this.brokerController2.getMessageStore()));
}
@@ -110,18 +123,24 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
@Test
public void testCheckSyncStateSet() throws Exception {
init(defaultFileSize);
+ awaitDispatchMs(6);
mockData();
// Check sync state set
final ReplicasManager replicasManager = brokerController1.getReplicasManager();
SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
assertEquals(2, syncStateSet.getSyncStateSet().size());
-
+
+
// Shutdown controller2
- this.brokerController2.shutdown();
-
- Thread.sleep(5000);
+ ScheduledExecutorService singleThread = Executors.newSingleThreadScheduledExecutor();
+ while (!singleThread.awaitTermination(6* 1000, TimeUnit.MILLISECONDS)) {
+ this.brokerController2.shutdown();
+ singleThread.shutdown();
+ }
+
syncStateSet = replicasManager.getSyncStateSet();
+ shutdown();
assertEquals(1, syncStateSet.getSyncStateSet().size());
}
@@ -154,6 +173,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
// Check slave message
checkMessage(brokerController1.getMessageStore(), 20, 0);
+ shutdown();
}
@Test
@@ -170,6 +190,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
putMessage(this.brokerController1.getMessageStore());
Thread.sleep(3000);
checkMessage(broker3.getMessageStore(), 20, 0);
+ shutdown();
}
@Test
@@ -222,9 +243,9 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
waitSlaveReady(broker4.getMessageStore());
Thread.sleep(6000);
checkMessage(broker4.getMessageStore(), 10, 10);
+ shutdown();
}
-
- @After
+
public void shutdown() throws InterruptedException {
for (BrokerController controller : this.brokerList) {
controller.shutdown();
@@ -236,4 +257,24 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
}
super.destroy();
}
+
+ public boolean awaitDispatchMs(long timeMs) throws Exception {
+ await().atMost(Duration.ofSeconds(timeMs)).until(
+ () -> {
+ boolean allOk = true;
+ for (BrokerController brokerController: brokerControllerList) {
+ if (brokerController.getMessageStore() == null) {
+ allOk = false;
+ break;
+ }
+ }
+ if (allOk) {
+ return true;
+ }
+ return false;
+ }
+ );
+ return false;
+ }
+
}