You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/10 09:47:01 UTC
[incubator-celeborn] branch main updated: [CELEBORN-214] Push/Replicate/Fetch io threads default value is 16 (#1158)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 19197b91 [CELEBORN-214] Push/Replicate/Fetch io threads default value is 16 (#1158)
19197b91 is described below
commit 19197b919092541477aa1e86c6dd0273ddff82d8
Author: zy.jordan <82...@users.noreply.github.com>
AuthorDate: Tue Jan 10 17:46:56 2023 +0800
[CELEBORN-214] Push/Replicate/Fetch io threads default value is 16 (#1158)
---
.../org/apache/celeborn/common/CelebornConf.scala | 24 +++++++++++-----------
docs/configuration/worker.md | 6 +++---
.../celeborn/service/deploy/worker/Worker.scala | 6 +++---
3 files changed, 18 insertions(+), 18 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index aeff72d4..a8b46bc7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -490,9 +490,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerPushPort: Int = get(WORKER_PUSH_PORT)
def workerFetchPort: Int = get(WORKER_FETCH_PORT)
def workerReplicatePort: Int = get(WORKER_REPLICATE_PORT)
- def workerPushIoThreads: Option[Int] = get(WORKER_PUSH_IO_THREADS)
- def workerFetchIoThreads: Option[Int] = get(WORKER_FETCH_IO_THREADS)
- def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
+ def workerPushIoThreads: Int = get(WORKER_PUSH_IO_THREADS)
+ def workerFetchIoThreads: Int = get(WORKER_FETCH_IO_THREADS)
+ def workerReplicateIoThreads: Int = get(WORKER_REPLICATE_IO_THREADS)
def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
def workerNonEmptyDirExpireDuration: Long = get(WORKER_NON_EMPTY_DIR_EXPIRE_DURATION)
def workerWorkingDir: String = get(WORKER_WORKING_DIR)
@@ -1750,35 +1750,35 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(0)
- val WORKER_PUSH_IO_THREADS: OptionalConfigEntry[Int] =
+ val WORKER_PUSH_IO_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.push.io.threads")
.withAlternative("rss.push.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client push data. " +
- s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+ s"The default threads number is 16.")
.version("0.2.0")
.intConf
- .createOptional
+ .createWithDefault(16)
- val WORKER_FETCH_IO_THREADS: OptionalConfigEntry[Int] =
+ val WORKER_FETCH_IO_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.fetch.io.threads")
.withAlternative("rss.fetch.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client fetch data. " +
- s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+ s"The default threads number is 16.")
.version("0.2.0")
.intConf
- .createOptional
+ .createWithDefault(16)
- val WORKER_REPLICATE_IO_THREADS: OptionalConfigEntry[Int] =
+ val WORKER_REPLICATE_IO_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.replicate.io.threads")
.withAlternative("rss.replicate.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to replicate shuffle data. " +
- s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+ s"The default threads number is 16.")
.version("0.2.0")
.intConf
- .createOptional
+ .createWithDefault(16)
val WORKER_REGISTER_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.register.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index d9df2184..8548c44b 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -42,7 +42,7 @@ license: |
| celeborn.worker.disk.checkFileClean.maxRetries | 3 | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 |
| celeborn.worker.disk.checkFileClean.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 |
| celeborn.worker.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.2.0 |
-| celeborn.worker.fetch.io.threads | <undefined> | Netty IO thread number of worker to handle client fetch data. The default threads number is `size(celeborn.worker.storage.dirs)*2`. | 0.2.0 |
+| celeborn.worker.fetch.io.threads | 16 | Netty IO thread number of worker to handle client fetch data. The default threads number is 16. | 0.2.0 |
| celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 |
| celeborn.worker.flusher.avgFlushTime.slidingWindow.size | 20 | The size of sliding windows used to calculate statistics about flushed time and count. | 0.2.0 |
| celeborn.worker.flusher.buffer.size | 256k | Size of buffer used by a single flusher. | 0.2.0 |
@@ -71,11 +71,11 @@ license: |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial reserve memory when sorting a shuffle file off-heap. | 0.2.0 |
| celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 |
-| celeborn.worker.push.io.threads | <undefined> | Netty IO thread number of worker to handle client push data. The default threads number is `size(celeborn.worker.storage.dirs)*2`. | 0.2.0 |
+| celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to handle client push data. The default threads number is 16. | 0.2.0 |
| celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 |
| celeborn.worker.register.timeout | 180s | Worker register timeout. | 0.2.0 |
| celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request not replied during the duration, worker will mark the replicate data request as failed. | 0.2.0 |
-| celeborn.worker.replicate.io.threads | <undefined> | Netty IO thread number of worker to replicate shuffle data. The default threads number is `size(celeborn.worker.storage.dirs)*2`. | 0.2.0 |
+| celeborn.worker.replicate.io.threads | 16 | Netty IO thread number of worker to replicate shuffle data. The default threads number is 16. | 0.2.0 |
| celeborn.worker.replicate.port | 0 | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 |
| celeborn.worker.replicate.threads | 64 | Thread number of worker to replicate shuffle data. | 0.2.0 |
| celeborn.worker.rpc.port | 0 | Server port for Worker to receive RPC request. | 0.2.0 |
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index debdd390..046560ce 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -110,7 +110,7 @@ private[celeborn] class Worker(
val pushDataHandler = new PushDataHandler()
val (pushServer, pushClientFactory) = {
val closeIdleConnections = conf.workerCloseIdleConnections
- val numThreads = conf.workerPushIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+ val numThreads = conf.workerPushIoThreads
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.PUSH_MODULE, numThreads)
val pushServerLimiter = new ChannelsLimiter(TransportModuleConstants.PUSH_MODULE)
@@ -125,7 +125,7 @@ private[celeborn] class Worker(
private val replicateServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads =
- conf.workerReplicateIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+ conf.workerReplicateIoThreads
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.REPLICATE_MODULE, numThreads)
val replicateLimiter = new ChannelsLimiter(TransportModuleConstants.REPLICATE_MODULE)
@@ -137,7 +137,7 @@ private[celeborn] class Worker(
var fetchHandler: FetchHandler = _
private val fetchServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
- val numThreads = conf.workerFetchIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+ val numThreads = conf.workerFetchIoThreads
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, numThreads)
fetchHandler = new FetchHandler(transportConf)