You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/15 02:32:07 UTC
[incubator-celeborn] branch main updated: [CELEBORN-423] Format http request result (#1349)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1f56a5e5d [CELEBORN-423] Format http request result (#1349)
1f56a5e5d is described below
commit 1f56a5e5d1d57c4924aacef67b92a51aa2a832a9
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Wed Mar 15 10:32:01 2023 +0800
[CELEBORN-423] Format http request result (#1349)
---
.../celeborn/service/deploy/master/Master.scala | 69 ++++++++++++----------
.../celeborn/server/common/HttpService.scala | 4 +-
.../server/common/http/HttpRequestHandler.scala | 4 +-
.../celeborn/service/deploy/worker/Worker.scala | 51 ++++++++++++----
.../service/deploy/MiniClusterFeature.scala | 2 +-
5 files changed, 84 insertions(+), 46 deletions(-)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index ab0377136..7bcbb419f 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -698,22 +698,26 @@ private[celeborn] class Master(
override def getWorkerInfo: String = {
val sb = new StringBuilder
+ sb.append("====================== Workers Info in Master ===========================")
workersSnapShot.asScala.foreach { w =>
- sb.append("==========WorkerInfos in Master==========\n")
+ sb.append(s"${w.toUniqueId().padTo(50, " ")}in Master\n")
sb.append(w).append("\n")
+ sb.append("\n")
val workerInfo = requestGetWorkerInfos(w.endpoint)
.workerInfos.asJava
.get(0)
- sb.append("==========WorkerInfos in Workers==========\n")
+ sb.append(s"${w.toUniqueId().padTo(50, " ")}in Worker\n")
sb.append(workerInfo).append("\n")
+ sb.append("\n")
if (w.hasSameInfoWith(workerInfo)) {
- sb.append("Consist!").append("\n")
+ sb.append(s"${w.toUniqueId().padTo(50, " ")}status consist!\n")
} else {
- sb.append("[ERROR] Inconsistent!").append("\n")
+ sb.append(s"${w.toUniqueId().padTo(50, " ")}status not consist!\n")
}
+ sb.append("======================================================================")
}
sb.toString()
@@ -721,17 +725,17 @@ private[celeborn] class Master(
override def getLostWorkers: String = {
val sb = new StringBuilder
- sb.append("========== Lost WorkerInfos in Master==========\n")
- lostWorkersSnapshot.asScala.map { case (worker, time) =>
- sb.append(s"${worker.toUniqueId()} $time\n")
+ sb.append("======================= Lost Workers in Master ========================\n")
+ lostWorkersSnapshot.asScala.foreach { case (worker, time) =>
+ sb.append(s"${worker.toUniqueId().padTo(50, " ")}$time\n")
}
sb.toString()
}
override def getBlacklistedWorkers: String = {
val sb = new StringBuilder
- sb.append("==========Blacklisted WorkerInfos in Master==========\n")
- statusSystem.blacklist.asScala.map { worker =>
+ sb.append("==================== Blacklisted Workers in Master =====================\n")
+ statusSystem.blacklist.asScala.foreach { worker =>
sb.append(s"${worker.toUniqueId()}\n")
}
sb.toString()
@@ -739,47 +743,52 @@ private[celeborn] class Master(
override def getThreadDump: String = {
val sb = new StringBuilder
- val threadDump = Utils.getThreadDump()
- sb.append("==========Master ThreadDump==========\n")
- sb.append(threadDump).append("\n")
- workersSnapShot.asScala.foreach(w => {
- sb.append(s"==========Worker ${w.readableAddress()} ThreadDump==========\n")
- if (w.endpoint == null) {
- w.setupEndpoint(this.rpcEnv.setupEndpointRef(
- RpcAddress
- .apply(w.host, w.rpcPort),
- RpcNameConstants.WORKER_EP))
- }
- val res = requestThreadDump(w.endpoint)
- sb.append(res.threadDump).append("\n")
- })
-
+ sb.append("========================= Master ThreadDump ==========================\n")
+ sb.append(Utils.getThreadDump()).append("\n")
sb.toString()
}
override def getHostnameList: String = {
- statusSystem.hostnameSet.asScala.mkString("\n")
+ val sb = new StringBuilder
+ sb.append("================= LifecycleManager Hostname List ======================\n")
+ statusSystem.hostnameSet.asScala.foreach { host =>
+ sb.append(s"$host\n")
+ }
+ sb.toString()
}
override def getApplicationList: String = {
- statusSystem.appHeartbeatTime.keys().asScala.mkString("\n")
+ val sb = new StringBuilder
+ sb.append("================= LifecycleManager Hostname List ======================\n")
+ statusSystem.appHeartbeatTime.asScala.foreach { case (appId, time) =>
+ sb.append(s"${appId.padTo(40, " ")}$time\n")
+ }
+ sb.toString()
}
override def getShuffleList: String = {
- statusSystem.registeredShuffle.asScala.mkString("\n")
+ val sb = new StringBuilder
+ sb.append("======================= Shuffle Key List ============================\n")
+ statusSystem.registeredShuffle.asScala.foreach { shuffleKey =>
+ sb.append(s"$shuffleKey\n")
+ }
+ sb.toString()
}
override def listTopDiskUseApps: String = {
- statusSystem.appDiskUsageMetric.summary
+ val sb = new StringBuilder
+ sb.append("================== Top Disk Usage Applications =======================\n")
+ sb.append(statusSystem.appDiskUsageMetric.summary)
+ sb.toString()
}
override def listPartitionLocationInfo: String = throw new UnsupportedOperationException()
override def getUnavailablePeers: String = throw new UnsupportedOperationException()
- override def isShutdown: Boolean = throw new UnsupportedOperationException()
+ override def isShutdown: String = throw new UnsupportedOperationException()
- override def isRegistered: Boolean = throw new UnsupportedOperationException()
+ override def isRegistered: String = throw new UnsupportedOperationException()
private def requestGetWorkerInfos(endpoint: RpcEndpointRef): GetWorkerInfosResponse = {
try {
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index eec3da2d3..3c49c801c 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -55,9 +55,9 @@ abstract class HttpService extends Service with Logging {
def getUnavailablePeers: String
- def isShutdown: Boolean
+ def isShutdown: String
- def isRegistered: Boolean
+ def isRegistered: String
def startHttpServer(): Unit = {
val handlers =
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index 654537836..ed2a240f8 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -85,9 +85,9 @@ class HttpRequestHandler(
case "/unavailablePeers" if service.serviceName == Service.WORKER =>
service.getUnavailablePeers
case "/isShutdown" if service.serviceName == Service.WORKER =>
- service.isShutdown.toString
+ service.isShutdown
case "/isRegistered" if service.serviceName == Service.WORKER =>
- service.isRegistered.toString
+ service.isRegistered
case _ => INVALID
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 9f6589f16..ec9677bd2 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -447,43 +447,72 @@ private[celeborn] class Worker(
fetchHandler.cleanupExpiredShuffleKey(expiredShuffleKeys)
}
- override def getWorkerInfo: String = workerInfo.toString()
+ override def getWorkerInfo: String = {
+ val sb = new StringBuilder
+ sb.append("======================= WorkerInfo of Worker ============================")
+ sb.append(workerInfo.toString()).append("\n")
+ sb.toString()
+ }
override def getLostWorkers: String = throw new UnsupportedOperationException()
override def getBlacklistedWorkers: String = throw new UnsupportedOperationException()
- override def getThreadDump: String = Utils.getThreadDump()
+ override def getThreadDump: String = {
+ val sb = new StringBuilder
+ sb.append("========================= Worker ThreadDump ==========================\n")
+ sb.append(Utils.getThreadDump()).append("\n")
+ sb.toString()
+ }
override def getHostnameList: String = throw new UnsupportedOperationException()
override def getApplicationList: String = throw new UnsupportedOperationException()
override def getShuffleList: String = {
- storageManager.shuffleKeySet().asScala.mkString("\n")
+ val sb = new StringBuilder
+ sb.append("======================= Shuffle Key List ============================\n")
+ storageManager.shuffleKeySet().asScala.foreach { shuffleKey =>
+ sb.append(s"$shuffleKey\n")
+ }
+ sb.toString()
}
- override def isShutdown: Boolean = shutdown.get()
+ override def isShutdown: String = {
+ val sb = new StringBuilder
+ sb.append("========================= Worker Shutdown ==========================\n")
+ sb.append(shutdown.get()).append("\n")
+ sb.toString()
+ }
- override def isRegistered: Boolean = registered.get()
+ override def isRegistered: String = {
+ val sb = new StringBuilder
+ sb.append("========================= Worker Registered ==========================\n")
+ sb.append(registered.get()).append("\n")
+ sb.toString()
+ }
override def listTopDiskUseApps: String = {
- val stringBuilder = new StringBuilder()
+ val sb = new StringBuilder
+ sb.append("================== Top Disk Usage Applications =======================\n")
storageManager.topAppDiskUsage.asScala.foreach { case (appId, usage) =>
- stringBuilder.append(s"application ${appId} used ${Utils.bytesToString(usage)} ")
+ sb.append(s"Application $appId used ${Utils.bytesToString(usage)}\n")
}
- stringBuilder.toString()
+ sb.toString()
}
override def listPartitionLocationInfo: String = {
- partitionLocationInfo.toString
+ val sb = new StringBuilder
+ sb.append("==================== Partition Location Info =========================\n")
+ sb.append(partitionLocationInfo.toString).append("\n")
+ sb.toString()
}
override def getUnavailablePeers: String = {
val sb = new StringBuilder
- sb.append("==========Unavailable Peers of Worker==========\n")
+ sb.append("==================== Unavailable Peers of Worker =====================\n")
unavailablePeers.asScala.foreach { case (peer, time) =>
- sb.append(s"${peer.toUniqueId().padTo(55, "")}$time\n");
+ sb.append(s"${peer.toUniqueId().padTo(50, " ")}$time\n")
}
sb.toString()
}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 3cd369296..33ac40e59 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -114,7 +114,7 @@ trait MiniClusterFeature extends Logging {
Thread.sleep(5000L)
workerInfos.foreach {
- case (worker, _) => assert(worker.isRegistered)
+ case (worker, _) => assert(worker.registered.get())
}
(master, workerInfos.keySet)
}