You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/10 11:13:39 UTC
[incubator-celeborn] branch main updated: [CELEBORN-213] Add configuration for whether to close idle connections in client side (#1157)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1332362b [CELEBORN-213] Add configuration for whether to close idle connections in client side (#1157)
1332362b is described below
commit 1332362bffcf276c53015fe9a036ecf570473e80
Author: Shuang <lv...@gmail.com>
AuthorDate: Tue Jan 10 19:13:33 2023 +0800
[CELEBORN-213] Add configuration for whether to close idle connections in client side (#1157)
---
.../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 3 ++-
.../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
docs/configuration/client.md | 1 +
3 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d7bc1700..7fc438a4 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -159,7 +159,8 @@ public class ShuffleClientImpl extends ShuffleClient {
TransportConf dataTransportConf =
Utils.fromCelebornConf(conf, module, conf.getInt("celeborn" + module + ".io.threads", 8));
TransportContext context =
- new TransportContext(dataTransportConf, new BaseMessageHandler(), true);
+ new TransportContext(
+ dataTransportConf, new BaseMessageHandler(), conf.clientCloseIdleConnections());
dataClientFactory = context.createClientFactory();
int pushDataRetryThreads = conf.pushRetryThreads();
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a8b46bc7..4e45b8f3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,6 +514,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Client //
// //////////////////////////////////////////////////////
def clientMaxTries: Int = get(CLIENT_MAX_RETRIES)
+ def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS)
def shuffleWriterMode: ShuffleMode = ShuffleMode.valueOf(get(SHUFFLE_WRITER_MODE))
def shuffleForceFallbackEnabled: Boolean = get(SHUFFLE_FORCE_FALLBACK_ENABLED)
def shuffleForceFallbackPartitionThreshold: Long = get(SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
@@ -2301,6 +2302,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(15)
+ val CLIENT_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.closeIdleConnections")
+ .categories("client")
+ .doc("Whether client will close idle connections.")
+ .version("0.3.0")
+ .booleanConf
+ .createWithDefault(true)
+
val METRICS_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.metrics.enabled")
.withAlternative("rss.metrics.system.enabled")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 9e2692c2..212b51e5 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -21,6 +21,7 @@ license: |
| --- | ------- | ----------- | ----- |
| celeborn.application.heartbeatInterval | 10s | Interval for client to send heartbeat message to master. | 0.2.0 |
| celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add partition's peer worker into blacklist when push data to slave failed. | 0.3.0 |
+| celeborn.client.closeIdleConnections | true | Whether client will close idle connections. | 0.3.0 |
| celeborn.client.maxRetries | 15 | Max retry times for client to connect master endpoint | 0.2.0 |
| celeborn.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch request. | 0.2.0 |
| celeborn.fetch.maxRetries | 3 | Max retries of fetch chunk | 0.2.0 |