You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/13 09:05:33 UTC

[incubator-uniffle] branch master updated: [TEST] Improve SimpleClusterManagerTest (#216)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fd1e375 [TEST] Improve SimpleClusterManagerTest (#216)
6fd1e375 is described below

commit 6fd1e37553c10e88b59ecade52ce45cc400b064c
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Tue Sep 13 17:05:27 2022 +0800

    [TEST] Improve SimpleClusterManagerTest (#216)
    
    ### What changes were proposed in this pull request?
    
    1. Use `Awaitility.await` to replace arbitrary fixed-duration `Thread.sleep` calls in tests.
    2. Make `heartbeatTimeoutTest` more deterministic.
    
    ### Why are the changes needed?
    
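    To make the tests correct and fast.
    The changes also serve as an example of how to wait longer for correctness without worrying about speed: `await` returns as soon as the condition holds, so a generous timeout does not slow down a passing test.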
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Before
    <img width="609" alt="20 sec" src="https://user-images.githubusercontent.com/5821159/189841692-c45ae11b-1c06-475c-859d-821573a8c276.png">
    
    After
    <img width="611" alt="13 sec" src="https://user-images.githubusercontent.com/5821159/189841796-fec3a837-b236-4d7d-b029-0e46a8bcef3c.png">
---
 .../coordinator/SimpleClusterManagerTest.java      | 103 +++++++++------------
 pom.xml                                            |  13 +++
 2 files changed, 55 insertions(+), 61 deletions(-)

diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 63835267..be3fe2ce 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -24,6 +24,8 @@ import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
@@ -31,13 +33,18 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class SimpleClusterManagerTest {
 
-  private Set<String> testTags = Sets.newHashSet("test");
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleClusterManagerTest.class);
+
+  private final Set<String> testTags = Sets.newHashSet("test");
 
   @BeforeEach
   public void setUp() {
@@ -66,10 +73,7 @@ public class SimpleClusterManagerTest {
     List<ServerNode> serverNodes = clusterManager.getServerList(testTags);
     assertEquals(3, serverNodes.size());
     Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
-    for (ServerNode node : serverNodes) {
-      expectedIds.remove(node.getId());
-    }
-    assertEquals(0, expectedIds.size());
+    assertEquals(expectedIds, serverNodes.stream().map(ServerNode::getId).collect(Collectors.toSet()));
 
     // tag changes
     sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
@@ -137,46 +141,32 @@ public class SimpleClusterManagerTest {
     clusterManager.close();
   }
 
+  private void addNode(String id, SimpleClusterManager clusterManager) {
+    ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10, testTags, true);
+    LOG.info("Add node " + node.getId() + " " + node.getTimestamp());
+    clusterManager.add(node);
+  }
+
   @Test
   public void heartbeatTimeoutTest() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L);
     SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
-    Thread t = new Thread(() -> {
-      for (int i = 0; i < 3; i++) {
-        if (i == 2) {
-          try {
-            Thread.sleep(800);
-          } catch (Exception e) {
-            // ignore
-          }
-        }
-
-        String sn = "sn" + i;
-        long availableMemory = 30 - i;
-        ServerNode node = new ServerNode(sn, "ip", 0, 100L, 50L, availableMemory,
-            10, testTags, true);
-        System.out.println("Add node " + node.getId() + " " + node.getTimestamp());
-        clusterManager.add(node);
-      }
-    });
-    t.start();
 
-    Thread.sleep(100);
+    addNode("sn0", clusterManager);
+    addNode("sn1", clusterManager);
     List<ServerNode> serverNodes = clusterManager.getServerList(testTags);
     assertEquals(2, serverNodes.size());
     Set<String> expectedIds = Sets.newHashSet("sn0", "sn1");
-    for (ServerNode node : serverNodes) {
-      expectedIds.remove(node.getId());
-    }
-    assertEquals(0, expectedIds.size());
-    Thread.sleep(1000);
+    assertEquals(expectedIds,
+        serverNodes.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    await().atMost(1, TimeUnit.SECONDS).until(() -> clusterManager.getServerList(testTags).isEmpty());
+
+    addNode("sn2", clusterManager);
     serverNodes = clusterManager.getServerList(testTags);
     assertEquals(1, serverNodes.size());
     assertEquals("sn2", serverNodes.get(0).getId());
-    Thread.sleep(500);
-    serverNodes = clusterManager.getServerList(testTags);
-    assertEquals(0, serverNodes.size());
+    await().atMost(1, TimeUnit.SECONDS).until(() -> clusterManager.getServerList(testTags).isEmpty());
 
     clusterManager.close();
   }
@@ -217,9 +207,6 @@ public class SimpleClusterManagerTest {
     ssc.setString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, URI.create(excludeNodesPath).toString());
     ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL, 2000);
 
-    Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
-    writeExcludeHosts(excludeNodesPath, nodes);
-
     SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration());
     scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20,
         10, testTags, true));
@@ -229,48 +216,42 @@ public class SimpleClusterManagerTest {
         10, testTags, true));
     scm.add(new ServerNode("node4-1999", "ip", 0, 100L, 50L, 20,
         10, testTags, true));
-    assertEquals(0, scm.getExcludeNodes().size());
-    Thread.sleep(3000);
-    assertEquals(nodes, scm.getExcludeNodes());
+    assertTrue(scm.getExcludeNodes().isEmpty());
+
+    final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
+    writeExcludeHosts(excludeNodesPath, nodes);
+    await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().equals(nodes));
     List<ServerNode> availableNodes = scm.getServerList(testTags);
     assertEquals(2, availableNodes.size());
     Set<String> remainNodes = Sets.newHashSet("node3-1999", "node4-1999");
-    for (ServerNode node : availableNodes) {
-      remainNodes.remove(node.getId());
-    }
-    assertEquals(0, remainNodes.size());
+    assertEquals(remainNodes, availableNodes.stream().map(ServerNode::getId).collect(Collectors.toSet()));
 
-    nodes = Sets.newHashSet("node3-1999", "node4-1999");
-    writeExcludeHosts(excludeNodesPath, nodes);
-    Thread.sleep(3000);
-    assertEquals(nodes, scm.getExcludeNodes());
+    final Set<String> nodes2 = Sets.newHashSet("node3-1999", "node4-1999");
+    writeExcludeHosts(excludeNodesPath, nodes2);
+    await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().equals(nodes2));
+    assertEquals(nodes2, scm.getExcludeNodes());
 
     Set<String> excludeNodes = scm.getExcludeNodes();
     Thread.sleep(3000);
-    // excludeNodes shouldn't be update if file has no change
-    assertTrue(excludeNodes == scm.getExcludeNodes());
+    // excludeNodes shouldn't be updated if file has no change
+    assertEquals(excludeNodes, scm.getExcludeNodes());
 
     writeExcludeHosts(excludeNodesPath, Sets.newHashSet());
-    Thread.sleep(3000);
     // excludeNodes is an empty file, set should be empty
-    assertEquals(0, scm.getExcludeNodes().size());
+    await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().isEmpty());
 
-    nodes = Sets.newHashSet("node1-1999");
-    writeExcludeHosts(excludeNodesPath, nodes);
-    Thread.sleep(3000);
+    final Set<String> nodes3 = Sets.newHashSet("node1-1999");
+    writeExcludeHosts(excludeNodesPath, nodes3);
+    await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().equals(nodes3));
 
     File blacklistFile = new File(excludeNodesPath);
-    blacklistFile.delete();
-    Thread.sleep(3000);
+    assertTrue(blacklistFile.delete());
     // excludeNodes is deleted, set should be empty
-    assertEquals(0, scm.getExcludeNodes().size());
+    await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().isEmpty());
 
     remainNodes = Sets.newHashSet("node1-1999", "node2-1999", "node3-1999", "node4-1999");
     availableNodes = scm.getServerList(testTags);
-    for (ServerNode node : availableNodes) {
-      remainNodes.remove(node.getId());
-    }
-    assertEquals(0, remainNodes.size());
+    assertEquals(remainNodes, availableNodes.stream().map(ServerNode::getId).collect(Collectors.toSet()));
 
     scm.close();
   }
diff --git a/pom.xml b/pom.xml
index d08dc4ab..7db56f06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,6 +35,7 @@
   </parent>
 
   <properties>
+    <awaitility.version>4.2.0</awaitility.version>
     <checkstyle.version>9.3</checkstyle.version>
     <commons-logging.version>1.2</commons-logging.version>
     <commons-lang3.version>3.10</commons-lang3.version>
@@ -125,6 +126,11 @@
       <artifactId>error_prone_annotations</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter</artifactId>
@@ -494,6 +500,13 @@
         <version>${prometheus.simpleclient.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.awaitility</groupId>
+        <artifactId>awaitility</artifactId>
+        <version>${awaitility.version}</version>
+        <scope>test</scope>
+      </dependency>
+
       <dependency>
         <groupId>org.junit.jupiter</groupId>
         <artifactId>junit-jupiter</artifactId>