You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/10 11:13:39 UTC

[incubator-celeborn] branch main updated: [CELEBORN-213] Add configuration for whether to close idle connections in client side (#1157)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1332362b [CELEBORN-213] Add configuration for whether to close idle connections in client side (#1157)
1332362b is described below

commit 1332362bffcf276c53015fe9a036ecf570473e80
Author: Shuang <lv...@gmail.com>
AuthorDate: Tue Jan 10 19:13:33 2023 +0800

    [CELEBORN-213] Add configuration for whether to close idle connections in client side (#1157)
---
 .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java  | 3 ++-
 .../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
 docs/configuration/client.md                                     | 1 +
 3 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d7bc1700..7fc438a4 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -159,7 +159,8 @@ public class ShuffleClientImpl extends ShuffleClient {
     TransportConf dataTransportConf =
         Utils.fromCelebornConf(conf, module, conf.getInt("celeborn" + module + ".io.threads", 8));
     TransportContext context =
-        new TransportContext(dataTransportConf, new BaseMessageHandler(), true);
+        new TransportContext(
+            dataTransportConf, new BaseMessageHandler(), conf.clientCloseIdleConnections());
     dataClientFactory = context.createClientFactory();
 
     int pushDataRetryThreads = conf.pushRetryThreads();
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a8b46bc7..4e45b8f3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,6 +514,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   //                      Client                         //
   // //////////////////////////////////////////////////////
   def clientMaxTries: Int = get(CLIENT_MAX_RETRIES)
+  def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS)
   def shuffleWriterMode: ShuffleMode = ShuffleMode.valueOf(get(SHUFFLE_WRITER_MODE))
   def shuffleForceFallbackEnabled: Boolean = get(SHUFFLE_FORCE_FALLBACK_ENABLED)
   def shuffleForceFallbackPartitionThreshold: Long = get(SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
@@ -2301,6 +2302,14 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(15)
 
+  val CLIENT_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.closeIdleConnections")
+      .categories("client")
+      .doc("Whether client will close idle connections.")
+      .version("0.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val METRICS_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.metrics.enabled")
       .withAlternative("rss.metrics.system.enabled")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 9e2692c2..212b51e5 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -21,6 +21,7 @@ license: |
 | --- | ------- | ----------- | ----- |
 | celeborn.application.heartbeatInterval | 10s | Interval for client to send heartbeat message to master. | 0.2.0 | 
 | celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add partition's peer worker into blacklist when push data to slave failed. | 0.3.0 | 
+| celeborn.client.closeIdleConnections | true | Whether client will close idle connections. | 0.3.0 | 
 | celeborn.client.maxRetries | 15 | Max retry times for client to connect master endpoint | 0.2.0 | 
 | celeborn.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch request. | 0.2.0 | 
 | celeborn.fetch.maxRetries | 3 | Max retries of fetch chunk | 0.2.0 |