You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/05/11 04:21:23 UTC
[incubator-uniffle] branch master updated: [#863] feat(coordinator): support comments in exclude node files (#874)
This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9c26c1b9 [#863] feat(coordinator): support comments in exclude node files (#874)
9c26c1b9 is described below
commit 9c26c1b93dfe487ddf7bedeaa2cd690e2b27efac
Author: yl09099 <33...@users.noreply.github.com>
AuthorDate: Thu May 11 12:21:18 2023 +0800
[#863] feat(coordinator): support comments in exclude node files (#874)
### What changes were proposed in this pull request?
Support comment lines (lines that start with `#`) in exclude node files, so that changes to the file can be annotated.
### Why are the changes needed?
Fix: #863
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Updated unit test: SimpleClusterManagerTest
Co-authored-by: markyuanlu <ma...@didiglobal.com>
---
.../uniffle/coordinator/SimpleClusterManager.java | 4 +--
.../coordinator/SimpleClusterManagerTest.java | 41 +++++++++++++---------
2 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 511cab6b..9359d5d4 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -187,7 +187,7 @@ public class SimpleClusterManager implements ClusterManager {
try (BufferedReader br = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
- if (!StringUtils.isEmpty(line)) {
+ if (!StringUtils.isEmpty(line) && !line.trim().startsWith("#")) {
nodes.add(line.trim());
}
}
@@ -287,7 +287,7 @@ public class SimpleClusterManager implements ClusterManager {
private ShuffleServerInternalGrpcClient getShuffleServerClient(ServerNode serverNode) {
try {
return clientCache.get(serverNode,
- () -> new ShuffleServerInternalGrpcClient(serverNode.getIp(), serverNode.getGrpcPort()));
+ () -> new ShuffleServerInternalGrpcClient(serverNode.getIp(), serverNode.getGrpcPort()));
} catch (ExecutionException e) {
throw new RssException(e);
}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 19eec9bd..d2e5644f 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -84,11 +84,11 @@ public class SimpleClusterManagerTest {
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, grpcTags, true);
+ 10, grpcTags, true);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, grpcTags, true);
+ 10, grpcTags, true);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, grpcTags, true);
+ 11, grpcTags, true);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -99,11 +99,11 @@ public class SimpleClusterManagerTest {
// tag changes
sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, Sets.newHashSet("new_tag"), true);
+ 10, Sets.newHashSet("new_tag"), true);
sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, Sets.newHashSet("test", "new_tag"), true);
+ 10, Sets.newHashSet("test", "new_tag"), true);
ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
- 10, grpcTags, true);
+ 10, grpcTags, true);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn4);
@@ -196,11 +196,11 @@ public class SimpleClusterManagerTest {
ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, testTags, false);
+ 10, testTags, false);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, testTags, true);
+ 10, testTags, true);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, testTags, true);
+ 11, testTags, true);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -240,7 +240,7 @@ public class SimpleClusterManagerTest {
assertEquals(2, serverNodes.size());
Set<String> expectedIds = Sets.newHashSet("sn0", "sn1");
assertEquals(expectedIds,
- serverNodes.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+ serverNodes.stream().map(ServerNode::getId).collect(Collectors.toSet()));
await().atMost(1, TimeUnit.SECONDS).until(() -> clusterManager.getServerList(testTags).isEmpty());
addNode("sn2", clusterManager);
@@ -258,11 +258,11 @@ public class SimpleClusterManagerTest {
ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, testTags, true);
+ 10, testTags, true);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, testTags, true);
+ 10, testTags, true);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, testTags, true);
+ 11, testTags, true);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -290,13 +290,13 @@ public class SimpleClusterManagerTest {
try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration())) {
scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20,
- 10, testTags, true));
+ 10, testTags, true));
scm.add(new ServerNode("node2-1999", "ip", 0, 100L, 50L, 20,
- 10, testTags, true));
+ 10, testTags, true));
scm.add(new ServerNode("node3-1999", "ip", 0, 100L, 50L, 20,
- 10, testTags, true));
+ 10, testTags, true));
scm.add(new ServerNode("node4-1999", "ip", 0, 100L, 50L, 20,
- 10, testTags, true));
+ 10, testTags, true));
assertTrue(scm.getExcludeNodes().isEmpty());
final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
@@ -312,6 +312,13 @@ public class SimpleClusterManagerTest {
await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().equals(nodes2));
assertEquals(nodes2, scm.getExcludeNodes());
+ final Set<String> comments = Sets.newHashSet("# The contents of the first comment","node3-1999",
+ "# The contents of the second comment", "node4-1999", "# The content of the third comment");
+ final Set<String> noComments = Sets.newHashSet("node3-1999", "node4-1999");
+ writeExcludeHosts(excludeNodesPath, comments);
+ await().atMost(3, TimeUnit.SECONDS).until(() -> scm.getExcludeNodes().equals(noComments));
+ assertEquals(noComments, scm.getExcludeNodes());
+
Set<String> excludeNodes = scm.getExcludeNodes();
Thread.sleep(3000);
// excludeNodes shouldn't be updated if file has no change