You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/15 02:32:07 UTC

[incubator-celeborn] branch main updated: [CELEBORN-423] Format http request result (#1349)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f56a5e5d [CELEBORN-423] Format http request result (#1349)
1f56a5e5d is described below

commit 1f56a5e5d1d57c4924aacef67b92a51aa2a832a9
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Wed Mar 15 10:32:01 2023 +0800

    [CELEBORN-423] Format http request result (#1349)
---
 .../celeborn/service/deploy/master/Master.scala    | 69 ++++++++++++----------
 .../celeborn/server/common/HttpService.scala       |  4 +-
 .../server/common/http/HttpRequestHandler.scala    |  4 +-
 .../celeborn/service/deploy/worker/Worker.scala    | 51 ++++++++++++----
 .../service/deploy/MiniClusterFeature.scala        |  2 +-
 5 files changed, 84 insertions(+), 46 deletions(-)

diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index ab0377136..7bcbb419f 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -698,22 +698,26 @@ private[celeborn] class Master(
 
   override def getWorkerInfo: String = {
     val sb = new StringBuilder
+    sb.append("====================== Workers Info in Master ===========================\n")
     workersSnapShot.asScala.foreach { w =>
-      sb.append("==========WorkerInfos in Master==========\n")
+      sb.append(s"${w.toUniqueId().padTo(50, ' ')}in Master\n") // Char pad keeps a String
       sb.append(w).append("\n")
 
+      sb.append("\n")
       val workerInfo = requestGetWorkerInfos(w.endpoint)
         .workerInfos.asJava
         .get(0)
 
-      sb.append("==========WorkerInfos in Workers==========\n")
+      sb.append(s"${w.toUniqueId().padTo(50, ' ')}in Worker\n")
       sb.append(workerInfo).append("\n")
+      sb.append("\n")
 
       if (w.hasSameInfoWith(workerInfo)) {
-        sb.append("Consist!").append("\n")
+        sb.append(s"${w.toUniqueId().padTo(50, ' ')}status consist!\n")
       } else {
-        sb.append("[ERROR] Inconsistent!").append("\n")
+        sb.append(s"${w.toUniqueId().padTo(50, ' ')}status not consist!\n")
       }
+      sb.append("======================================================================\n")
     }
 
     sb.toString()
@@ -721,17 +725,17 @@ private[celeborn] class Master(
 
   override def getLostWorkers: String = {
     val sb = new StringBuilder
-    sb.append("========== Lost WorkerInfos in Master==========\n")
-    lostWorkersSnapshot.asScala.map { case (worker, time) =>
-      sb.append(s"${worker.toUniqueId()}      $time\n")
+    sb.append("======================= Lost Workers in Master ========================\n")
+    lostWorkersSnapshot.asScala.foreach { case (worker, time) =>
+      sb.append(s"${worker.toUniqueId().padTo(50, ' ')}$time\n")
     }
     sb.toString()
   }
 
   override def getBlacklistedWorkers: String = {
     val sb = new StringBuilder
-    sb.append("==========Blacklisted WorkerInfos in Master==========\n")
-    statusSystem.blacklist.asScala.map { worker =>
+    sb.append("==================== Blacklisted Workers in Master =====================\n")
+    statusSystem.blacklist.asScala.foreach { worker =>
       sb.append(s"${worker.toUniqueId()}\n")
     }
     sb.toString()
@@ -739,47 +743,52 @@ private[celeborn] class Master(
 
   override def getThreadDump: String = {
     val sb = new StringBuilder
-    val threadDump = Utils.getThreadDump()
-    sb.append("==========Master ThreadDump==========\n")
-    sb.append(threadDump).append("\n")
-    workersSnapShot.asScala.foreach(w => {
-      sb.append(s"==========Worker ${w.readableAddress()} ThreadDump==========\n")
-      if (w.endpoint == null) {
-        w.setupEndpoint(this.rpcEnv.setupEndpointRef(
-          RpcAddress
-            .apply(w.host, w.rpcPort),
-          RpcNameConstants.WORKER_EP))
-      }
-      val res = requestThreadDump(w.endpoint)
-      sb.append(res.threadDump).append("\n")
-    })
-
+    sb.append("========================= Master ThreadDump ==========================\n")
+    sb.append(Utils.getThreadDump()).append("\n")
     sb.toString()
   }
 
   override def getHostnameList: String = {
-    statusSystem.hostnameSet.asScala.mkString("\n")
+    val sb = new StringBuilder
+    sb.append("================= LifecycleManager Hostname List ======================\n")
+    statusSystem.hostnameSet.asScala.foreach { host =>
+      sb.append(s"$host\n")
+    }
+    sb.toString()
   }
 
   override def getApplicationList: String = {
-    statusSystem.appHeartbeatTime.keys().asScala.mkString("\n")
+    val sb = new StringBuilder
+    sb.append("================= LifecycleManager Application List ======================\n")
+    statusSystem.appHeartbeatTime.asScala.foreach { case (appId, time) =>
+      sb.append(s"${appId.padTo(40, ' ')}$time\n")
+    }
+    sb.toString()
   }
 
   override def getShuffleList: String = {
-    statusSystem.registeredShuffle.asScala.mkString("\n")
+    val sb = new StringBuilder
+    sb.append("======================= Shuffle Key List ============================\n")
+    statusSystem.registeredShuffle.asScala.foreach { shuffleKey =>
+      sb.append(s"$shuffleKey\n")
+    }
+    sb.toString()
   }
 
   override def listTopDiskUseApps: String = {
-    statusSystem.appDiskUsageMetric.summary
+    val sb = new StringBuilder
+    sb.append("================== Top Disk Usage Applications =======================\n")
+    sb.append(statusSystem.appDiskUsageMetric.summary)
+    sb.toString()
   }
 
   override def listPartitionLocationInfo: String = throw new UnsupportedOperationException()
 
   override def getUnavailablePeers: String = throw new UnsupportedOperationException()
 
-  override def isShutdown: Boolean = throw new UnsupportedOperationException()
+  override def isShutdown: String = throw new UnsupportedOperationException()
 
-  override def isRegistered: Boolean = throw new UnsupportedOperationException()
+  override def isRegistered: String = throw new UnsupportedOperationException()
 
   private def requestGetWorkerInfos(endpoint: RpcEndpointRef): GetWorkerInfosResponse = {
     try {
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index eec3da2d3..3c49c801c 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -55,9 +55,9 @@ abstract class HttpService extends Service with Logging {
 
   def getUnavailablePeers: String
 
-  def isShutdown: Boolean
+  def isShutdown: String
 
-  def isRegistered: Boolean
+  def isRegistered: String
 
   def startHttpServer(): Unit = {
     val handlers =
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index 654537836..ed2a240f8 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -85,9 +85,9 @@ class HttpRequestHandler(
       case "/unavailablePeers" if service.serviceName == Service.WORKER =>
         service.getUnavailablePeers
       case "/isShutdown" if service.serviceName == Service.WORKER =>
-        service.isShutdown.toString
+        service.isShutdown
       case "/isRegistered" if service.serviceName == Service.WORKER =>
-        service.isRegistered.toString
+        service.isRegistered
       case _ => INVALID
     }
   }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 9f6589f16..ec9677bd2 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -447,43 +447,72 @@ private[celeborn] class Worker(
     fetchHandler.cleanupExpiredShuffleKey(expiredShuffleKeys)
   }
 
-  override def getWorkerInfo: String = workerInfo.toString()
+  override def getWorkerInfo: String = {
+    val sb = new StringBuilder
+    sb.append("======================= WorkerInfo of Worker ============================\n")
+    sb.append(workerInfo.toString()).append("\n")
+    sb.toString()
+  }
 
   override def getLostWorkers: String = throw new UnsupportedOperationException()
 
   override def getBlacklistedWorkers: String = throw new UnsupportedOperationException()
 
-  override def getThreadDump: String = Utils.getThreadDump()
+  override def getThreadDump: String = {
+    val sb = new StringBuilder
+    sb.append("========================= Worker ThreadDump ==========================\n")
+    sb.append(Utils.getThreadDump()).append("\n")
+    sb.toString()
+  }
 
   override def getHostnameList: String = throw new UnsupportedOperationException()
 
   override def getApplicationList: String = throw new UnsupportedOperationException()
 
   override def getShuffleList: String = {
-    storageManager.shuffleKeySet().asScala.mkString("\n")
+    val sb = new StringBuilder
+    sb.append("======================= Shuffle Key List ============================\n")
+    storageManager.shuffleKeySet().asScala.foreach { shuffleKey =>
+      sb.append(s"$shuffleKey\n")
+    }
+    sb.toString()
   }
 
-  override def isShutdown: Boolean = shutdown.get()
+  override def isShutdown: String = {
+    val sb = new StringBuilder
+    sb.append("========================= Worker Shutdown ==========================\n")
+    sb.append(shutdown.get()).append("\n")
+    sb.toString()
+  }
 
-  override def isRegistered: Boolean = registered.get()
+  override def isRegistered: String = {
+    val sb = new StringBuilder
+    sb.append("========================= Worker Registered ==========================\n")
+    sb.append(registered.get()).append("\n")
+    sb.toString()
+  }
 
   override def listTopDiskUseApps: String = {
-    val stringBuilder = new StringBuilder()
+    val sb = new StringBuilder
+    sb.append("================== Top Disk Usage Applications =======================\n")
     storageManager.topAppDiskUsage.asScala.foreach { case (appId, usage) =>
-      stringBuilder.append(s"application ${appId} used ${Utils.bytesToString(usage)} ")
+      sb.append(s"Application $appId used ${Utils.bytesToString(usage)}\n")
     }
-    stringBuilder.toString()
+    sb.toString()
   }
 
   override def listPartitionLocationInfo: String = {
-    partitionLocationInfo.toString
+    val sb = new StringBuilder
+    sb.append("==================== Partition Location Info =========================\n")
+    sb.append(partitionLocationInfo.toString).append("\n")
+    sb.toString()
   }
 
   override def getUnavailablePeers: String = {
     val sb = new StringBuilder
-    sb.append("==========Unavailable Peers of Worker==========\n")
+    sb.append("==================== Unavailable Peers of Worker =====================\n")
     unavailablePeers.asScala.foreach { case (peer, time) =>
-      sb.append(s"${peer.toUniqueId().padTo(55, "")}$time\n");
+      sb.append(s"${peer.toUniqueId().padTo(50, ' ')}$time\n") // Char pad keeps a String
     }
     sb.toString()
   }
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 3cd369296..33ac40e59 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -114,7 +114,7 @@ trait MiniClusterFeature extends Logging {
     Thread.sleep(5000L)
 
     workerInfos.foreach {
-      case (worker, _) => assert(worker.isRegistered)
+      case (worker, _) => assert(worker.registered.get())
     }
     (master, workerInfos.keySet)
   }