You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/05 08:44:38 UTC
[incubator-uniffle] branch master updated: [#927] Improvement: improve the control of server heartbeat (#928)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0ea306f7 [#927] Improvement: improve the control of server heartbeat (#928)
0ea306f7 is described below
commit 0ea306f7ba34f9e599b4d28ed236fa2dc9eacdb8
Author: summaryzb <su...@gmail.com>
AuthorDate: Mon Jun 5 16:44:32 2023 +0800
[#927] Improvement: improve the control of server heartbeat (#928)
### What changes were proposed in this pull request?
Eliminate `rss.server.heartbeat.timeout`; `rss.server.heartbeat.interval` is now used in its place.
### Why are the changes needed?
https://github.com/apache/incubator-uniffle/issues/927
### Does this PR introduce _any_ user-facing change?
Yes, `rss.server.heartbeat.timeout` will no longer have any effect.
### How was this patch tested?
UnitTest
---
README.md | 1 -
.../java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java | 2 +-
.../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java | 2 +-
.../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java | 2 +-
deploy/kubernetes/operator/examples/configuration.yaml | 1 -
docs/server_guide.md | 2 --
.../test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java | 3 +--
.../src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java | 5 ++---
.../src/main/java/org/apache/uniffle/server/ShuffleServerConf.java | 6 ------
server/src/test/resources/server.conf | 1 -
10 files changed, 6 insertions(+), 19 deletions(-)
diff --git a/README.md b/README.md
index 2bbe5129..a4246fb5 100644
--- a/README.md
+++ b/README.md
@@ -187,7 +187,6 @@ rss-xxx.tgz will be generated for deployment
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
- rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index c79dda62..61c2531d 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -195,7 +195,7 @@ public class RssMRAppMaster extends MRAppMaster {
}
int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
- // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+ // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
int retryTimes = conf.getInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f4740cf0..27c2226e 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -256,7 +256,7 @@ public class RssShuffleManager extends RssShuffleManagerBase {
int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
- // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+ // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d5fec8cb..d0892ce6 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -335,7 +335,7 @@ public class RssShuffleManager extends RssShuffleManagerBase {
int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
- // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+ // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
diff --git a/deploy/kubernetes/operator/examples/configuration.yaml b/deploy/kubernetes/operator/examples/configuration.yaml
index 16b551a9..5fcb70b8 100644
--- a/deploy/kubernetes/operator/examples/configuration.yaml
+++ b/deploy/kubernetes/operator/examples/configuration.yaml
@@ -64,7 +64,6 @@ data:
rss.server.hdfs.base.path hdfs://${your-hdfs-path}
rss.server.health.check.enable false
rss.server.heartbeat.interval 10000
- rss.server.heartbeat.timeout 60000
rss.server.memory.shuffle.highWaterMark.percentage 70.0
rss.server.memory.shuffle.lowWaterMark.percentage 10.0
rss.server.pending.event.timeoutSec 600
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 4f801143..ce83b93f 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -47,7 +47,6 @@ This document will introduce how to deploy Uniffle shuffle servers.
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
- rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
@@ -143,7 +142,6 @@ rss.storage.basePath /data1/rssdata,/data2/rssdata....
rss.server.flush.thread.alive 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
-rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 9a450ca5..566d99e9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -375,7 +375,6 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
ShuffleAssignmentsInfo response = null;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- int heartbeatTimeout = shuffleServerConf.getInteger("rss.server.heartbeat.timeout", 65000);
int heartbeatInterval = shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
Thread.sleep(heartbeatInterval * 2);
shuffleWriteClientImpl.registerCoordinators(COORDINATOR_QUORUM);
@@ -410,7 +409,7 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
);
});
return shuffleAssignments;
- }, heartbeatTimeout, maxTryTime);
+ }, heartbeatInterval, maxTryTime);
assertNotNull(response);
}
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index ddb7d53a..893b9c55 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -51,13 +51,11 @@ public class RegisterHeartBeat {
private final ScheduledExecutorService service =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("startHeartBeat");
private final ExecutorService heartBeatExecutorService;
- private long heartBeatTimeout;
public RegisterHeartBeat(ShuffleServer shuffleServer) {
ShuffleServerConf conf = shuffleServer.getShuffleServerConf();
this.heartBeatInitialDelay = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_DELAY);
this.heartBeatInterval = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL);
- this.heartBeatTimeout = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_TIMEOUT);
this.coordinatorQuorum = conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM);
CoordinatorClientFactory factory =
new CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE));
@@ -107,6 +105,7 @@ public class RegisterHeartBeat {
Map<String, StorageInfo> localStorageInfo,
int nettyPort) {
boolean sendSuccessfully = false;
+ // use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request = new RssSendHeartBeatRequest(
id,
ip,
@@ -115,7 +114,7 @@ public class RegisterHeartBeat {
preAllocatedMemory,
availableMemory,
eventNumInFlush,
- heartBeatTimeout,
+ heartBeatInterval,
tags,
isHealthy,
serverStatus,
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index a34c0138..ecd11609 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -75,12 +75,6 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(10 * 1000L)
.withDescription("Heartbeat interval to Coordinator (ms)");
- public static final ConfigOption<Long> SERVER_HEARTBEAT_TIMEOUT = ConfigOptions
- .key("rss.server.heartbeat.timeout")
- .longType()
- .defaultValue(60 * 1000L)
- .withDescription("rss heartbeat interval ms");
-
public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
.key("rss.server.flush.threadPool.size")
.intType()
diff --git a/server/src/test/resources/server.conf b/server/src/test/resources/server.conf
index ca7a47bf..585da760 100644
--- a/server/src/test/resources/server.conf
+++ b/server/src/test/resources/server.conf
@@ -24,6 +24,5 @@ rss.server.buffer.spill.threshold 130
rss.server.partition.buffer.size 128
rss.jetty.http.port 12345
rss.jetty.corePool.size 64
-rss.server.heartbeat.timeout 1
rss.server.write.timeout 2000
rss.server.shuffleBufferManager.trigger.flush.interval 500