You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:29 UTC

[incubator-celeborn] 14/42: [CELEBORN-408] Add lost worker infos to http request (#1333)

This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 99d754021caae8bb78c1dbad9f11ed8fddbb26ef
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Mon Mar 13 15:27:41 2023 +0800

    [CELEBORN-408] Add lost worker infos to http request (#1333)
---
 .../org/apache/celeborn/service/deploy/master/Master.scala    | 11 +++++++++++
 .../scala/org/apache/celeborn/server/common/HttpService.scala |  2 ++
 .../celeborn/server/common/http/HttpRequestHandler.scala      |  2 ++
 .../org/apache/celeborn/service/deploy/worker/Worker.scala    |  2 ++
 4 files changed, 17 insertions(+)

diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 319b90f5c..b431d5220 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -104,6 +104,8 @@ private[celeborn] class Master(
   // States
   private def workersSnapShot: util.List[WorkerInfo] =
     statusSystem.workers.synchronized(new util.ArrayList[WorkerInfo](statusSystem.workers))
+  private def lostWorkersSnapshot: ConcurrentHashMap[WorkerInfo, java.lang.Long] = // copy of statusSystem.lostWorkers
+    statusSystem.workers.synchronized(new ConcurrentHashMap(statusSystem.lostWorkers)) // NOTE(review): locks the workers monitor, not lostWorkers - confirm writers use the same lock
 
   private def diskReserveSize = conf.diskReserveSize
 
@@ -717,6 +719,15 @@ private[celeborn] class Master(
     sb.toString()
   }
 
+  override def getLostWorkers: String = { // header line, then one "<uniqueId>      <time>" line per snapshot entry
+    val sb = new StringBuilder
+    sb.append("========== Lost WorkerInfos in Master==========\n")
+    lostWorkersSnapshot.asScala.foreach { case (worker, time) => // foreach, not map: append is side-effect only
+      sb.append(s"${worker.toUniqueId()}      $time\n")
+    }
+    sb.toString()
+  }
+
   override def getThreadDump: String = {
     val sb = new StringBuilder
     val threadDump = Utils.getThreadDump()
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index fece48794..25d3abd9f 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -27,6 +27,8 @@ abstract class HttpService extends Service with Logging {
 
   def getWorkerInfo: String
 
+  def getLostWorkers: String
+
   def getThreadDump: String
 
   def getHostnameList: String
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index aa6355e25..54377476a 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -64,6 +64,8 @@ class HttpRequestHandler(
     uri match {
       case "/workerInfo" =>
         service.getWorkerInfo
+      case "/lostWorkers" if service.serviceName == Service.MASTER =>
+        service.getLostWorkers
       case "/threadDump" =>
         service.getThreadDump
       case "/hostnames" if service.serviceName == Service.MASTER =>
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index d34f4762b..5a625d6ab 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -449,6 +449,8 @@ private[celeborn] class Worker(
 
   override def getWorkerInfo: String = workerInfo.toString()
 
+  override def getLostWorkers: String = throw new UnsupportedOperationException()
+
   override def getThreadDump: String = Utils.getThreadDump()
 
   override def getHostnameList: String = throw new UnsupportedOperationException()