You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/09 15:21:06 UTC

[incubator-uniffle] branch master updated: [Improvement] Add the number of unhealthy nodes in CoordinatorMetrics (#147)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new a24837af [Improvement] Add the number of unhealthy nodes in CoordinatorMetrics (#147)
a24837af is described below

commit a24837af53903397a3ae4d56a422dcfea1997e3c
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Tue Aug 9 23:21:01 2022 +0800

    [Improvement] Add the number of unhealthy nodes in CoordinatorMetrics (#147)
    
    ### What changes were proposed in this pull request?
    Resolves #146 by adding an `unhealthy_server_num` gauge to CoordinatorMetrics.
    
    ### Why are the changes needed?
    So that the number of unhealthy server nodes can be viewed in monitoring.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
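    Unit tests: a new case in SimpleClusterManagerTest and an updated metric count in CoordinatorMetricsTest.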
---
 .../uniffle/coordinator/CoordinatorMetrics.java    |  3 ++
 .../uniffle/coordinator/SimpleClusterManager.java  |  5 ++++
 .../coordinator/CoordinatorMetricsTest.java        |  2 +-
 .../coordinator/SimpleClusterManagerTest.java      | 33 ++++++++++++++++++++++
 4 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
index 1e00f6e0..d7d19b89 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
@@ -35,6 +35,7 @@ public class CoordinatorMetrics {
   private static final String RUNNING_APP_NUM = "running_app_num";
   private static final String TOTAL_APP_NUM = "total_app_num";
   private static final String EXCLUDE_SERVER_NUM = "exclude_server_num";
+  private static final String UNHEALTHY_SERVER_NUM = "unhealthy_server_num";
   private static final String TOTAL_ACCESS_REQUEST = "total_access_request";
   private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
   private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
@@ -42,6 +43,7 @@ public class CoordinatorMetrics {
 
   static Gauge gaugeTotalServerNum;
   static Gauge gaugeExcludeServerNum;
+  static Gauge gaugeUnhealthyServerNum;
   static Gauge gaugeRunningAppNum;
   static Counter counterTotalAppNum;
   static Counter counterTotalAccessRequest;
@@ -95,6 +97,7 @@ public class CoordinatorMetrics {
   private static void setUpMetrics() {
     gaugeTotalServerNum = metricsManager.addGauge(TOTAL_SERVER_NUM);
     gaugeExcludeServerNum = metricsManager.addGauge(EXCLUDE_SERVER_NUM);
+    gaugeUnhealthyServerNum = metricsManager.addGauge(UNHEALTHY_SERVER_NUM);
     gaugeRunningAppNum = metricsManager.addGauge(RUNNING_APP_NUM);
     counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
     counterTotalAccessRequest = metricsManager.addCounter(TOTAL_ACCESS_REQUEST);
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 83b29c89..8f81b396 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -85,10 +85,14 @@ public class SimpleClusterManager implements ClusterManager {
     try {
       long timestamp = System.currentTimeMillis();
       Set<String> deleteIds = Sets.newHashSet();
+      Set<String> unhealthyNode = Sets.newHashSet();
       for (ServerNode sn : servers.values()) {
         if (timestamp - sn.getTimestamp() > heartbeatTimeout) {
           LOG.warn("Heartbeat timeout detect, " + sn + " will be removed from node list.");
           deleteIds.add(sn.getId());
+        } else if (!sn.isHealthy()) {
+          LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
+          unhealthyNode.add(sn.getId());
         }
       }
       for (String serverId : deleteIds) {
@@ -100,6 +104,7 @@ public class SimpleClusterManager implements ClusterManager {
         }
       }
 
+      CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNode.size());
       CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
     } catch (Exception e) {
       LOG.warn("Error happened in nodesCheck", e);
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
index 0e70092b..872d713c 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
@@ -87,7 +87,7 @@ public class CoordinatorMetricsTest {
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
-    assertEquals(8, actualObj.get("metrics").size());
+    assertEquals(9, actualObj.get("metrics").size());
   }
 
   @Test
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 5abb4e9d..e7894f1d 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -105,6 +105,39 @@ public class SimpleClusterManagerTest {
     clusterManager.close();
   }
 
+  @Test
+  public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() throws IOException {
+    CoordinatorConf ssc = new CoordinatorConf();
+    ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+        10, testTags, false);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+        10, testTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, testTags, true);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+
+    List<ServerNode> serverNodes = clusterManager.getServerList(testTags);
+    assertEquals(2, serverNodes.size());
+    assertEquals(0, CoordinatorMetrics.gaugeUnhealthyServerNum.get());
+    clusterManager.nodesCheck();
+
+    List<ServerNode> serverList = clusterManager.getServerList(testTags);
+    Assertions.assertEquals(2, serverList.size());
+    assertEquals(1, CoordinatorMetrics.gaugeUnhealthyServerNum.get());
+
+    sn3.setTimestamp(System.currentTimeMillis() - 60 * 1000L);
+    clusterManager.nodesCheck();
+
+    List<ServerNode> serverList2 = clusterManager.getServerList(testTags);
+    Assertions.assertEquals(1, serverList2.size());
+    assertEquals(1, CoordinatorMetrics.gaugeUnhealthyServerNum.get());
+    clusterManager.close();
+  }
+
   @Test
   public void heartbeatTimeoutTest() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();