You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/16 04:03:51 UTC
[incubator-celeborn] branch main updated: [CELEBORN-223] The default rpc thread num of pushServer/replicateServer/fetchServer should be the number of total of Flusher's thread (#1163)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new bb967004 [CELEBORN-223] The default rpc thread num of pushServer/replicateServer/fetchServer should be the number of total of Flusher's thread (#1163)
bb967004 is described below
commit bb96700415cbc34186d162757e9215a71b4ab27c
Author: zy.jordan <82...@users.noreply.github.com>
AuthorDate: Mon Jan 16 12:03:46 2023 +0800
[CELEBORN-223] The default rpc thread num of pushServer/replicateServer/fetchServer should be the number of total of Flusher's thread (#1163)
---
.../org/apache/celeborn/common/CelebornConf.scala | 27 ++++++++++++----------
docs/configuration/worker.md | 6 ++---
.../celeborn/service/deploy/worker/Worker.scala | 6 ++---
.../deploy/worker/storage/StorageManager.scala | 26 +++++++++++++--------
4 files changed, 38 insertions(+), 27 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 497a00a8..668e0155 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -490,9 +490,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerPushPort: Int = get(WORKER_PUSH_PORT)
def workerFetchPort: Int = get(WORKER_FETCH_PORT)
def workerReplicatePort: Int = get(WORKER_REPLICATE_PORT)
- def workerPushIoThreads: Int = get(WORKER_PUSH_IO_THREADS)
- def workerFetchIoThreads: Int = get(WORKER_FETCH_IO_THREADS)
- def workerReplicateIoThreads: Int = get(WORKER_REPLICATE_IO_THREADS)
+
+ def workerPushIoThreads: Option[Int] = get(WORKER_PUSH_IO_THREADS)
+
+ def workerFetchIoThreads: Option[Int] = get(WORKER_FETCH_IO_THREADS)
+
+ def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
def workerNonEmptyDirExpireDuration: Long = get(WORKER_NON_EMPTY_DIR_EXPIRE_DURATION)
def workerWorkingDir: String = get(WORKER_WORKING_DIR)
@@ -1755,35 +1758,35 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(0)
- val WORKER_PUSH_IO_THREADS: ConfigEntry[Int] =
+ val WORKER_PUSH_IO_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.push.io.threads")
.withAlternative("rss.push.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client push data. " +
- s"The default threads number is 16.")
+ s"The default threads number is the number of flush thread.")
.version("0.2.0")
.intConf
- .createWithDefault(16)
+ .createOptional
- val WORKER_FETCH_IO_THREADS: ConfigEntry[Int] =
+ val WORKER_FETCH_IO_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.fetch.io.threads")
.withAlternative("rss.fetch.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client fetch data. " +
- s"The default threads number is 16.")
+ s"The default threads number is the number of flush thread.")
.version("0.2.0")
.intConf
- .createWithDefault(16)
+ .createOptional
- val WORKER_REPLICATE_IO_THREADS: ConfigEntry[Int] =
+ val WORKER_REPLICATE_IO_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.replicate.io.threads")
.withAlternative("rss.replicate.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to replicate shuffle data. " +
- s"The default threads number is 16.")
+ s"The default threads number is the number of flush thread.")
.version("0.2.0")
.intConf
- .createWithDefault(16)
+ .createOptional
val WORKER_REGISTER_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.register.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index eb3c1caf..33e24112 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -42,7 +42,7 @@ license: |
| celeborn.worker.disk.checkFileClean.maxRetries | 3 | The number of retries for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 |
| celeborn.worker.disk.checkFileClean.timeout | 1000ms | The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master. | 0.2.0 |
| celeborn.worker.disk.reserve.size | 5G | Celeborn worker reserved space for each disk. | 0.2.0 |
-| celeborn.worker.fetch.io.threads | 16 | Netty IO thread number of worker to handle client fetch data. The default threads number is 16. | 0.2.0 |
+| celeborn.worker.fetch.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 |
| celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch data request from ShuffleClient. | 0.2.0 |
| celeborn.worker.flusher.avgFlushTime.slidingWindow.size | 20 | The size of sliding windows used to calculate statistics about flushed time and count. | 0.2.0 |
| celeborn.worker.flusher.buffer.size | 256k | Size of buffer used by a single flusher. | 0.2.0 |
@@ -72,11 +72,11 @@ license: |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial reserve memory when sorting a shuffle file off-heap. | 0.2.0 |
| celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 |
-| celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to handle client push data. The default threads number is 16. | 0.2.0 |
+| celeborn.worker.push.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to handle client push data. The default threads number is the number of flush thread. | 0.2.0 |
| celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 |
| celeborn.worker.register.timeout | 180s | Worker register timeout. | 0.2.0 |
| celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request not replied during the duration, worker will mark the replicate data request as failed. | 0.2.0 |
-| celeborn.worker.replicate.io.threads | 16 | Netty IO thread number of worker to replicate shuffle data. The default threads number is 16. | 0.2.0 |
+| celeborn.worker.replicate.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to replicate shuffle data. The default threads number is the number of flush thread. | 0.2.0 |
| celeborn.worker.replicate.port | 0 | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 |
| celeborn.worker.replicate.threads | 64 | Thread number of worker to replicate shuffle data. | 0.2.0 |
| celeborn.worker.rpc.port | 0 | Server port for Worker to receive RPC request. | 0.2.0 |
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 046560ce..1b0e6cfd 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -110,7 +110,7 @@ private[celeborn] class Worker(
val pushDataHandler = new PushDataHandler()
val (pushServer, pushClientFactory) = {
val closeIdleConnections = conf.workerCloseIdleConnections
- val numThreads = conf.workerPushIoThreads
+ val numThreads = conf.workerPushIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.PUSH_MODULE, numThreads)
val pushServerLimiter = new ChannelsLimiter(TransportModuleConstants.PUSH_MODULE)
@@ -125,7 +125,7 @@ private[celeborn] class Worker(
private val replicateServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads =
- conf.workerReplicateIoThreads
+ conf.workerReplicateIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.REPLICATE_MODULE, numThreads)
val replicateLimiter = new ChannelsLimiter(TransportModuleConstants.REPLICATE_MODULE)
@@ -137,7 +137,7 @@ private[celeborn] class Worker(
var fetchHandler: FetchHandler = _
private val fetchServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
- val numThreads = conf.workerFetchIoThreads
+ val numThreads = conf.workerFetchIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, numThreads)
fetchHandler = new FetchHandler(transportConf)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 91265f79..fb8e1b3a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -95,8 +95,11 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos, workerSource)
// (mountPoint -> LocalFlusher)
- private val localFlushers: ConcurrentHashMap[String, LocalFlusher] = {
+ private val (
+ localFlushers: ConcurrentHashMap[String, LocalFlusher],
+ _totalLocalFlusherThread: Int) = {
val flushers = new ConcurrentHashMap[String, LocalFlusher]()
+ var totalThread = 0;
disksSnapshot().foreach { diskInfo =>
if (!flushers.containsKey(diskInfo.mountPoint)) {
val flusher = new LocalFlusher(
@@ -108,9 +111,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
conf.avgFlushTimeSlidingWindowMinCount,
diskInfo.storageType)
flushers.put(diskInfo.mountPoint, flusher)
+ totalThread = totalThread + diskInfo.threadCount
}
}
- flushers
+ (flushers, totalThread)
}
private val actionService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
@@ -124,7 +128,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
val hdfsPermission = FsPermission.createImmutable(755)
val hdfsWriters = new util.ArrayList[FileWriter]()
- val hdfsFlusher =
+ val (hdfsFlusher, _totalHdfsFlusherThread) =
if (!hdfsDir.isEmpty) {
val hdfsConfiguration = new Configuration
hdfsConfiguration.set("fs.defaultFS", hdfsDir)
@@ -133,15 +137,19 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
logInfo("Celeborn will ignore cluster settings" +
" about fs.hdfs.impl.disable.cache and set it to false")
StorageManager.hadoopFs = FileSystem.get(hdfsConfiguration)
- Some(new HdfsFlusher(
- workerSource,
- conf.hdfsFlusherThreads,
- conf.avgFlushTimeSlidingWindowSize,
- conf.avgFlushTimeSlidingWindowMinCount))
+ (
+ Some(new HdfsFlusher(
+ workerSource,
+ conf.hdfsFlusherThreads,
+ conf.avgFlushTimeSlidingWindowSize,
+ conf.avgFlushTimeSlidingWindowMinCount)),
+ conf.hdfsFlusherThreads)
} else {
- None
+ (None, 0)
}
+ def totalFlusherThread: Int = _totalLocalFlusherThread + _totalHdfsFlusherThread
+
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized {
if (diskStatus == DiskStatus.CRITICAL_ERROR) {
logInfo(s"Disk ${mountPoint} faces critical error, will remove its disk operator.")