You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/10 13:10:48 UTC
[incubator-celeborn] branch main updated: [CELEBORN-399] Make fileSorterExecutors thread num can be customized (#1325)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4b334df7a [CELEBORN-399] Make fileSorterExecutors thread num can be customized (#1325)
4b334df7a is described below
commit 4b334df7a6ee5072829a6b6c08c3c8c131a97c95
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Fri Mar 10 21:10:43 2023 +0800
[CELEBORN-399] Make fileSorterExecutors thread num can be customized (#1325)
---
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 10 ++++++++++
docs/configuration/worker.md | 1 +
.../service/deploy/worker/storage/PartitionFilesSorter.java | 10 +++++-----
3 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 1520d5e31..3b094aaff 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,6 +514,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT)
def partitionSorterReservedMemoryPerPartition: Long =
get(PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
+ /**
+  * Thread count for the worker's partition-file sorter pool
+  * (`celeborn.worker.partitionSorter.threads`).
+  *
+  * When the key is unset, fall back to the number of available processors, but never
+  * fewer than 8. That floor is the size the pool had before it became configurable,
+  * so hosts with few cores keep their previous sorting parallelism.
+  */
+ def partitionSorterThreads: Int =
+ get(PARTITION_SORTER_THREADS).getOrElse(math.max(Runtime.getRuntime.availableProcessors, 8))
// //////////////////////////////////////////////////////
// Client //
@@ -1957,6 +1959,14 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1mb")
+ // Optional: when unset, CelebornConf.partitionSorterThreads supplies a CPU-based fallback.
+ // Non-positive values are rejected when the key is read, rather than surfacing later
+ // as a failure when the sorter thread pool is created.
+ val PARTITION_SORTER_THREADS: OptionalConfigEntry[Int] =
+ buildConf("celeborn.worker.partitionSorter.threads")
+ .categories("worker")
+ .doc("PartitionSorter's thread counts.")
+ .version("0.3.0")
+ .intConf
+ .checkValue(_ > 0, "The number of partition sorter threads must be positive.")
+ .createOptional
+
val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.buffer.size")
.withAlternative("rss.worker.flush.buffer.size")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index aebc66fdd..14007688d 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -81,6 +81,7 @@ license: |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.2.0 |
| celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 |
+| celeborn.worker.partitionSorter.threads | <undefined> | PartitionSorter's thread counts. | 0.3.0 |
| celeborn.worker.push.io.threads | <undefined> | Netty IO thread number of worker to handle client push data. The default threads number is the number of flush thread. | 0.2.0 |
| celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 |
| celeborn.worker.register.timeout | 180s | Worker register timeout. | 0.2.0 |
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 28c2e9ffb..cb2670e54 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -91,11 +91,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
protected final AbstractSource source;
- private final ExecutorService fileSorterExecutors =
- ThreadUtils.newDaemonCachedThreadPool(
- "worker-file-sorter-execute",
- Math.max(Runtime.getRuntime().availableProcessors(), 8),
- 120);
+ // Sorter thread pool. It is created in the constructor, not inline, so its size can
+ // come from conf.partitionSorterThreads() (celeborn.worker.partitionSorter.threads).
+ private final ExecutorService fileSorterExecutors;
private final Thread fileSorterSchedulerThread;
public PartitionFilesSorter(
@@ -123,6 +119,10 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
this.sortedFilesDb = null;
}
+ fileSorterExecutors =
+ ThreadUtils.newDaemonCachedThreadPool(
+ "worker-file-sorter-execute", conf.partitionSorterThreads(), 120);
+
fileSorterSchedulerThread =
new Thread(
() -> {