You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:51 UTC
[incubator-uniffle] 07/17: [Minor] Remove serverNode from tags structure when heartbeat timeout (#193)
This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit d92208ddb1edca13fcb6cb31a8980b2052f29d7b
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Thu Jun 23 15:30:19 2022 +0800
[Minor] Remove serverNode from tags structure when heartbeat timeout (#193)
### What changes were proposed in this pull request?
Remove the serverNode from the tags structure (`tagToNodes`) when its heartbeat times out.
### Why are the changes needed?
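Remove the serverNode from the tags structure when its heartbeat times out. Previously `nodesCheck()` removed a timed-out node only from `servers`, leaving it behind in `tagToNodes`.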
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test (SimpleClusterManagerTest#testGetCorrectServerNodesWhenOneNodeRemoved).
---
.../com/tencent/rss/coordinator/ServerNode.java | 7 ++++++
.../rss/coordinator/SimpleClusterManager.java | 9 ++++++--
.../rss/coordinator/SimpleClusterManagerTest.java | 27 ++++++++++++++++++++++
3 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java b/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
index ef09298..816f080 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java
@@ -115,6 +115,13 @@ public class ServerNode implements Comparable<ServerNode> {
+ ", healthy[" + isHealthy + "]";
}
+ /**
+ * Overwrites this node's timestamp field with the given value. Only for test cases.
+ */
+ void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
@Override
public int compareTo(ServerNode other) {
if (availableMemory > other.getAvailableMemory()) {
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
index d3fe789..10af74d 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
@@ -72,7 +72,7 @@ public class SimpleClusterManager implements ClusterManager {
}
}
- private void nodesCheck() {
+ void nodesCheck() {
try {
long timestamp = System.currentTimeMillis();
Set<String> deleteIds = Sets.newHashSet();
@@ -83,7 +83,12 @@ public class SimpleClusterManager implements ClusterManager {
}
}
for (String serverId : deleteIds) {
- servers.remove(serverId);
+ ServerNode sn = servers.remove(serverId);
+ if (sn != null) {
+ for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
+ nodesWithTag.remove(sn);
+ }
+ }
}
CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
index a5040bf..bed9081 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -142,6 +143,32 @@ public class SimpleClusterManagerTest {
assertEquals(0, serverNodes.size());
}
+ @Test
+ public void testGetCorrectServerNodesWhenOneNodeRemoved() {
+ CoordinatorConf ssc = new CoordinatorConf();
+ ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
+ SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+ 10, testTags, true);
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+ 10, testTags, true);
+ ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, testTags, true);
+ clusterManager.add(sn1);
+ clusterManager.add(sn2);
+ clusterManager.add(sn3);
+ List<ServerNode> serverNodes = clusterManager.getServerList(testTags);
+ assertEquals(3, serverNodes.size());
+ // Backdate sn3 by 60s, beyond the 30s timeout configured above; the assertions below expect nodesCheck() to drop it.
+ sn3.setTimestamp(System.currentTimeMillis() - 60 * 1000L);
+ clusterManager.nodesCheck();
+
+ Map<String, Set<ServerNode>> tagToNodes = clusterManager.getTagToNodes();
+ List<ServerNode> serverList = clusterManager.getServerList(testTags);
+ Assertions.assertEquals(2, tagToNodes.get(testTags.iterator().next()).size());
+ Assertions.assertEquals(2, serverList.size());
+ }
+
@Test
public void updateExcludeNodesTest() throws Exception {
String excludeNodesFolder = (new File(ClassLoader.getSystemResource("empty").getFile())).getParent();