You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:25 UTC

[incubator-celeborn] 10/42: [CELEBORN-403][FLINK] Add metrics about buffer dispatcher request queue length. (#1329)

This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 5601f3942995795c5b5c983f6561db9b90714b83
Author: Ethan Feng <et...@apache.org>
AuthorDate: Mon Mar 13 11:15:00 2023 +0800

    [CELEBORN-403][FLINK] Add metrics about buffer dispatcher request queue length. (#1329)
---
 .../common/network/server/memory/MemoryManager.java |  4 ++++
 .../network/server/memory/ReadBufferDispatcher.java |  4 ++++
 .../celeborn/service/deploy/worker/Worker.scala     | 21 ++++++++++++---------
 .../service/deploy/worker/WorkerSource.scala        |  1 +
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java b/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java
index dd0273dc4..3bb9cdaef 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/memory/MemoryManager.java
@@ -370,6 +370,10 @@ public class MemoryManager {
     return pausePushDataAndReplicateCounter.sum();
   }
 
+  public int dispatchRequestsLength() {
+    return readBufferDispatcher.requestsLength();
+  }
+
   enum MemoryManagerStat {
     resumeAll,
     pausePushDataAndReplicate,
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java b/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java
index e3a198755..37f834640 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/memory/ReadBufferDispatcher.java
@@ -99,4 +99,8 @@ public class ReadBufferDispatcher extends Thread {
       }
     }
   }
+
+  public int requestsLength() {
+    return requests.size();
+  }
 }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 8d375d2f0..d34f4762b 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -90,7 +90,7 @@ private[celeborn] class Worker(
 
   val storageManager = new StorageManager(conf, workerSource)
 
-  val memoryTracker = MemoryManager.initialize(
+  val memoryManager = MemoryManager.initialize(
     conf.workerDirectMemoryRatioToPauseReceive,
     conf.workerDirectMemoryRatioToPauseReplicate,
     conf.workerDirectMemoryRatioToResume,
@@ -99,9 +99,9 @@ private[celeborn] class Worker(
     conf.workerDirectMemoryRatioForShuffleStorage,
     conf.workerDirectMemoryPressureCheckIntervalMs,
     conf.workerDirectMemoryReportIntervalSecond)
-  memoryTracker.registerMemoryListener(storageManager)
+  memoryManager.registerMemoryListener(storageManager)
 
-  val partitionsSorter = new PartitionFilesSorter(memoryTracker, conf, workerSource)
+  val partitionsSorter = new PartitionFilesSorter(memoryManager, conf, workerSource)
 
   if (conf.workerCongestionControlEnabled) {
     if (conf.workerCongestionControlLowWatermark.isEmpty || conf.workerCongestionControlHighWatermark.isEmpty) {
@@ -228,19 +228,22 @@ private[celeborn] class Worker(
     WorkerSource.RegisteredShuffleCount,
     _ => workerInfo.getShuffleKeySet.size())
   workerSource.addGauge(WorkerSource.SlotsAllocated, _ => workerInfo.allocationsInLastHour())
-  workerSource.addGauge(WorkerSource.SortMemory, _ => memoryTracker.getSortMemoryCounter.get())
+  workerSource.addGauge(WorkerSource.SortMemory, _ => memoryManager.getSortMemoryCounter.get())
   workerSource.addGauge(WorkerSource.SortingFiles, _ => partitionsSorter.getSortingCount)
   workerSource.addGauge(WorkerSource.SortedFiles, _ => partitionsSorter.getSortedCount)
   workerSource.addGauge(WorkerSource.SortedFileSize, _ => partitionsSorter.getSortedSize)
-  workerSource.addGauge(WorkerSource.DiskBuffer, _ => memoryTracker.getDiskBufferCounter.get())
-  workerSource.addGauge(WorkerSource.NettyMemory, _ => memoryTracker.getNettyMemoryCounter.get())
-  workerSource.addGauge(WorkerSource.PausePushDataCount, _ => memoryTracker.getPausePushDataCounter)
+  workerSource.addGauge(WorkerSource.DiskBuffer, _ => memoryManager.getDiskBufferCounter.get())
+  workerSource.addGauge(WorkerSource.NettyMemory, _ => memoryManager.getNettyMemoryCounter.get())
+  workerSource.addGauge(WorkerSource.PausePushDataCount, _ => memoryManager.getPausePushDataCounter)
   workerSource.addGauge(
     WorkerSource.PausePushDataAndReplicateCount,
-    _ => memoryTracker.getPausePushDataAndReplicateCounter)
+    _ => memoryManager.getPausePushDataAndReplicateCounter)
   workerSource.addGauge(
     WorkerSource.BufferStreamReadBuffer,
-    _ => memoryTracker.getReadBufferCounter.get())
+    _ => memoryManager.getReadBufferCounter.get())
+  workerSource.addGauge(
+    WorkerSource.readBufferDispatcherRequestsLength,
+    _ => memoryManager.dispatchRequestsLength)
 
   private def heartBeatToMaster(): Unit = {
     val activeShuffleKeys = new JHashSet[String]()
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index d3c83f780..ff8e4d570 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -99,6 +99,7 @@ object WorkerSource {
   val PausePushDataCount = "PausePushData"
   val PausePushDataAndReplicateCount = "PausePushDataAndReplicate"
   val BufferStreamReadBuffer = "BufferStreamReadBuffer"
+  val readBufferDispatcherRequestsLength = "ReadBufferDispatcherRequestsLength"
 
   // local device
   val DeviceOSFreeCapacity = "DeviceOSFreeCapacity(B)"