You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/05 08:44:38 UTC

[incubator-uniffle] branch master updated: [#927] Improvement: improve the control of server heartbeat (#928)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ea306f7 [#927] Improvement: improve the control of server heartbeat (#928)
0ea306f7 is described below

commit 0ea306f7ba34f9e599b4d28ed236fa2dc9eacdb8
Author: summaryzb <su...@gmail.com>
AuthorDate: Mon Jun 5 16:44:32 2023 +0800

    [#927] Improvement: improve the control of server heartbeat (#928)
    
    ### What changes were proposed in this pull request?
     Eliminate rss.server.heartbeat.timeout and replace it with rss.server.heartbeat.interval
    
    ### Why are the changes needed?
    https://github.com/apache/incubator-uniffle/issues/927
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `rss.server.heartbeat.timeout` no longer has any effect
    
    ### How was this patch tested?
    UnitTest
---
 README.md                                                           | 1 -
 .../java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java     | 2 +-
 .../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java   | 2 +-
 .../src/main/java/org/apache/spark/shuffle/RssShuffleManager.java   | 2 +-
 deploy/kubernetes/operator/examples/configuration.yaml              | 1 -
 docs/server_guide.md                                                | 2 --
 .../test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java | 3 +--
 .../src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java  | 5 ++---
 .../src/main/java/org/apache/uniffle/server/ShuffleServerConf.java  | 6 ------
 server/src/test/resources/server.conf                               | 1 -
 10 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index 2bbe5129..a4246fb5 100644
--- a/README.md
+++ b/README.md
@@ -187,7 +187,6 @@ rss-xxx.tgz will be generated for deployment
      rss.server.flush.threadPool.size 10
      rss.server.buffer.capacity 40g
      rss.server.read.buffer.capacity 20g
-     rss.server.heartbeat.timeout 60000
      rss.server.heartbeat.interval 10000
      rss.rpc.message.max.size 1073741824
      rss.server.preAllocation.expired 120000
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index c79dda62..61c2531d 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -195,7 +195,7 @@ public class RssMRAppMaster extends MRAppMaster {
       }
       
       int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
-      // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+      // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
       long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
               RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
       int retryTimes = conf.getInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f4740cf0..27c2226e 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -256,7 +256,7 @@ public class RssShuffleManager extends RssShuffleManagerBase {
 
     int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
 
-    // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+    // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
     long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
 
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d5fec8cb..d0892ce6 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -335,7 +335,7 @@ public class RssShuffleManager extends RssShuffleManagerBase {
 
     int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
 
-    // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+    // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
     long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
     int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
diff --git a/deploy/kubernetes/operator/examples/configuration.yaml b/deploy/kubernetes/operator/examples/configuration.yaml
index 16b551a9..5fcb70b8 100644
--- a/deploy/kubernetes/operator/examples/configuration.yaml
+++ b/deploy/kubernetes/operator/examples/configuration.yaml
@@ -64,7 +64,6 @@ data:
     rss.server.hdfs.base.path hdfs://${your-hdfs-path}
     rss.server.health.check.enable false
     rss.server.heartbeat.interval 10000
-    rss.server.heartbeat.timeout 60000
     rss.server.memory.shuffle.highWaterMark.percentage 70.0
     rss.server.memory.shuffle.lowWaterMark.percentage 10.0
     rss.server.pending.event.timeoutSec 600
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 4f801143..ce83b93f 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -47,7 +47,6 @@ This document will introduce how to deploy Uniffle shuffle servers.
      rss.server.flush.threadPool.size 10
      rss.server.buffer.capacity 40g
      rss.server.read.buffer.capacity 20g
-     rss.server.heartbeat.timeout 60000
      rss.server.heartbeat.interval 10000
      rss.rpc.message.max.size 1073741824
      rss.server.preAllocation.expired 120000
@@ -143,7 +142,6 @@ rss.storage.basePath /data1/rssdata,/data2/rssdata....
 rss.server.flush.thread.alive 10
 rss.server.buffer.capacity 40g
 rss.server.read.buffer.capacity 20g
-rss.server.heartbeat.timeout 60000
 rss.server.heartbeat.interval 10000
 rss.rpc.message.max.size 1073741824
 rss.server.preAllocation.expired 120000
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 9a450ca5..566d99e9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -375,7 +375,6 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
     RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
     ShuffleAssignmentsInfo response = null;
     ShuffleServerConf shuffleServerConf = getShuffleServerConf();
-    int heartbeatTimeout = shuffleServerConf.getInteger("rss.server.heartbeat.timeout", 65000);
     int heartbeatInterval = shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
     Thread.sleep(heartbeatInterval * 2);
     shuffleWriteClientImpl.registerCoordinators(COORDINATOR_QUORUM);
@@ -410,7 +409,7 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
         );
       });
       return shuffleAssignments;
-    }, heartbeatTimeout, maxTryTime);
+    }, heartbeatInterval, maxTryTime);
 
     assertNotNull(response);
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index ddb7d53a..893b9c55 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -51,13 +51,11 @@ public class RegisterHeartBeat {
   private final ScheduledExecutorService service =
       ThreadUtils.getDaemonSingleThreadScheduledExecutor("startHeartBeat");
   private final ExecutorService heartBeatExecutorService;
-  private long heartBeatTimeout;
 
   public RegisterHeartBeat(ShuffleServer shuffleServer) {
     ShuffleServerConf conf = shuffleServer.getShuffleServerConf();
     this.heartBeatInitialDelay = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_DELAY);
     this.heartBeatInterval = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL);
-    this.heartBeatTimeout = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_TIMEOUT);
     this.coordinatorQuorum = conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM);
     CoordinatorClientFactory factory =
         new CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE));
@@ -107,6 +105,7 @@ public class RegisterHeartBeat {
       Map<String, StorageInfo> localStorageInfo,
       int nettyPort) {
     boolean sendSuccessfully = false;
+    // use `rss.server.heartbeat.interval` as the timeout option
     RssSendHeartBeatRequest request = new RssSendHeartBeatRequest(
         id,
         ip,
@@ -115,7 +114,7 @@ public class RegisterHeartBeat {
         preAllocatedMemory,
         availableMemory,
         eventNumInFlush,
-        heartBeatTimeout,
+        heartBeatInterval,
         tags,
         isHealthy,
         serverStatus,
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index a34c0138..ecd11609 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -75,12 +75,6 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(10 * 1000L)
       .withDescription("Heartbeat interval to Coordinator (ms)");
 
-  public static final ConfigOption<Long> SERVER_HEARTBEAT_TIMEOUT = ConfigOptions
-      .key("rss.server.heartbeat.timeout")
-      .longType()
-      .defaultValue(60 * 1000L)
-      .withDescription("rss heartbeat interval ms");
-
   public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
       .key("rss.server.flush.threadPool.size")
       .intType()
diff --git a/server/src/test/resources/server.conf b/server/src/test/resources/server.conf
index ca7a47bf..585da760 100644
--- a/server/src/test/resources/server.conf
+++ b/server/src/test/resources/server.conf
@@ -24,6 +24,5 @@ rss.server.buffer.spill.threshold 130
 rss.server.partition.buffer.size 128
 rss.jetty.http.port 12345
 rss.jetty.corePool.size 64
-rss.server.heartbeat.timeout 1
 rss.server.write.timeout 2000
 rss.server.shuffleBufferManager.trigger.flush.interval 500