You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:29 UTC
[incubator-celeborn] 14/42: [CELEBORN-408] Add lost worker infos to http request (#1333)
This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 99d754021caae8bb78c1dbad9f11ed8fddbb26ef
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Mar 13 15:27:41 2023 +0800
[CELEBORN-408] Add lost worker infos to http request (#1333)
---
.../org/apache/celeborn/service/deploy/master/Master.scala | 11 +++++++++++
.../scala/org/apache/celeborn/server/common/HttpService.scala | 2 ++
.../celeborn/server/common/http/HttpRequestHandler.scala | 2 ++
.../org/apache/celeborn/service/deploy/worker/Worker.scala | 2 ++
4 files changed, 17 insertions(+)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 319b90f5c..b431d5220 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -104,6 +104,8 @@ private[celeborn] class Master(
// States
private def workersSnapShot: util.List[WorkerInfo] =
statusSystem.workers.synchronized(new util.ArrayList[WorkerInfo](statusSystem.workers))
+  /**
+   * Point-in-time copy of `statusSystem.lostWorkers`, taken while holding the monitor of
+   * `statusSystem.workers`. NOTE(review): presumably this is the same lock that writers of
+   * `lostWorkers` hold — confirm in the status system. Because a fresh map is returned,
+   * callers can iterate it without holding that lock.
+   */
+  private def lostWorkersSnapshot: ConcurrentHashMap[WorkerInfo, java.lang.Long] = {
+    val workersLock = statusSystem.workers
+    workersLock.synchronized {
+      new ConcurrentHashMap(statusSystem.lostWorkers)
+    }
+  }
private def diskReserveSize = conf.diskReserveSize
@@ -717,6 +719,15 @@ private[celeborn] class Master(
sb.toString()
}
+  /**
+   * Builds the plain-text body served for `/lostWorkers` on the master.
+   *
+   * Emits a banner line followed by one `<worker.toUniqueId()> <value>` line per entry of
+   * `lostWorkersSnapshot`, in that map's iteration order (ConcurrentHashMap makes no ordering
+   * guarantee). The value is the `java.lang.Long` stored for the worker — presumably the time
+   * it was marked lost; TODO confirm against the status system.
+   */
+  override def getLostWorkers: String = {
+    val sb = new StringBuilder
+    sb.append("========== Lost WorkerInfos in Master==========\n")
+    // foreach, not map: the loop only appends to `sb`, so `map` would build and then
+    // discard a result collection.
+    lostWorkersSnapshot.asScala.foreach { case (worker, time) =>
+      sb.append(s"${worker.toUniqueId()} $time\n")
+    }
+    sb.toString()
+  }
+
override def getThreadDump: String = {
val sb = new StringBuilder
val threadDump = Utils.getThreadDump()
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index fece48794..25d3abd9f 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -27,6 +27,8 @@ abstract class HttpService extends Service with Logging {
def getWorkerInfo: String
+ /**
+  * Plain-text listing of workers this service considers lost. The HTTP handler only routes
+  * `/lostWorkers` here when `serviceName == Service.MASTER`; the worker implementation
+  * throws `UnsupportedOperationException`.
+  */
+ def getLostWorkers: String
+
def getThreadDump: String
def getHostnameList: String
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index aa6355e25..54377476a 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -64,6 +64,8 @@ class HttpRequestHandler(
uri match {
case "/workerInfo" =>
service.getWorkerInfo
+ case "/lostWorkers" if service.serviceName == Service.MASTER =>
+ service.getLostWorkers
case "/threadDump" =>
service.getThreadDump
case "/hostnames" if service.serviceName == Service.MASTER =>
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index d34f4762b..5a625d6ab 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -449,6 +449,8 @@ private[celeborn] class Worker(
override def getWorkerInfo: String = workerInfo.toString()
+  /**
+   * Not supported on a worker. The HTTP handler routes `/lostWorkers` to a service only when
+   * `serviceName == Service.MASTER`, so reaching this method indicates a routing mistake.
+   */
+  override def getLostWorkers: String = {
+    throw new UnsupportedOperationException
+  }
+
override def getThreadDump: String = Utils.getThreadDump()
override def getHostnameList: String = throw new UnsupportedOperationException()