You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/01/14 05:21:38 UTC
[incubator-celeborn] branch branch-0.2 updated: [CELEBORN-225] Add global default configuration for number of flusher… (#1165)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.2 by this push:
new 1bb782b8 [CELEBORN-225] Add global default configuration for number of flusher… (#1165)
1bb782b8 is described below
commit 1bb782b89623931a4bdf8cd8c336f3d1d87bfd77
Author: Keyong Zhou <zh...@apache.org>
AuthorDate: Sat Jan 14 13:20:44 2023 +0800
[CELEBORN-225] Add global default configuration for number of flusher… (#1165)
---
.../org/apache/celeborn/common/CelebornConf.scala | 26 ++++++++----
.../apache/celeborn/common/CelebornConfSuite.scala | 49 +++++++++++++++++++---
docs/configuration/worker.md | 1 +
3 files changed, 64 insertions(+), 12 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c0e194a5..ece2fe82 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -755,11 +755,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
storageDirs.map { str =>
var maxCapacity = defaultMaxCapacity
var diskType = HDD
- var flushThread = -1
+ var flushThread = get(WORKER_FLUSHER_THREADS)
val (dir, attributes) = str.split(":").toList match {
case _dir :: tail => (_dir, tail)
case nil => throw new IllegalArgumentException(s"Illegal storage dir: $nil")
}
+ var flushThreadsDefined = false
attributes.foreach {
case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") =>
maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1))
@@ -768,17 +769,19 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
if (diskType == Type.MEMORY) {
throw new IOException(s"Invalid diskType: $diskType")
}
+ if (!flushThreadsDefined) {
+ flushThread = diskType match {
+ case HDD => hddFlusherThreads
+ case SSD => ssdFlusherThreads
+ case _ => flushThread
+ }
+ }
case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") =>
flushThread = threadCountStr.split("=")(1).toInt
+ flushThreadsDefined = true
case illegal =>
throw new IllegalArgumentException(s"Illegal attribute: $illegal")
}
- if (flushThread == -1) {
- flushThread = diskType match {
- case HDD => hddFlusherThreads
- case SSD => ssdFlusherThreads
- }
- }
(dir, maxCapacity, flushThread, diskType)
}
}.getOrElse {
@@ -1890,6 +1893,15 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
+ val WORKER_FLUSHER_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.flusher.threads")
+ .withAlternative("rss.flusher.thread.count")
+ .categories("worker")
+ .doc("Flusher's thread count per disk for unkown-type disks.")
+ .version("0.2.0")
+ .intConf
+ .createWithDefault(2)
+
val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.hdd.threads")
.withAlternative("rss.flusher.hdd.thread.count")
diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 0b030d49..1be8f13d 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -31,17 +31,17 @@ class CelebornConfSuite extends RssFunSuite {
assert(masterEndpoints(1) == "localhost2:9097")
}
- test("basedir test") {
+ test("storage test") {
val conf = new CelebornConf()
val defaultMaxUsableSpace = 1024L * 1024 * 1024 * 1024 * 1024
conf.set("celeborn.worker.storage.dirs", "/mnt/disk1")
val workerBaseDirs = conf.workerBaseDirs
assert(workerBaseDirs.size == 1)
- assert(workerBaseDirs.head._3 == 1)
+ assert(workerBaseDirs.head._3 == 2)
assert(workerBaseDirs.head._2 == defaultMaxUsableSpace)
}
- test("basedir test2") {
+ test("storage test2") {
val conf = new CelebornConf()
val defaultMaxUsableSpace = 1024L * 1024 * 1024 * 1024 * 1024
conf.set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g")
@@ -51,7 +51,7 @@ class CelebornConfSuite extends RssFunSuite {
assert(workerBaseDirs.head._2 == 10 * 1024 * 1024 * 1024L)
}
- test("basedir test3") {
+ test("storage test3") {
val conf = new CelebornConf()
conf.set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10g:flushthread=3")
val workerBaseDirs = conf.workerBaseDirs
@@ -60,7 +60,7 @@ class CelebornConfSuite extends RssFunSuite {
assert(workerBaseDirs.head._2 == 10 * 1024 * 1024 * 1024L)
}
- test("basedir test4") {
+ test("storage test4") {
val conf = new CelebornConf()
conf.set(
"celeborn.worker.storage.dirs",
@@ -77,6 +77,45 @@ class CelebornConfSuite extends RssFunSuite {
assert(workerBaseDirs(1)._2 == 15 * 1024 * 1024 * 1024L)
}
+ test("storage test5") {
+ val conf = new CelebornConf()
+ conf.set("celeborn.worker.storage.dirs", "/mnt/disk1")
+ val workerBaseDirs = conf.workerBaseDirs
+ assert(workerBaseDirs.head._3 == 2)
+ }
+
+ test("storage test6") {
+ val conf = new CelebornConf()
+ conf.set("celeborn.worker.flusher.threads", "4")
+ .set("celeborn.worker.storage.dirs", "/mnt/disk1")
+ val workerBaseDirs = conf.workerBaseDirs
+ assert(workerBaseDirs.head._3 == 4)
+ }
+
+ test("storage test7") {
+ val conf = new CelebornConf()
+ conf.set("celeborn.worker.flusher.threads", "4")
+ .set("celeborn.worker.storage.dirs", "/mnt/disk1:flushthread=8")
+ val workerBaseDirs = conf.workerBaseDirs
+ assert(workerBaseDirs.head._3 == 8)
+ }
+
+ test("storage test8") {
+ val conf = new CelebornConf()
+ conf.set("celeborn.worker.flusher.threads", "4")
+ .set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD")
+ val workerBaseDirs = conf.workerBaseDirs
+ assert(workerBaseDirs.head._3 == 8)
+ }
+
+ test("storage test9") {
+ val conf = new CelebornConf()
+ conf.set("celeborn.worker.flusher.threads", "4")
+ .set("celeborn.worker.storage.dirs", "/mnt/disk1:flushthread=9:disktype=HDD")
+ val workerBaseDirs = conf.workerBaseDirs
+ assert(workerBaseDirs.head._3 == 9)
+ }
+
test("zstd level") {
val conf = new CelebornConf()
val error1 = intercept[IllegalArgumentException] {
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 89b9eb79..4e0ed09d 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -50,6 +50,7 @@ license: |
| celeborn.worker.flusher.hdfs.threads | 4 | Flusher's thread count used for write data to HDFS. | 0.2.0 |
| celeborn.worker.flusher.shutdownTimeout | 3s | Timeout for a flusher to shutdown. | 0.2.0 |
| celeborn.worker.flusher.ssd.threads | 8 | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 |
+| celeborn.worker.flusher.threads | 2 | Flusher's thread count per disk for unknown-type disks. | 0.2.0 |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | 1s | The wait interval of checking whether all released slots to be committed or destroyed during worker graceful shutdown | 0.2.0 |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown. | 0.2.0 |
| celeborn.worker.graceful.shutdown.enabled | false | When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. | 0.2.0 |