You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:27 UTC

[incubator-celeborn] 12/42: [CELEBORN-405] Add metrics about lost workers (#1330)

This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 993f1b5b7ab4e8caea8155c689ad0ff8134a8635
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Mar 13 14:49:49 2023 +0800

    [CELEBORN-405] Add metrics about lost workers (#1330)
    
    * [CELEBORN-405] Add metrics about lost workers
---
 common/src/main/proto/TransportMessages.proto               |  1 +
 .../org/apache/celeborn/common/util/PbSerDeUtils.scala      |  6 +++++-
 .../deploy/master/clustermeta/AbstractMetaManager.java      | 13 ++++++++++++-
 .../org/apache/celeborn/service/deploy/master/Master.scala  |  1 +
 .../celeborn/service/deploy/master/MasterSource.scala       |  2 ++
 5 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index 277138bd7..b7510c943 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -464,4 +464,5 @@ message PbSnapshotMetaInfo {
   int64 partitionTotalFileCount = 9;
   repeated PbAppDiskUsageSnapshot appDiskUsageMetricSnapshots = 10;
   optional PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
+  map<string, int64> lostWorkers = 12;
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 56f74251d..f32e239ef 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -369,7 +369,8 @@ object PbSerDeUtils {
       partitionTotalWritten: java.lang.Long,
       partitionTotalFileCount: java.lang.Long,
       appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
-      currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot): PbSnapshotMetaInfo = {
+      currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
+      lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long]): PbSnapshotMetaInfo = {
     val builder = PbSnapshotMetaInfo.newBuilder()
       .setEstimatedPartitionSize(estimatedPartitionSize)
       .addAllRegisteredShuffle(registeredShuffle)
@@ -384,6 +385,9 @@ object PbSerDeUtils {
       // protobuf repeated value can't support null value in list.
       .addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_ != null)
         .map(toPbAppDiskUsageSnapshot).toList.asJava)
+      .putAllLostWorkers(lostWorkers.asScala.map {
+        case (worker: WorkerInfo, time: java.lang.Long) => (worker.toUniqueId(), time)
+      }.asJava)
     if (currentAppDiskUsageMetricsSnapshot != null) {
       builder.setCurrentAppDiskUsageMetricsSnapshot(
         toPbAppDiskUsageSnapshot(currentAppDiskUsageMetricsSnapshot))
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 19c3ce674..1b15d5eff 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -52,6 +52,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
   public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
   public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
   public final ArrayList<WorkerInfo> workers = new ArrayList<>();
+  public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers = new ConcurrentHashMap<>();
   public final ConcurrentHashMap<String, Long> appHeartbeatTime = new ConcurrentHashMap<>();
   // blacklist
   public final Set<WorkerInfo> blacklist = ConcurrentHashMap.newKeySet();
@@ -145,6 +146,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
     // remove worker from workers
     synchronized (workers) {
       workers.remove(worker);
+      lostWorkers.put(worker, System.currentTimeMillis());
     }
     // delete from blacklist
     blacklist.remove(worker);
@@ -157,6 +159,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
     // remove worker from workers
     synchronized (workers) {
       workers.remove(worker);
+      lostWorkers.put(worker, System.currentTimeMillis());
     }
     // delete from blacklist
     blacklist.remove(worker);
@@ -240,6 +243,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
     synchronized (workers) {
       if (!workers.contains(workerInfo)) {
         workers.add(workerInfo);
+        lostWorkers.remove(workerInfo);
       }
     }
   }
@@ -263,7 +267,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
                 partitionTotalWritten.sum(),
                 partitionTotalFileCount.sum(),
                 appDiskUsageMetric.snapShots(),
-                appDiskUsageMetric.currentSnapShot().get())
+                appDiskUsageMetric.currentSnapShot().get(),
+                lostWorkers)
             .toByteArray();
     Files.write(file.toPath(), snapshotBytes);
   }
@@ -305,6 +310,12 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
               .map(PbSerDeUtils::fromPbWorkerInfo)
               .collect(Collectors.toSet()));
 
+      snapshotMetaInfo
+          .getLostWorkersMap()
+          .entrySet()
+          .forEach(
+              entry -> lostWorkers.put(WorkerInfo.fromUniqueId(entry.getKey()), entry.getValue()));
+
       partitionTotalWritten.reset();
       partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
       partitionTotalFileCount.reset();
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 3f7d9eb55..319b90f5c 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -144,6 +144,7 @@ private[celeborn] class Master(
   masterSource.addGauge(MasterSource.BlacklistedWorkerCount, _ => statusSystem.blacklist.size())
   // worker count
   masterSource.addGauge(MasterSource.WorkerCount, _ => statusSystem.workers.size())
+  masterSource.addGauge(MasterSource.LostWorkerCount, _ => statusSystem.lostWorkers.size())
   masterSource.addGauge(MasterSource.PartitionSize, _ => statusSystem.estimatedPartitionSize)
   // is master active under HA mode
   masterSource.addGauge(MasterSource.IsActiveMaster, _ => isMasterActive)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index e7da8f8a2..9660821b2 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -37,6 +37,8 @@ object MasterSource {
 
   val WorkerCount = "WorkerCount"
 
+  val LostWorkerCount = "LostWorkers"
+
   val BlacklistedWorkerCount = "BlacklistedWorkerCount"
 
   val RegisteredShuffleCount = "RegisteredShuffleCount"