You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/10 09:47:01 UTC

[incubator-celeborn] branch main updated: [CELEBORN-214] Push/Replicate/Fetch io threads default value is 16 (#1158)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 19197b91 [CELEBORN-214] Push/Replicate/Fetch io threads default value is 16 (#1158)
19197b91 is described below

commit 19197b919092541477aa1e86c6dd0273ddff82d8
Author: zy.jordan <82...@users.noreply.github.com>
AuthorDate: Tue Jan 10 17:46:56 2023 +0800

    [CELEBORN-214] Push/Replicate/Fetch io threads default value is 16 (#1158)
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 24 +++++++++++-----------
 docs/configuration/worker.md                       |  6 +++---
 .../celeborn/service/deploy/worker/Worker.scala    |  6 +++---
 3 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index aeff72d4..a8b46bc7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -490,9 +490,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def workerPushPort: Int = get(WORKER_PUSH_PORT)
   def workerFetchPort: Int = get(WORKER_FETCH_PORT)
   def workerReplicatePort: Int = get(WORKER_REPLICATE_PORT)
-  def workerPushIoThreads: Option[Int] = get(WORKER_PUSH_IO_THREADS)
-  def workerFetchIoThreads: Option[Int] = get(WORKER_FETCH_IO_THREADS)
-  def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
+  def workerPushIoThreads: Int = get(WORKER_PUSH_IO_THREADS)
+  def workerFetchIoThreads: Int = get(WORKER_FETCH_IO_THREADS)
+  def workerReplicateIoThreads: Int = get(WORKER_REPLICATE_IO_THREADS)
   def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
   def workerNonEmptyDirExpireDuration: Long = get(WORKER_NON_EMPTY_DIR_EXPIRE_DURATION)
   def workerWorkingDir: String = get(WORKER_WORKING_DIR)
@@ -1750,35 +1750,35 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(0)
 
-  val WORKER_PUSH_IO_THREADS: OptionalConfigEntry[Int] =
+  val WORKER_PUSH_IO_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.worker.push.io.threads")
       .withAlternative("rss.push.io.threads")
       .categories("worker")
       .doc("Netty IO thread number of worker to handle client push data. " +
-        s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+        s"The default threads number is 16.")
       .version("0.2.0")
       .intConf
-      .createOptional
+      .createWithDefault(16)
 
-  val WORKER_FETCH_IO_THREADS: OptionalConfigEntry[Int] =
+  val WORKER_FETCH_IO_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.worker.fetch.io.threads")
       .withAlternative("rss.fetch.io.threads")
       .categories("worker")
       .doc("Netty IO thread number of worker to handle client fetch data. " +
-        s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+        s"The default threads number is 16.")
       .version("0.2.0")
       .intConf
-      .createOptional
+      .createWithDefault(16)
 
-  val WORKER_REPLICATE_IO_THREADS: OptionalConfigEntry[Int] =
+  val WORKER_REPLICATE_IO_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.worker.replicate.io.threads")
       .withAlternative("rss.replicate.io.threads")
       .categories("worker")
       .doc("Netty IO thread number of worker to replicate shuffle data. " +
-        s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+        s"The default threads number is 16.")
       .version("0.2.0")
       .intConf
-      .createOptional
+      .createWithDefault(16)
 
   val WORKER_REGISTER_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.register.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index d9df2184..8548c44b 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -42,7 +42,7 @@ license: |
 | celeborn.worker.disk.checkFileClean.maxRetries | 3 | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 | 
 | celeborn.worker.disk.checkFileClean.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 | 
 | celeborn.worker.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.2.0 | 
-| celeborn.worker.fetch.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to handle client fetch data. The default threads number is `size(celeborn.worker.storage.dirs)*2`. | 0.2.0 | 
+| celeborn.worker.fetch.io.threads | 16 | Netty IO thread number of worker to handle client fetch data. The default threads number is 16. | 0.2.0 | 
 | celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 | 
 | celeborn.worker.flusher.avgFlushTime.slidingWindow.size | 20 | The size of sliding windows used to calculate statistics about flushed time and count. | 0.2.0 | 
 | celeborn.worker.flusher.buffer.size | 256k | Size of buffer used by a single flusher. | 0.2.0 | 
@@ -71,11 +71,11 @@ license: |
 | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | 
 | celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial reserve memory when sorting a shuffle file off-heap. | 0.2.0 | 
 | celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 | 
-| celeborn.worker.push.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to handle client push data. The default threads number is `size(celeborn.worker.storage.dirs)*2`. | 0.2.0 | 
+| celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to handle client push data. The default threads number is 16. | 0.2.0 | 
 | celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 | 
 | celeborn.worker.register.timeout | 180s | Worker register timeout. | 0.2.0 | 
 | celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request not replied during the duration, worker will mark the replicate data request as failed. | 0.2.0 | 
-| celeborn.worker.replicate.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to replicate shuffle data. The default threads number is `size(celeborn.worker.storage.dirs)*2`. | 0.2.0 | 
+| celeborn.worker.replicate.io.threads | 16 | Netty IO thread number of worker to replicate shuffle data. The default threads number is 16. | 0.2.0 | 
 | celeborn.worker.replicate.port | 0 | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 | 
 | celeborn.worker.replicate.threads | 64 | Thread number of worker to replicate shuffle data. | 0.2.0 | 
 | celeborn.worker.rpc.port | 0 | Server port for Worker to receive RPC request. | 0.2.0 | 
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index debdd390..046560ce 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -110,7 +110,7 @@ private[celeborn] class Worker(
   val pushDataHandler = new PushDataHandler()
   val (pushServer, pushClientFactory) = {
     val closeIdleConnections = conf.workerCloseIdleConnections
-    val numThreads = conf.workerPushIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+    val numThreads = conf.workerPushIoThreads
     val transportConf =
       Utils.fromCelebornConf(conf, TransportModuleConstants.PUSH_MODULE, numThreads)
     val pushServerLimiter = new ChannelsLimiter(TransportModuleConstants.PUSH_MODULE)
@@ -125,7 +125,7 @@ private[celeborn] class Worker(
   private val replicateServer = {
     val closeIdleConnections = conf.workerCloseIdleConnections
     val numThreads =
-      conf.workerReplicateIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+      conf.workerReplicateIoThreads
     val transportConf =
       Utils.fromCelebornConf(conf, TransportModuleConstants.REPLICATE_MODULE, numThreads)
     val replicateLimiter = new ChannelsLimiter(TransportModuleConstants.REPLICATE_MODULE)
@@ -137,7 +137,7 @@ private[celeborn] class Worker(
   var fetchHandler: FetchHandler = _
   private val fetchServer = {
     val closeIdleConnections = conf.workerCloseIdleConnections
-    val numThreads = conf.workerFetchIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+    val numThreads = conf.workerFetchIoThreads
     val transportConf =
       Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, numThreads)
     fetchHandler = new FetchHandler(transportConf)