Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/16 04:03:51 UTC

[incubator-celeborn] branch main updated: [CELEBORN-223] The default RPC thread number of pushServer/replicateServer/fetchServer should be the total number of Flusher threads (#1163)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new bb967004 [CELEBORN-223] The default RPC thread number of pushServer/replicateServer/fetchServer should be the total number of Flusher threads (#1163)
bb967004 is described below

commit bb96700415cbc34186d162757e9215a71b4ab27c
Author: zy.jordan <82...@users.noreply.github.com>
AuthorDate: Mon Jan 16 12:03:46 2023 +0800

    [CELEBORN-223] The default RPC thread number of pushServer/replicateServer/fetchServer should be the total number of Flusher threads (#1163)
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 27 ++++++++++++----------
 docs/configuration/worker.md                       |  6 ++---
 .../celeborn/service/deploy/worker/Worker.scala    |  6 ++---
 .../deploy/worker/storage/StorageManager.scala     | 26 +++++++++++++--------
 4 files changed, 38 insertions(+), 27 deletions(-)
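
In effect, the fixed default of 16 Netty IO threads per server is replaced by a
default that tracks the worker's flusher capacity. A minimal sketch of the new
resolution (the thread counts below are hypothetical, for illustration only):

    // Hypothetical values for illustration:
    val workerPushIoThreads: Option[Int] = None  // celeborn.worker.push.io.threads unset
    val totalFlusherThread: Int = 16             // e.g. 2 disks x 8 flusher threads each
    val numThreads: Int = workerPushIoThreads.getOrElse(totalFlusherThread)  // -> 16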

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 497a00a8..668e0155 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -490,9 +490,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def workerPushPort: Int = get(WORKER_PUSH_PORT)
   def workerFetchPort: Int = get(WORKER_FETCH_PORT)
   def workerReplicatePort: Int = get(WORKER_REPLICATE_PORT)
-  def workerPushIoThreads: Int = get(WORKER_PUSH_IO_THREADS)
-  def workerFetchIoThreads: Int = get(WORKER_FETCH_IO_THREADS)
-  def workerReplicateIoThreads: Int = get(WORKER_REPLICATE_IO_THREADS)
+
+  def workerPushIoThreads: Option[Int] = get(WORKER_PUSH_IO_THREADS)
+
+  def workerFetchIoThreads: Option[Int] = get(WORKER_FETCH_IO_THREADS)
+
+  def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
   def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
   def workerNonEmptyDirExpireDuration: Long = get(WORKER_NON_EMPTY_DIR_EXPIRE_DURATION)
   def workerWorkingDir: String = get(WORKER_WORKING_DIR)
@@ -1755,35 +1758,35 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(0)
 
-  val WORKER_PUSH_IO_THREADS: ConfigEntry[Int] =
+  val WORKER_PUSH_IO_THREADS: OptionalConfigEntry[Int] =
     buildConf("celeborn.worker.push.io.threads")
       .withAlternative("rss.push.io.threads")
       .categories("worker")
       .doc("Netty IO thread number of worker to handle client push data. " +
-        s"The default threads number is 16.")
+        s"The default threads number is the number of flush thread.")
       .version("0.2.0")
       .intConf
-      .createWithDefault(16)
+      .createOptional
 
-  val WORKER_FETCH_IO_THREADS: ConfigEntry[Int] =
+  val WORKER_FETCH_IO_THREADS: OptionalConfigEntry[Int] =
     buildConf("celeborn.worker.fetch.io.threads")
       .withAlternative("rss.fetch.io.threads")
       .categories("worker")
       .doc("Netty IO thread number of worker to handle client fetch data. " +
-        s"The default threads number is 16.")
+        s"The default threads number is the number of flush thread.")
       .version("0.2.0")
       .intConf
-      .createWithDefault(16)
+      .createOptional
 
-  val WORKER_REPLICATE_IO_THREADS: ConfigEntry[Int] =
+  val WORKER_REPLICATE_IO_THREADS: OptionalConfigEntry[Int] =
     buildConf("celeborn.worker.replicate.io.threads")
       .withAlternative("rss.replicate.io.threads")
       .categories("worker")
       .doc("Netty IO thread number of worker to replicate shuffle data. " +
-        s"The default threads number is 16.")
+        s"The default threads number is the number of flush thread.")
       .version("0.2.0")
       .intConf
-      .createWithDefault(16)
+      .createOptional
 
   val WORKER_REGISTER_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.register.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index eb3c1caf..33e24112 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -42,7 +42,7 @@ license: |
 | celeborn.worker.disk.checkFileClean.maxRetries | 3 | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 | 
 | celeborn.worker.disk.checkFileClean.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 | 
 | celeborn.worker.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.2.0 | 
-| celeborn.worker.fetch.io.threads | 16 | Netty IO thread number of worker to handle client fetch data. The default threads number is 16. | 0.2.0 | 
+| celeborn.worker.fetch.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to handle client fetch data. The default thread number is the total number of flusher threads. | 0.2.0 | 
 | celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 | 
 | celeborn.worker.flusher.avgFlushTime.slidingWindow.size | 20 | The size of sliding windows used to calculate statistics about flushed time and count. | 0.2.0 | 
 | celeborn.worker.flusher.buffer.size | 256k | Size of buffer used by a single flusher. | 0.2.0 | 
@@ -72,11 +72,11 @@ license: |
 | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting; when reserved memory is higher than the max partition sorter memory, the partition sorter will stop sorting. | 0.2.0 | 
 | celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial reserve memory when sorting a shuffle file off-heap. | 0.2.0 | 
 | celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 | 
-| celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to handle client push data. The default threads number is 16. | 0.2.0 | 
+| celeborn.worker.push.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to handle client push data. The default thread number is the total number of flusher threads. | 0.2.0 | 
 | celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 | 
 | celeborn.worker.register.timeout | 180s | Worker register timeout. | 0.2.0 | 
 | celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request is not replied to within this duration, the worker will mark the replicate data request as failed. | 0.2.0 | 
-| celeborn.worker.replicate.io.threads | 16 | Netty IO thread number of worker to replicate shuffle data. The default threads number is 16. | 0.2.0 | 
+| celeborn.worker.replicate.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to replicate shuffle data. The default thread number is the total number of flusher threads. | 0.2.0 | 
 | celeborn.worker.replicate.port | 0 | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 | 
 | celeborn.worker.replicate.threads | 64 | Thread number of worker to replicate shuffle data. | 0.2.0 | 
 | celeborn.worker.rpc.port | 0 | Server port for Worker to receive RPC request. | 0.2.0 | 
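
With the entries now optional, the generated table reports <undefined> for all
three keys. Users who still want fixed pool sizes can pin them explicitly, e.g.
(assuming the usual conf/celeborn-defaults.conf properties file; the value 32
is arbitrary):

    celeborn.worker.push.io.threads        32
    celeborn.worker.fetch.io.threads       32
    celeborn.worker.replicate.io.threads   32
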
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 046560ce..1b0e6cfd 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -110,7 +110,7 @@ private[celeborn] class Worker(
   val pushDataHandler = new PushDataHandler()
   val (pushServer, pushClientFactory) = {
     val closeIdleConnections = conf.workerCloseIdleConnections
-    val numThreads = conf.workerPushIoThreads
+    val numThreads = conf.workerPushIoThreads.getOrElse(storageManager.totalFlusherThread)
     val transportConf =
       Utils.fromCelebornConf(conf, TransportModuleConstants.PUSH_MODULE, numThreads)
     val pushServerLimiter = new ChannelsLimiter(TransportModuleConstants.PUSH_MODULE)
@@ -125,7 +125,7 @@ private[celeborn] class Worker(
   private val replicateServer = {
     val closeIdleConnections = conf.workerCloseIdleConnections
     val numThreads =
-      conf.workerReplicateIoThreads
+      conf.workerReplicateIoThreads.getOrElse(storageManager.totalFlusherThread)
     val transportConf =
       Utils.fromCelebornConf(conf, TransportModuleConstants.REPLICATE_MODULE, numThreads)
     val replicateLimiter = new ChannelsLimiter(TransportModuleConstants.REPLICATE_MODULE)
@@ -137,7 +137,7 @@ private[celeborn] class Worker(
   var fetchHandler: FetchHandler = _
   private val fetchServer = {
     val closeIdleConnections = conf.workerCloseIdleConnections
-    val numThreads = conf.workerFetchIoThreads
+    val numThreads = conf.workerFetchIoThreads.getOrElse(storageManager.totalFlusherThread)
     val transportConf =
       Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, numThreads)
     fetchHandler = new FetchHandler(transportConf)
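
All three servers resolve their pool size the same way, which also means their
construction now depends on storageManager being initialized first. A compact
illustration of the shared pattern (the helper name is hypothetical; the commit
inlines this at each call site):

    // By-name parameter so the flusher total is only read when no explicit
    // value is configured:
    def resolveIoThreads(configured: Option[Int], totalFlusherThread: => Int): Int =
      configured.getOrElse(totalFlusherThread)
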
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 91265f79..fb8e1b3a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -95,8 +95,11 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
     DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos, workerSource)
 
   // (mountPoint -> LocalFlusher)
-  private val localFlushers: ConcurrentHashMap[String, LocalFlusher] = {
+  private val (
+    localFlushers: ConcurrentHashMap[String, LocalFlusher],
+    _totalLocalFlusherThread: Int) = {
     val flushers = new ConcurrentHashMap[String, LocalFlusher]()
+    var totalThread = 0
     disksSnapshot().foreach { diskInfo =>
       if (!flushers.containsKey(diskInfo.mountPoint)) {
         val flusher = new LocalFlusher(
@@ -108,9 +111,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
           conf.avgFlushTimeSlidingWindowMinCount,
           diskInfo.storageType)
         flushers.put(diskInfo.mountPoint, flusher)
+        totalThread = totalThread + diskInfo.threadCount
       }
     }
-    flushers
+    (flushers, totalThread)
   }
 
   private val actionService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
@@ -124,7 +128,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   }
   val hdfsPermission = FsPermission.createImmutable(755)
   val hdfsWriters = new util.ArrayList[FileWriter]()
-  val hdfsFlusher =
+  val (hdfsFlusher, _totalHdfsFlusherThread) =
     if (!hdfsDir.isEmpty) {
       val hdfsConfiguration = new Configuration
       hdfsConfiguration.set("fs.defaultFS", hdfsDir)
@@ -133,15 +137,19 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       logInfo("Celeborn will ignore cluster settings" +
         " about fs.hdfs.impl.disable.cache and set it to false")
       StorageManager.hadoopFs = FileSystem.get(hdfsConfiguration)
-      Some(new HdfsFlusher(
-        workerSource,
-        conf.hdfsFlusherThreads,
-        conf.avgFlushTimeSlidingWindowSize,
-        conf.avgFlushTimeSlidingWindowMinCount))
+      (
+        Some(new HdfsFlusher(
+          workerSource,
+          conf.hdfsFlusherThreads,
+          conf.avgFlushTimeSlidingWindowSize,
+          conf.avgFlushTimeSlidingWindowMinCount)),
+        conf.hdfsFlusherThreads)
     } else {
-      None
+      (None, 0)
     }
 
+  def totalFlusherThread: Int = _totalLocalFlusherThread + _totalHdfsFlusherThread
+
   override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized {
     if (diskStatus == DiskStatus.CRITICAL_ERROR) {
       logInfo(s"Disk ${mountPoint} faces critical error, will remove its disk operator.")