You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/14 05:20:48 UTC

[incubator-celeborn] branch main updated: [CELEBORN-225] Add global default configuration for number of flusher… (#1165)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new fa7ba431 [CELEBORN-225] Add global default configuration for number of flusher… (#1165)
fa7ba431 is described below

commit fa7ba4313619630d6e5b7eaa84b9875093d496af
Author: Keyong Zhou <zh...@apache.org>
AuthorDate: Sat Jan 14 13:20:44 2023 +0800

    [CELEBORN-225] Add global default configuration for number of flusher… (#1165)
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 26 ++++++++----
 .../apache/celeborn/common/CelebornConfSuite.scala | 49 +++++++++++++++++++---
 docs/configuration/worker.md                       |  1 +
 3 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 109cf20c..497a00a8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -757,11 +757,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
       storageDirs.map { str =>
         var maxCapacity = defaultMaxCapacity
         var diskType = HDD
-        var flushThread = -1
+        var flushThread = get(WORKER_FLUSHER_THREADS)
         val (dir, attributes) = str.split(":").toList match {
           case _dir :: tail => (_dir, tail)
           case nil => throw new IllegalArgumentException(s"Illegal storage dir: $nil")
         }
+        var flushThreadsDefined = false
         attributes.foreach {
           case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") =>
             maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1))
@@ -770,17 +771,19 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
             if (diskType == Type.MEMORY) {
               throw new IOException(s"Invalid diskType: $diskType")
             }
+            if (!flushThreadsDefined) {
+              flushThread = diskType match {
+                case HDD => hddFlusherThreads
+                case SSD => ssdFlusherThreads
+                case _ => flushThread
+              }
+            }
           case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") =>
             flushThread = threadCountStr.split("=")(1).toInt
+            flushThreadsDefined = true
           case illegal =>
             throw new IllegalArgumentException(s"Illegal attribute: $illegal")
         }
-        if (flushThread == -1) {
-          flushThread = diskType match {
-            case HDD => hddFlusherThreads
-            case SSD => ssdFlusherThreads
-          }
-        }
         (dir, maxCapacity, flushThread, diskType)
       }
     }.getOrElse {
@@ -1925,6 +1928,15 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("120s")
 
+  val WORKER_FLUSHER_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.worker.flusher.threads")
+      .withAlternative("rss.flusher.thread.count")
+      .categories("worker")
+      .doc("Flusher's thread count per disk for unknown-type disks.")
+      .version("0.2.0")
+      .intConf
+      .createWithDefault(2)
+
   val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.worker.flusher.hdd.threads")
       .withAlternative("rss.flusher.hdd.thread.count")
diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index f84cbd29..0b84998d 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -31,17 +31,17 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(masterEndpoints(1) == "localhost2:9097")
   }
 
-  test("basedir test") {
+  test("storage test") {
     val conf = new CelebornConf()
     val defaultMaxUsableSpace = 1024L * 1024 * 1024 * 1024 * 1024
     conf.set("celeborn.worker.storage.dirs", "/mnt/disk1")
     val workerBaseDirs = conf.workerBaseDirs
     assert(workerBaseDirs.size == 1)
-    assert(workerBaseDirs.head._3 == 1)
+    assert(workerBaseDirs.head._3 == 2)
     assert(workerBaseDirs.head._2 == defaultMaxUsableSpace)
   }
 
-  test("basedir test2") {
+  test("storage test2") {
     val conf = new CelebornConf()
     val defaultMaxUsableSpace = 1024L * 1024 * 1024 * 1024 * 1024
     conf.set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g")
@@ -51,7 +51,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(workerBaseDirs.head._2 == 10 * 1024 * 1024 * 1024L)
   }
 
-  test("basedir test3") {
+  test("storage test3") {
     val conf = new CelebornConf()
     conf.set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g:flushthread=3")
     val workerBaseDirs = conf.workerBaseDirs
@@ -60,7 +60,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(workerBaseDirs.head._2 == 10 * 1024 * 1024 * 1024L)
   }
 
-  test("basedir test4") {
+  test("storage test4") {
     val conf = new CelebornConf()
     conf.set(
       "celeborn.worker.storage.dirs",
@@ -77,6 +77,45 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(workerBaseDirs(1)._2 == 15 * 1024 * 1024 * 1024L)
   }
 
+  test("storage test5") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.storage.dirs", "/mnt/disk1")
+    val workerBaseDirs = conf.workerBaseDirs
+    assert(workerBaseDirs.head._3 == 2)
+  }
+
+  test("storage test6") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.flusher.threads", "4")
+      .set("celeborn.worker.storage.dirs", "/mnt/disk1")
+    val workerBaseDirs = conf.workerBaseDirs
+    assert(workerBaseDirs.head._3 == 4)
+  }
+
+  test("storage test7") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.flusher.threads", "4")
+      .set("celeborn.worker.storage.dirs", "/mnt/disk1:flushthread=8")
+    val workerBaseDirs = conf.workerBaseDirs
+    assert(workerBaseDirs.head._3 == 8)
+  }
+
+  test("storage test8") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.flusher.threads", "4")
+      .set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD")
+    val workerBaseDirs = conf.workerBaseDirs
+    assert(workerBaseDirs.head._3 == 8)
+  }
+
+  test("storage test9") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.flusher.threads", "4")
+      .set("celeborn.worker.storage.dirs", "/mnt/disk1:flushthread=9:disktype=HDD")
+    val workerBaseDirs = conf.workerBaseDirs
+    assert(workerBaseDirs.head._3 == 9)
+  }
+
   test("zstd level") {
     val conf = new CelebornConf()
     val error1 = intercept[IllegalArgumentException] {
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8548c44b..eb3c1caf 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -50,6 +50,7 @@ license: |
 | celeborn.worker.flusher.hdfs.threads | 4 | Flusher's thread count used for write data to HDFS. | 0.2.0 | 
 | celeborn.worker.flusher.shutdownTimeout | 3s | Timeout for a flusher to shutdown. | 0.2.0 | 
 | celeborn.worker.flusher.ssd.threads | 8 | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 | 
+| celeborn.worker.flusher.threads | 2 | Flusher's thread count per disk for unknown-type disks. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | 1s | The wait interval of checking whether all released slots to be committed or destroyed during worker graceful shutdown | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.enabled | false | When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. | 0.2.0 |