You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/05/11 04:21:23 UTC

[incubator-uniffle] branch master updated: [#863] feat(coordinator): support comments in exclude node files (#874)

This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c26c1b9 [#863] feat(coordinator): support comments in exclude node files (#874)
9c26c1b9 is described below

commit 9c26c1b93dfe487ddf7bedeaa2cd690e2b27efac
Author: yl09099 <33...@users.noreply.github.com>
AuthorDate: Thu May 11 12:21:18 2023 +0800

    [#863] feat(coordinator): support comments in exclude node files (#874)
    
    ### What changes were proposed in this pull request?
    
    Support comment lines (lines starting with `#`) in exclude node files, so that changes to the file can be annotated easily.
    
    ### Why are the changes needed?
    
    Fix: #863
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated the unit test: SimpleClusterManagerTest
    
    Co-authored-by: markyuanlu <ma...@didiglobal.com>
---
 .../uniffle/coordinator/SimpleClusterManager.java  |  4 +--
 .../coordinator/SimpleClusterManagerTest.java      | 41 +++++++++++++---------
 2 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 511cab6b..9359d5d4 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -187,7 +187,7 @@ public class SimpleClusterManager implements ClusterManager {
     try (BufferedReader br = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8))) {
       String line;
       while ((line = br.readLine()) != null) {
-        if (!StringUtils.isEmpty(line)) {
+        if (!StringUtils.isEmpty(line) && !line.trim().startsWith("#")) {
           nodes.add(line.trim());
         }
       }
@@ -287,7 +287,7 @@ public class SimpleClusterManager implements ClusterManager {
   private ShuffleServerInternalGrpcClient getShuffleServerClient(ServerNode serverNode) {
     try {
       return clientCache.get(serverNode,
-              () -> new ShuffleServerInternalGrpcClient(serverNode.getIp(), serverNode.getGrpcPort()));
+          () -> new ShuffleServerInternalGrpcClient(serverNode.getIp(), serverNode.getGrpcPort()));
     } catch (ExecutionException e) {
       throw new RssException(e);
     }
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 19eec9bd..d2e5644f 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -84,11 +84,11 @@ public class SimpleClusterManagerTest {
     try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {
 
       ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-              10, grpcTags, true);
+          10, grpcTags, true);
       ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-              10, grpcTags, true);
+          10, grpcTags, true);
       ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-              11, grpcTags, true);
+          11, grpcTags, true);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn3);
@@ -99,11 +99,11 @@ public class SimpleClusterManagerTest {
 
       // tag changes
       sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-              10, Sets.newHashSet("new_tag"), true);
+          10, Sets.newHashSet("new_tag"), true);
       sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-              10, Sets.newHashSet("test", "new_tag"), true);
+          10, Sets.newHashSet("test", "new_tag"), true);
       ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
-              10, grpcTags, true);
+          10, grpcTags, true);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn4);
@@ -196,11 +196,11 @@ public class SimpleClusterManagerTest {
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {
       ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-              10, testTags, false);
+          10, testTags, false);
       ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-              10, testTags, true);
+          10, testTags, true);
       ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-              11, testTags, true);
+          11, testTags, true);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn3);
@@ -240,7 +240,7 @@ public class SimpleClusterManagerTest {
       assertEquals(2, serverNodes.size());
       Set<String> expectedIds = Sets.newHashSet("sn0", "sn1");
       assertEquals(expectedIds,
-              serverNodes.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+          serverNodes.stream().map(ServerNode::getId).collect(Collectors.toSet()));
       await().atMost(1, TimeUnit.SECONDS).until(() -> clusterManager.getServerList(testTags).isEmpty());
 
       addNode("sn2", clusterManager);
@@ -258,11 +258,11 @@ public class SimpleClusterManagerTest {
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {
       ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-              10, testTags, true);
+          10, testTags, true);
       ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-              10, testTags, true);
+          10, testTags, true);
       ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-              11, testTags, true);
+          11, testTags, true);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn3);
@@ -290,13 +290,13 @@ public class SimpleClusterManagerTest {
 
     try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration())) {
       scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20,
-              10, testTags, true));
+          10, testTags, true));
       scm.add(new ServerNode("node2-1999", "ip", 0, 100L, 50L, 20,
-              10, testTags, true));
+          10, testTags, true));
       scm.add(new ServerNode("node3-1999", "ip", 0, 100L, 50L, 20,
-              10, testTags, true));
+          10, testTags, true));
       scm.add(new ServerNode("node4-1999", "ip", 0, 100L, 50L, 20,
-              10, testTags, true));
+          10, testTags, true));
       assertTrue(scm.getExcludeNodes().isEmpty());
 
       final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
@@ -312,6 +312,13 @@ public class SimpleClusterManagerTest {
       await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().equals(nodes2));
       assertEquals(nodes2, scm.getExcludeNodes());
 
+      final Set<String> comments = Sets.newHashSet("# The contents of the first comment","node3-1999",
+          "# The contents of the second comment", "node4-1999", "# The content of the third comment");
+      final Set<String> noComments = Sets.newHashSet("node3-1999", "node4-1999");
+      writeExcludeHosts(excludeNodesPath, comments);
+      await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().equals(noComments));
+      assertEquals(noComments, scm.getExcludeNodes());
+      
       Set<String> excludeNodes = scm.getExcludeNodes();
       Thread.sleep(3000);
       // excludeNodes shouldn't be updated if file has no change