You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:27 UTC
[incubator-celeborn] 12/42: [CELEBORN-405] Add metrics about lost workers (#1330)
This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 993f1b5b7ab4e8caea8155c689ad0ff8134a8635
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Mar 13 14:49:49 2023 +0800
[CELEBORN-405] Add metrics about lost workers (#1330)
* [CELEBORN-405] Add metrics about lost workers
---
common/src/main/proto/TransportMessages.proto | 1 +
.../org/apache/celeborn/common/util/PbSerDeUtils.scala | 6 +++++-
.../deploy/master/clustermeta/AbstractMetaManager.java | 13 ++++++++++++-
.../org/apache/celeborn/service/deploy/master/Master.scala | 1 +
.../celeborn/service/deploy/master/MasterSource.scala | 2 ++
5 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index 277138bd7..b7510c943 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -464,4 +464,5 @@ message PbSnapshotMetaInfo {
int64 partitionTotalFileCount = 9;
repeated PbAppDiskUsageSnapshot appDiskUsageMetricSnapshots = 10;
optional PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
+ map<string, int64> lostWorkers = 12;
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 56f74251d..f32e239ef 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -369,7 +369,8 @@ object PbSerDeUtils {
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
- currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot): PbSnapshotMetaInfo = {
+ currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
+ lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long]): PbSnapshotMetaInfo = {
val builder = PbSnapshotMetaInfo.newBuilder()
.setEstimatedPartitionSize(estimatedPartitionSize)
.addAllRegisteredShuffle(registeredShuffle)
@@ -384,6 +385,9 @@ object PbSerDeUtils {
// protobuf repeated value can't support null value in list.
.addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_ != null)
.map(toPbAppDiskUsageSnapshot).toList.asJava)
+ .putAllLostWorkers(lostWorkers.asScala.map {
+ case (worker: WorkerInfo, time: java.lang.Long) => (worker.toUniqueId(), time)
+ }.asJava)
if (currentAppDiskUsageMetricsSnapshot != null) {
builder.setCurrentAppDiskUsageMetricsSnapshot(
toPbAppDiskUsageSnapshot(currentAppDiskUsageMetricsSnapshot))
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 19c3ce674..1b15d5eff 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -52,6 +52,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
public final ArrayList<WorkerInfo> workers = new ArrayList<>();
+ public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers = new ConcurrentHashMap<>();
public final ConcurrentHashMap<String, Long> appHeartbeatTime = new ConcurrentHashMap<>();
// blacklist
public final Set<WorkerInfo> blacklist = ConcurrentHashMap.newKeySet();
@@ -145,6 +146,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
// remove worker from workers
synchronized (workers) {
workers.remove(worker);
+ lostWorkers.put(worker, System.currentTimeMillis());
}
// delete from blacklist
blacklist.remove(worker);
@@ -157,6 +159,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
// remove worker from workers
synchronized (workers) {
workers.remove(worker);
+ lostWorkers.put(worker, System.currentTimeMillis());
}
// delete from blacklist
blacklist.remove(worker);
@@ -240,6 +243,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
synchronized (workers) {
if (!workers.contains(workerInfo)) {
workers.add(workerInfo);
+ lostWorkers.remove(workerInfo);
}
}
}
@@ -263,7 +267,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
partitionTotalWritten.sum(),
partitionTotalFileCount.sum(),
appDiskUsageMetric.snapShots(),
- appDiskUsageMetric.currentSnapShot().get())
+ appDiskUsageMetric.currentSnapShot().get(),
+ lostWorkers)
.toByteArray();
Files.write(file.toPath(), snapshotBytes);
}
@@ -305,6 +310,12 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
.map(PbSerDeUtils::fromPbWorkerInfo)
.collect(Collectors.toSet()));
+ snapshotMetaInfo
+ .getLostWorkersMap()
+ .entrySet()
+ .forEach(
+ entry -> lostWorkers.put(WorkerInfo.fromUniqueId(entry.getKey()), entry.getValue()));
+
partitionTotalWritten.reset();
partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
partitionTotalFileCount.reset();
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 3f7d9eb55..319b90f5c 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -144,6 +144,7 @@ private[celeborn] class Master(
masterSource.addGauge(MasterSource.BlacklistedWorkerCount, _ => statusSystem.blacklist.size())
// worker count
masterSource.addGauge(MasterSource.WorkerCount, _ => statusSystem.workers.size())
+ masterSource.addGauge(MasterSource.LostWorkerCount, _ => statusSystem.lostWorkers.size())
masterSource.addGauge(MasterSource.PartitionSize, _ => statusSystem.estimatedPartitionSize)
// is master active under HA mode
masterSource.addGauge(MasterSource.IsActiveMaster, _ => isMasterActive)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index e7da8f8a2..9660821b2 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -37,6 +37,8 @@ object MasterSource {
val WorkerCount = "WorkerCount"
+ val LostWorkerCount = "LostWorkers"
+
val BlacklistedWorkerCount = "BlacklistedWorkerCount"
val RegisteredShuffleCount = "RegisteredShuffleCount"