You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/30 03:57:12 UTC

[incubator-celeborn] branch main updated: [CELEBORN-241][IMPROVEMENT] limit inflight push timeout should > push data timeout (#1179)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 122da478 [CELEBORN-241][IMPROVEMENT] limit inflight push timeout should >  push data timeout (#1179)
122da478 is described below

commit 122da47815c00f3de585d35cc8e5d47b8bb57421
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Jan 30 11:57:07 2023 +0800

    [CELEBORN-241][IMPROVEMENT] limit inflight push timeout should >  push data timeout (#1179)
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 49 +++++++++++-----------
 docs/configuration/client.md                       |  2 +-
 2 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 816a7679..0c04789e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -31,7 +31,7 @@ import org.apache.celeborn.common.identity.{DefaultIdentityProvider, UserIdentif
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.internal.config._
 import org.apache.celeborn.common.network.util.ByteUnit
-import org.apache.celeborn.common.protocol.{CompressionCodec, PartitionSplitMode, PartitionType, ShuffleMode, SlotsAssignPolicy}
+import org.apache.celeborn.common.protocol.{CompressionCodec, PartitionSplitMode, PartitionType, ShuffleMode, SlotsAssignPolicy, TransportModuleConstants}
 import org.apache.celeborn.common.protocol.StorageInfo.Type
 import org.apache.celeborn.common.protocol.StorageInfo.Type.{HDD, SSD}
 import org.apache.celeborn.common.quota.DefaultQuotaManager
@@ -665,7 +665,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def pushRetryThreads: Int = get(PUSH_RETRY_THREADS)
   def pushStageEndTimeout: Long =
     get(PUSH_STAGE_END_TIMEOUT).getOrElse(get(RPC_ASK_TIMEOUT) * (requestCommitFilesMaxRetries + 1))
-  def pushLimitInFlightTimeoutMs: Long = get(PUSH_LIMIT_IN_FLIGHT_TIMEOUT)
+  def pushLimitInFlightTimeoutMs: Long =
+    get(PUSH_LIMIT_IN_FLIGHT_TIMEOUT).getOrElse(pushDataTimeoutMs * 2)
   def pushLimitInFlightSleepDeltaMs: Long = get(PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL)
   def pushSplitPartitionThreads: Int = get(PUSH_SPLIT_PARTITION_THREADS)
   def partitionSplitMode: PartitionSplitMode = PartitionSplitMode.valueOf(get(PARTITION_SPLIT_MODE))
@@ -679,8 +680,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def rpcCacheSize: Int = get(RPC_CACHE_SIZE)
   def rpcCacheConcurrencyLevel: Int = get(RPC_CACHE_CONCURRENCY_LEVEL)
   def rpcCacheExpireTime: Long = get(RPC_CACHE_EXPIRE_TIME)
-  def pushDataTimeoutMs = get(PUSH_DATA_TIMEOUT)
-
+  def pushDataTimeoutMs: Long = get(PUSH_DATA_TIMEOUT)
   def registerShuffleRpcAskTimeout: RpcTimeout =
     new RpcTimeout(
       get(REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).map(_.milli)
@@ -2119,14 +2119,32 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
-  val PUSH_LIMIT_IN_FLIGHT_TIMEOUT: ConfigEntry[Long] =
+  val PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.push.data.timeout")
+      .withAlternative("rss.push.data.rpc.timeout")
+      .categories("client")
+      .version("0.2.0")
+      .doc("Timeout for a task to push data rpc message.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("120s")
+
+  val TEST_PUSHDATA_TIMEOUT: ConfigEntry[Boolean] =
+    buildConf("celeborn.test.pushdataTimeout")
+      .categories("worker")
+      .version("0.2.0")
+      .doc("Wheter to test pushdata timeout")
+      .booleanConf
+      .createWithDefault(false)
+
+  val PUSH_LIMIT_IN_FLIGHT_TIMEOUT: OptionalConfigEntry[Long] =
     buildConf("celeborn.push.limit.inFlight.timeout")
       .withAlternative("rss.limit.inflight.timeout")
       .categories("client")
-      .doc("Timeout for netty in-flight requests to be done.")
+      .doc("Timeout for netty in-flight requests to be done." +
+        s"Default value should be `${PUSH_DATA_TIMEOUT.key} * 2`.")
       .version("0.2.0")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("240s")
+      .createOptional
 
   val PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL: ConfigEntry[Long] =
     buildConf("celeborn.push.limit.inFlight.sleepInterval")
@@ -2247,23 +2265,6 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5s")
 
-  val PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
-    buildConf("celeborn.push.data.timeout")
-      .withAlternative("rss.push.data.rpc.timeout")
-      .categories("client")
-      .version("0.2.0")
-      .doc("Timeout for a task to push data rpc message.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("120s")
-
-  val TEST_PUSHDATA_TIMEOUT: ConfigEntry[Boolean] =
-    buildConf("celeborn.test.pushdataTimeout")
-      .categories("worker")
-      .version("0.2.0")
-      .doc("Wheter to test pushdata timeout")
-      .booleanConf
-      .createWithDefault(false)
-
   val REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: OptionalConfigEntry[Long] =
     buildConf("celeborn.rpc.registerShuffle.askTimeout")
       .categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index e3cc0fe6..c880d562 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -33,7 +33,7 @@ license: |
 | celeborn.push.data.slowStart | false | Whether to allow to slow increasing maxReqs to meet the max push capacity, worked when worker side enables rate limit mechanism | 0.3.0 | 
 | celeborn.push.data.timeout | 120s | Timeout for a task to push data rpc message. | 0.2.0 | 
 | celeborn.push.limit.inFlight.sleepInterval | 50ms | Sleep interval when check netty in-flight requests to be done. | 0.2.0 | 
-| celeborn.push.limit.inFlight.timeout | 240s | Timeout for netty in-flight requests to be done. | 0.2.0 | 
+| celeborn.push.limit.inFlight.timeout | &lt;undefined&gt; | Timeout for netty in-flight requests to be done.Default value should be `celeborn.push.data.timeout * 2`. | 0.2.0 | 
 | celeborn.push.maxReqsInFlight | 4 | Amount of Netty in-flight requests per worker. The maximum memory is `celeborn.push.maxReqsInFlight` * `celeborn.push.buffer.max.size` * compression ratio(1 in worst case), default: 64Kib * 32 = 2Mib | 0.2.0 | 
 | celeborn.push.queue.capacity | 512 | Push buffer queue size for a task. The maximum memory is `celeborn.push.buffer.max.size` * `celeborn.push.queue.capacity`, default: 64KiB * 512 = 32MiB | 0.2.0 | 
 | celeborn.push.replicate.enabled | true | When true, Celeborn worker will replicate shuffle data to another Celeborn worker asynchronously to ensure the pushed shuffle data won't be lost after the node failure. | 0.2.0 |