You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/06 11:54:46 UTC

[rocketmq] branch develop updated: [ISSUE#4949] fix Address already in use (#5000)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ee1e542e1 [ISSUE#4949] fix Address already in use (#5000)
ee1e542e1 is described below

commit ee1e542e111d65b44d815baf78da85bb7c99c828
Author: windWheel <18...@qq.com>
AuthorDate: Tue Sep 6 19:54:34 2022 +0800

    [ISSUE#4949] fix Address already in use (#5000)
    
    * [flaky-test] fix Address already in use'
    
    * Re-apply for a port when the port is occupied
---
 .../test/autoswitchrole/AutoSwitchRoleBase.java    | 35 +++++++++++--
 .../AutoSwitchRoleIntegrationTest.java             | 57 +++++++++++++++++++---
 2 files changed, 80 insertions(+), 12 deletions(-)

diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index e6757909e..98e3d7732 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -18,8 +18,10 @@
 package org.apache.rocketmq.test.autoswitchrole;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,7 +61,9 @@ public class AutoSwitchRoleBase {
     protected List<BrokerController> brokerList;
     private SocketAddress BornHost;
     private SocketAddress StoreHost;
-
+    private static Integer No= 0;
+    
+    
     protected void initialize() {
         this.brokerList = new ArrayList<>();
         try {
@@ -68,9 +72,32 @@ public class AutoSwitchRoleBase {
         } catch (Exception ignored) {
         }
     }
-
-    public int nextPort() {
-        return PORT_COUNTER.addAndGet(10 + random.nextInt(10));
+    
+    public static Integer nextPort() throws IOException {
+        return nextPort(1001,9999);
+    }
+    
+    public static Integer nextPort(Integer minPort, Integer maxPort) throws IOException  {
+        Random random = new Random();
+        int tempPort;
+        int port;
+        try{
+            while (true){
+                tempPort = random.nextInt(maxPort)%(maxPort-minPort+1) + minPort;
+                ServerSocket serverSocket =  new ServerSocket(tempPort);
+                port = serverSocket.getLocalPort();
+                serverSocket.close();
+                break;
+            }
+        }catch (Exception ignored){
+            if (No>200){
+                throw new IOException("This server's open ports are temporarily full!");
+            }
+            No++;
+            port = nextPort(minPort,maxPort);
+        }
+        No = 0;
+        return port;
     }
 
     public BrokerController startBroker(String namesrvAddress, String controllerAddress, int brokerId, int haPort, int brokerListenPort,
diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index a3ffe2675..dfa1d849a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -18,6 +18,13 @@
 package org.apache.rocketmq.test.autoswitchrole;
 
 import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.common.UtilAll;
@@ -35,9 +42,11 @@ import org.apache.rocketmq.store.ha.HAClient;
 import org.apache.rocketmq.store.ha.HAConnectionState;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.test.base.BaseConf;
 import org.junit.After;
 import org.junit.Test;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -52,7 +61,9 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
     private String controllerAddress;
     private BrokerController brokerController1;
     private BrokerController brokerController2;
-
+    protected List<BrokerController> brokerControllerList;
+    
+    
     public void init(int mappedFileSize) throws Exception {
         super.initialize();
 
@@ -78,7 +89,9 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
 
         this.brokerController1 = startBroker(this.namesrvAddress, this.controllerAddress, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, mappedFileSize);
         this.brokerController2 = startBroker(this.namesrvAddress, this.controllerAddress, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, mappedFileSize);
-
+        this.brokerControllerList = ImmutableList.of(brokerController1, brokerController2);
+    
+    
         // Wait slave connecting to master
         assertTrue(waitSlaveReady(this.brokerController2.getMessageStore()));
     }
@@ -110,18 +123,24 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
     @Test
     public void testCheckSyncStateSet() throws Exception {
         init(defaultFileSize);
+        awaitDispatchMs(6);
         mockData();
 
         // Check sync state set
         final ReplicasManager replicasManager = brokerController1.getReplicasManager();
         SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
         assertEquals(2, syncStateSet.getSyncStateSet().size());
-
+        
+        
         // Shutdown controller2
-        this.brokerController2.shutdown();
-
-        Thread.sleep(5000);
+        ScheduledExecutorService singleThread = Executors.newSingleThreadScheduledExecutor();
+        while (!singleThread.awaitTermination(6* 1000, TimeUnit.MILLISECONDS)) {
+            this.brokerController2.shutdown();
+            singleThread.shutdown();
+        }
+        
         syncStateSet = replicasManager.getSyncStateSet();
+        shutdown();
         assertEquals(1, syncStateSet.getSyncStateSet().size());
     }
 
@@ -154,6 +173,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
 
         // Check slave message
         checkMessage(brokerController1.getMessageStore(), 20, 0);
+        shutdown();
     }
 
     @Test
@@ -170,6 +190,7 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         putMessage(this.brokerController1.getMessageStore());
         Thread.sleep(3000);
         checkMessage(broker3.getMessageStore(), 20, 0);
+        shutdown();
     }
 
     @Test
@@ -222,9 +243,9 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         waitSlaveReady(broker4.getMessageStore());
         Thread.sleep(6000);
         checkMessage(broker4.getMessageStore(), 10, 10);
+        shutdown();
     }
-
-    @After
+    
     public void shutdown() throws InterruptedException {
         for (BrokerController controller : this.brokerList) {
             controller.shutdown();
@@ -236,4 +257,24 @@ public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
         }
         super.destroy();
     }
+    
+    public boolean awaitDispatchMs(long timeMs) throws Exception {
+        await().atMost(Duration.ofSeconds(timeMs)).until(
+            () -> {
+                boolean allOk = true;
+                for (BrokerController brokerController: brokerControllerList) {
+                    if (brokerController.getMessageStore() == null) {
+                        allOk = false;
+                        break;
+                    }
+                }
+                if (allOk) {
+                    return true;
+                }
+                return false;
+            }
+        );
+        return false;
+    }
+    
 }