You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/30 03:57:12 UTC
[incubator-celeborn] branch main updated: [CELEBORN-241][IMPROVEMENT] limit inflight push timeout should > push data timeout (#1179)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 122da478 [CELEBORN-241][IMPROVEMENT] limit inflight push timeout should > push data timeout (#1179)
122da478 is described below
commit 122da47815c00f3de585d35cc8e5d47b8bb57421
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Jan 30 11:57:07 2023 +0800
[CELEBORN-241][IMPROVEMENT] limit inflight push timeout should > push data timeout (#1179)
---
.../org/apache/celeborn/common/CelebornConf.scala | 49 +++++++++++-----------
docs/configuration/client.md | 2 +-
2 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 816a7679..0c04789e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -31,7 +31,7 @@ import org.apache.celeborn.common.identity.{DefaultIdentityProvider, UserIdentif
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
import org.apache.celeborn.common.network.util.ByteUnit
-import org.apache.celeborn.common.protocol.{CompressionCodec, PartitionSplitMode, PartitionType, ShuffleMode, SlotsAssignPolicy}
+import org.apache.celeborn.common.protocol.{CompressionCodec, PartitionSplitMode, PartitionType, ShuffleMode, SlotsAssignPolicy, TransportModuleConstants}
import org.apache.celeborn.common.protocol.StorageInfo.Type
import org.apache.celeborn.common.protocol.StorageInfo.Type.{HDD, SSD}
import org.apache.celeborn.common.quota.DefaultQuotaManager
@@ -665,7 +665,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def pushRetryThreads: Int = get(PUSH_RETRY_THREADS)
def pushStageEndTimeout: Long =
get(PUSH_STAGE_END_TIMEOUT).getOrElse(get(RPC_ASK_TIMEOUT) * (requestCommitFilesMaxRetries + 1))
- def pushLimitInFlightTimeoutMs: Long = get(PUSH_LIMIT_IN_FLIGHT_TIMEOUT)
+ def pushLimitInFlightTimeoutMs: Long =
+ get(PUSH_LIMIT_IN_FLIGHT_TIMEOUT).getOrElse(pushDataTimeoutMs * 2)
def pushLimitInFlightSleepDeltaMs: Long = get(PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL)
def pushSplitPartitionThreads: Int = get(PUSH_SPLIT_PARTITION_THREADS)
def partitionSplitMode: PartitionSplitMode = PartitionSplitMode.valueOf(get(PARTITION_SPLIT_MODE))
@@ -679,8 +680,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def rpcCacheSize: Int = get(RPC_CACHE_SIZE)
def rpcCacheConcurrencyLevel: Int = get(RPC_CACHE_CONCURRENCY_LEVEL)
def rpcCacheExpireTime: Long = get(RPC_CACHE_EXPIRE_TIME)
- def pushDataTimeoutMs = get(PUSH_DATA_TIMEOUT)
-
+ def pushDataTimeoutMs: Long = get(PUSH_DATA_TIMEOUT)
def registerShuffleRpcAskTimeout: RpcTimeout =
new RpcTimeout(
get(REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).map(_.milli)
@@ -2119,14 +2119,32 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
- val PUSH_LIMIT_IN_FLIGHT_TIMEOUT: ConfigEntry[Long] =
+ val PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.push.data.timeout")
+ .withAlternative("rss.push.data.rpc.timeout")
+ .categories("client")
+ .version("0.2.0")
+ .doc("Timeout for a task to push data rpc message.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("120s")
+
+ val TEST_PUSHDATA_TIMEOUT: ConfigEntry[Boolean] =
+ buildConf("celeborn.test.pushdataTimeout")
+ .categories("worker")
+ .version("0.2.0")
+ .doc("Wheter to test pushdata timeout")
+ .booleanConf
+ .createWithDefault(false)
+
+ val PUSH_LIMIT_IN_FLIGHT_TIMEOUT: OptionalConfigEntry[Long] =
buildConf("celeborn.push.limit.inFlight.timeout")
.withAlternative("rss.limit.inflight.timeout")
.categories("client")
- .doc("Timeout for netty in-flight requests to be done.")
+ .doc("Timeout for netty in-flight requests to be done. " +
+ s"Default value should be `${PUSH_DATA_TIMEOUT.key} * 2`.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("240s")
+ .createOptional // when unset, pushLimitInFlightTimeoutMs falls back to pushDataTimeoutMs * 2
val PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.push.limit.inFlight.sleepInterval")
@@ -2247,23 +2265,6 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
- val PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
- buildConf("celeborn.push.data.timeout")
- .withAlternative("rss.push.data.rpc.timeout")
- .categories("client")
- .version("0.2.0")
- .doc("Timeout for a task to push data rpc message.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("120s")
-
- val TEST_PUSHDATA_TIMEOUT: ConfigEntry[Boolean] =
- buildConf("celeborn.test.pushdataTimeout")
- .categories("worker")
- .version("0.2.0")
- .doc("Wheter to test pushdata timeout")
- .booleanConf
- .createWithDefault(false)
-
val REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: OptionalConfigEntry[Long] =
buildConf("celeborn.rpc.registerShuffle.askTimeout")
.categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index e3cc0fe6..c880d562 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -33,7 +33,7 @@ license: |
| celeborn.push.data.slowStart | false | Whether to allow to slow increasing maxReqs to meet the max push capacity, worked when worker side enables rate limit mechanism | 0.3.0 |
| celeborn.push.data.timeout | 120s | Timeout for a task to push data rpc message. | 0.2.0 |
| celeborn.push.limit.inFlight.sleepInterval | 50ms | Sleep interval when check netty in-flight requests to be done. | 0.2.0 |
-| celeborn.push.limit.inFlight.timeout | 240s | Timeout for netty in-flight requests to be done. | 0.2.0 |
+| celeborn.push.limit.inFlight.timeout | <undefined> | Timeout for netty in-flight requests to be done. Default value should be `celeborn.push.data.timeout * 2`. | 0.2.0 |
| celeborn.push.maxReqsInFlight | 4 | Amount of Netty in-flight requests per worker. The maximum memory is `celeborn.push.maxReqsInFlight` * `celeborn.push.buffer.max.size` * compression ratio(1 in worst case), default: 64Kib * 32 = 2Mib | 0.2.0 |
| celeborn.push.queue.capacity | 512 | Push buffer queue size for a task. The maximum memory is `celeborn.push.buffer.max.size` * `celeborn.push.queue.capacity`, default: 64KiB * 512 = 32MiB | 0.2.0 |
| celeborn.push.replicate.enabled | true | When true, Celeborn worker will replicate shuffle data to another Celeborn worker asynchronously to ensure the pushed shuffle data won't be lost after the node failure. | 0.2.0 |