You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:51 UTC

[incubator-uniffle] 07/17: [Minor] Remove serverNode from tags structure when heartbeat times out (#193)

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit d92208ddb1edca13fcb6cb31a8980b2052f29d7b
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Thu Jun 23 15:30:19 2022 +0800

    [Minor] Remove serverNode from tags structure when heartbeat times out (#193)
    
    ### What changes were proposed in this pull request?
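    When a server's heartbeat times out, `SimpleClusterManager.nodesCheck` now removes its ServerNode from every tag set in `tagToNodes`, in addition to removing it from `servers`.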
    
    ### Why are the changes needed?
    Previously `nodesCheck` removed a timed-out server only from `servers`, so a stale ServerNode entry remained in the `tagToNodes` sets for its tags.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test: added `SimpleClusterManagerTest#testGetCorrectServerNodesWhenOneNodeRemoved`.
---
 .../com/tencent/rss/coordinator/ServerNode.java    |  7 ++++++
 .../rss/coordinator/SimpleClusterManager.java      |  9 ++++++--
 .../rss/coordinator/SimpleClusterManagerTest.java  | 27 ++++++++++++++++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java b/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
index ef09298..816f080 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
@@ -115,6 +115,13 @@ public class ServerNode implements Comparable<ServerNode> {
         + ", healthy[" + isHealthy + "]";
   }
 
+  /**
+   * Only for test case
+   */
+  void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
   @Override
   public int compareTo(ServerNode other) {
     if (availableMemory > other.getAvailableMemory()) {
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
index d3fe789..10af74d 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
@@ -72,7 +72,7 @@ public class SimpleClusterManager implements ClusterManager {
     }
   }
 
-  private void nodesCheck() {
+  void nodesCheck() {
     try {
       long timestamp = System.currentTimeMillis();
       Set<String> deleteIds = Sets.newHashSet();
@@ -83,7 +83,12 @@ public class SimpleClusterManager implements ClusterManager {
         }
       }
       for (String serverId : deleteIds) {
-        servers.remove(serverId);
+        ServerNode sn = servers.remove(serverId);
+        if (sn != null) {
+          for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
+            nodesWithTag.remove(sn);
+          }
+        }
       }
 
       CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
index a5040bf..bed9081 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -142,6 +143,32 @@ public class SimpleClusterManagerTest {
     assertEquals(0, serverNodes.size());
   }
 
+  @Test
+  public void testGetCorrectServerNodesWhenOneNodeRemoved() {
+    CoordinatorConf ssc = new CoordinatorConf();
+    ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+            10, testTags, true);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+            10, testTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+            11, testTags, true);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+    List<ServerNode> serverNodes = clusterManager.getServerList(testTags);
+    assertEquals(3, serverNodes.size());
+
+    sn3.setTimestamp(System.currentTimeMillis() - 60 * 1000L);
+    clusterManager.nodesCheck();
+
+    Map<String, Set<ServerNode>> tagToNodes = clusterManager.getTagToNodes();
+    List<ServerNode> serverList = clusterManager.getServerList(testTags);
+    Assertions.assertEquals(2, tagToNodes.get(testTags.iterator().next()).size());
+    Assertions.assertEquals(2, serverList.size());
+  }
+
   @Test
   public void updateExcludeNodesTest() throws Exception {
     String excludeNodesFolder = (new File(ClassLoader.getSystemResource("empty").getFile())).getParent();