You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:20 UTC

[incubator-celeborn] 05/42: [CELEBORN-399] Make the fileSorterExecutors thread count configurable (#1325)

This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 29e58df2de88b4ace6d1bcdb528f3889ac158ee6
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Fri Mar 10 21:10:43 2023 +0800

    [CELEBORN-399] Make the fileSorterExecutors thread count configurable (#1325)
---
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala   | 10 ++++++++++
 docs/configuration/worker.md                                   |  1 +
 .../service/deploy/worker/storage/PartitionFilesSorter.java    | 10 +++++-----
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 1520d5e31..3b094aaff 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,6 +514,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT)
   def partitionSorterReservedMemoryPerPartition: Long =
     get(PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
+  def partitionSorterThreads: Int =
+    get(PARTITION_SORTER_THREADS).getOrElse(Runtime.getRuntime.availableProcessors)
 
   // //////////////////////////////////////////////////////
   //                      Client                         //
@@ -1957,6 +1959,14 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1mb")
 
+  val PARTITION_SORTER_THREADS: OptionalConfigEntry[Int] =
+    buildConf("celeborn.worker.partitionSorter.threads")
+      .categories("worker")
+      .doc("PartitionSorter's thread counts.")
+      .version("0.3.0")
+      .intConf
+      .createOptional
+
   val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.worker.flusher.buffer.size")
       .withAlternative("rss.worker.flush.buffer.size")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index aebc66fdd..14007688d 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -81,6 +81,7 @@ license: |
 | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | 
 | celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.2.0 | 
 | celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 | 
+| celeborn.worker.partitionSorter.threads | &lt;undefined&gt; | PartitionSorter's thread counts. | 0.3.0 | 
 | celeborn.worker.push.io.threads | &lt;undefined&gt; | Netty IO thread number of worker to handle client push data. The default threads number is the number of flush thread. | 0.2.0 | 
 | celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 | 
 | celeborn.worker.register.timeout | 180s | Worker register timeout. | 0.2.0 | 
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 28c2e9ffb..cb2670e54 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -91,11 +91,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
 
   protected final AbstractSource source;
 
-  private final ExecutorService fileSorterExecutors =
-      ThreadUtils.newDaemonCachedThreadPool(
-          "worker-file-sorter-execute",
-          Math.max(Runtime.getRuntime().availableProcessors(), 8),
-          120);
+  private final ExecutorService fileSorterExecutors;
   private final Thread fileSorterSchedulerThread;
 
   public PartitionFilesSorter(
@@ -123,6 +119,10 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
       this.sortedFilesDb = null;
     }
 
+    fileSorterExecutors =
+        ThreadUtils.newDaemonCachedThreadPool(
+            "worker-file-sorter-execute", conf.partitionSorterThreads(), 120);
+
     fileSorterSchedulerThread =
         new Thread(
             () -> {