You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/09 15:21:06 UTC
[incubator-uniffle] branch master updated: [Improvement] Add the number of unhealthy nodes in CoordinatorMetrics (#147)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a24837af [Improvement] Add the number of unhealthy nodes in CoordinatorMetrics (#147)
a24837af is described below
commit a24837af53903397a3ae4d56a422dcfea1997e3c
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Tue Aug 9 23:21:01 2022 +0800
[Improvement] Add the number of unhealthy nodes in CoordinatorMetrics (#147)
### What changes were proposed in this pull request?
To resolve #146.
### Why are the changes needed?
So that the number of unhealthy nodes can be viewed in monitoring, via a new coordinator metric.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
---
.../uniffle/coordinator/CoordinatorMetrics.java | 3 ++
.../uniffle/coordinator/SimpleClusterManager.java | 5 ++++
.../coordinator/CoordinatorMetricsTest.java | 2 +-
.../coordinator/SimpleClusterManagerTest.java | 33 ++++++++++++++++++++++
4 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
index 1e00f6e0..d7d19b89 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
@@ -35,6 +35,7 @@ public class CoordinatorMetrics {
private static final String RUNNING_APP_NUM = "running_app_num";
private static final String TOTAL_APP_NUM = "total_app_num";
private static final String EXCLUDE_SERVER_NUM = "exclude_server_num";
+ private static final String UNHEALTHY_SERVER_NUM = "unhealthy_server_num";
private static final String TOTAL_ACCESS_REQUEST = "total_access_request";
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
@@ -42,6 +43,7 @@ public class CoordinatorMetrics {
static Gauge gaugeTotalServerNum;
static Gauge gaugeExcludeServerNum;
+ static Gauge gaugeUnhealthyServerNum;
static Gauge gaugeRunningAppNum;
static Counter counterTotalAppNum;
static Counter counterTotalAccessRequest;
@@ -95,6 +97,7 @@ public class CoordinatorMetrics {
private static void setUpMetrics() {
gaugeTotalServerNum = metricsManager.addGauge(TOTAL_SERVER_NUM);
gaugeExcludeServerNum = metricsManager.addGauge(EXCLUDE_SERVER_NUM);
+ gaugeUnhealthyServerNum = metricsManager.addGauge(UNHEALTHY_SERVER_NUM);
gaugeRunningAppNum = metricsManager.addGauge(RUNNING_APP_NUM);
counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
counterTotalAccessRequest = metricsManager.addCounter(TOTAL_ACCESS_REQUEST);
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 83b29c89..8f81b396 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -85,10 +85,14 @@ public class SimpleClusterManager implements ClusterManager {
try {
long timestamp = System.currentTimeMillis();
Set<String> deleteIds = Sets.newHashSet();
+ Set<String> unhealthyNode = Sets.newHashSet();
for (ServerNode sn : servers.values()) {
if (timestamp - sn.getTimestamp() > heartbeatTimeout) {
LOG.warn("Heartbeat timeout detect, " + sn + " will be removed from node list.");
deleteIds.add(sn.getId());
+ } else if (!sn.isHealthy()) {
+ LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
+ unhealthyNode.add(sn.getId());
}
}
for (String serverId : deleteIds) {
@@ -100,6 +104,7 @@ public class SimpleClusterManager implements ClusterManager {
}
}
+ CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNode.size());
CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
} catch (Exception e) {
LOG.warn("Error happened in nodesCheck", e);
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
index 0e70092b..872d713c 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
@@ -87,7 +87,7 @@ public class CoordinatorMetricsTest {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
- assertEquals(8, actualObj.get("metrics").size());
+ assertEquals(9, actualObj.get("metrics").size());
}
@Test
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 5abb4e9d..e7894f1d 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -105,6 +105,39 @@ public class SimpleClusterManagerTest {
clusterManager.close();
}
+ @Test
+ public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() throws IOException { // checks getServerList and the unhealthy-server gauge across node-health and heartbeat-timeout changes
+ CoordinatorConf ssc = new CoordinatorConf();
+ ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L); // 30s, compared against currentTimeMillis deltas in nodesCheck
+ SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+ 10, testTags, false); // presumably the health flag: sn1 is the node expected to be counted as unhealthy
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+ 10, testTags, true);
+ ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, testTags, true);
+ clusterManager.add(sn1);
+ clusterManager.add(sn2);
+ clusterManager.add(sn3);
+ // Phase 1: before any nodesCheck the gauge has not been written yet.
+ List<ServerNode> serverNodes = clusterManager.getServerList(testTags);
+ assertEquals(2, serverNodes.size()); // sn1 is not returned (assumed: getServerList filters unhealthy nodes)
+ assertEquals(0, CoordinatorMetrics.gaugeUnhealthyServerNum.get()); // NOTE(review): relies on the static gauge being 0 here; other tests could leave it non-zero
+ clusterManager.nodesCheck();
+ // Phase 2: nodesCheck has counted unhealthy, non-timed-out nodes into the gauge.
+ List<ServerNode> serverList = clusterManager.getServerList(testTags);
+ Assertions.assertEquals(2, serverList.size());
+ assertEquals(1, CoordinatorMetrics.gaugeUnhealthyServerNum.get()); // only sn1 counted
+ // Phase 3: age sn3 past the heartbeat timeout so the next nodesCheck removes it.
+ sn3.setTimestamp(System.currentTimeMillis() - 60 * 1000L); // 60s old > 30s timeout
+ clusterManager.nodesCheck(); // sn3 goes to deleteIds, not to the unhealthy set
+ // Phase 4: sn3 is gone; sn1 is still the only unhealthy node.
+ List<ServerNode> serverList2 = clusterManager.getServerList(testTags);
+ Assertions.assertEquals(1, serverList2.size()); // only sn2 remains assignable
+ assertEquals(1, CoordinatorMetrics.gaugeUnhealthyServerNum.get()); // timed-out nodes are not counted as unhealthy
+ clusterManager.close();
+ }
+
@Test
public void heartbeatTimeoutTest() throws Exception {
CoordinatorConf ssc = new CoordinatorConf();