You are viewing a plain-text version of this content. The canonical link to the original archived message was not preserved in this copy.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:25 UTC
[incubator-celeborn] 10/42: [CELEBORN-403][FLINK] Add metrics about buffer dispatcher request queue length. (#1329)
This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 5601f3942995795c5b5c983f6561db9b90714b83
Author: Ethan Feng <et...@apache.org>
AuthorDate: Mon Mar 13 11:15:00 2023 +0800
[CELEBORN-403][FLINK] Add metrics about buffer dispatcher request queue length. (#1329)
---
.../common/network/server/memory/MemoryManager.java | 4 ++++
.../network/server/memory/ReadBufferDispatcher.java | 4 ++++
.../celeborn/service/deploy/worker/Worker.scala | 21 ++++++++++++---------
.../service/deploy/worker/WorkerSource.scala | 1 +
4 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java
index dd0273dc4..3bb9cdaef 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java
@@ -370,6 +370,10 @@ public class MemoryManager {
return pausePushDataAndReplicateCounter.sum();
}
+ public int dispatchRequestsLength() {
+ return readBufferDispatcher.requestsLength();
+ }
+
enum MemoryManagerStat {
resumeAll,
pausePushDataAndReplicate,
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java b/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java
index e3a198755..37f834640 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java
@@ -99,4 +99,8 @@ public class ReadBufferDispatcher extends Thread {
}
}
}
+
+ public int requestsLength() {
+ return requests.size();
+ }
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 8d375d2f0..d34f4762b 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -90,7 +90,7 @@ private[celeborn] class Worker(
val storageManager = new StorageManager(conf, workerSource)
- val memoryTracker = MemoryManager.initialize(
+ val memoryManager = MemoryManager.initialize(
conf.workerDirectMemoryRatioToPauseReceive,
conf.workerDirectMemoryRatioToPauseReplicate,
conf.workerDirectMemoryRatioToResume,
@@ -99,9 +99,9 @@ private[celeborn] class Worker(
conf.workerDirectMemoryRatioForShuffleStorage,
conf.workerDirectMemoryPressureCheckIntervalMs,
conf.workerDirectMemoryReportIntervalSecond)
- memoryTracker.registerMemoryListener(storageManager)
+ memoryManager.registerMemoryListener(storageManager)
- val partitionsSorter = new PartitionFilesSorter(memoryTracker, conf, workerSource)
+ val partitionsSorter = new PartitionFilesSorter(memoryManager, conf, workerSource)
if (conf.workerCongestionControlEnabled) {
if (conf.workerCongestionControlLowWatermark.isEmpty || conf.workerCongestionControlHighWatermark.isEmpty) {
@@ -228,19 +228,22 @@ private[celeborn] class Worker(
WorkerSource.RegisteredShuffleCount,
_ => workerInfo.getShuffleKeySet.size())
workerSource.addGauge(WorkerSource.SlotsAllocated, _ => workerInfo.allocationsInLastHour())
- workerSource.addGauge(WorkerSource.SortMemory, _ => memoryTracker.getSortMemoryCounter.get())
+ workerSource.addGauge(WorkerSource.SortMemory, _ => memoryManager.getSortMemoryCounter.get())
workerSource.addGauge(WorkerSource.SortingFiles, _ => partitionsSorter.getSortingCount)
workerSource.addGauge(WorkerSource.SortedFiles, _ => partitionsSorter.getSortedCount)
workerSource.addGauge(WorkerSource.SortedFileSize, _ => partitionsSorter.getSortedSize)
- workerSource.addGauge(WorkerSource.DiskBuffer, _ => memoryTracker.getDiskBufferCounter.get())
- workerSource.addGauge(WorkerSource.NettyMemory, _ => memoryTracker.getNettyMemoryCounter.get())
- workerSource.addGauge(WorkerSource.PausePushDataCount, _ => memoryTracker.getPausePushDataCounter)
+ workerSource.addGauge(WorkerSource.DiskBuffer, _ => memoryManager.getDiskBufferCounter.get())
+ workerSource.addGauge(WorkerSource.NettyMemory, _ => memoryManager.getNettyMemoryCounter.get())
+ workerSource.addGauge(WorkerSource.PausePushDataCount, _ => memoryManager.getPausePushDataCounter)
workerSource.addGauge(
WorkerSource.PausePushDataAndReplicateCount,
- _ => memoryTracker.getPausePushDataAndReplicateCounter)
+ _ => memoryManager.getPausePushDataAndReplicateCounter)
workerSource.addGauge(
WorkerSource.BufferStreamReadBuffer,
- _ => memoryTracker.getReadBufferCounter.get())
+ _ => memoryManager.getReadBufferCounter.get())
+ workerSource.addGauge(
+ WorkerSource.readBufferDispatcherRequestsLength,
+ _ => memoryManager.dispatchRequestsLength)
private def heartBeatToMaster(): Unit = {
val activeShuffleKeys = new JHashSet[String]()
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index d3c83f780..ff8e4d570 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -99,6 +99,7 @@ object WorkerSource {
val PausePushDataCount = "PausePushData"
val PausePushDataAndReplicateCount = "PausePushDataAndReplicate"
val BufferStreamReadBuffer = "BufferStreamReadBuffer"
+ val readBufferDispatcherRequestsLength = "ReadBufferDispatcherRequestsLength"
// local device
val DeviceOSFreeCapacity = "DeviceOSFreeCapacity(B)"