You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/10/16 08:38:00 UTC
[incubator-celeborn] branch main updated: [CELEBORN-829] Improve response message of invalid HTTP request
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f2d6cc752 [CELEBORN-829] Improve response message of invalid HTTP request
f2d6cc752 is described below
commit f2d6cc7525d309b6dd741e661ac0abce7c671a29
Author: SteNicholas <pr...@163.com>
AuthorDate: Mon Oct 16 16:37:51 2023 +0800
[CELEBORN-829] Improve response message of invalid HTTP request
### What changes were proposed in this pull request?
Improve response message of invalid HTTP request, which lists available API providers like as below:
- master
```
Invalid uri of the master. Available API providers include:
/applications List all running application's ids of the cluster.
/conf List the conf setting of the master.
/excludedWorkers List all excluded workers of the master.
/help List the available API providers of the master.
/hostnames List all running application's LifecycleManager's hostnames of the cluster.
/listTopDiskUsedApps List the top disk usage application ids. It will return the top disk usage application ids for the cluster.
/lostWorkers List all lost workers of the master.
/masterGroupInfo List master group information of the service. It will list all master's LEADER, FOLLOWER information.
/shuffles List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.
/shutdownWorkers List all shutdown workers of the master.
/threadDump List the current thread dump of the master.
/workerInfo List worker information of the service. It will list all registered workers 's information.
```
- worker
```
Invalid uri of the worker. Available API providers include:
/conf List the conf setting of the worker.
/exit Trigger this worker to exit. Legal types are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'
/help List the available API providers of the worker.
/isRegistered Show if the worker is registered to the master success.
/isShutdown Show if the worker is during the process of shutdown.
/listPartitionLocationInfo List all the living PartitionLocation information in that worker.
/listTopDiskUsedApps List the top disk usage application ids. It only return application ids running in that worker.
/shuffles List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker.
/threadDump List the current thread dump of the worker.
/unavailablePeers List the unavailable peers of the worker, this always means the worker connect to the peer failed.
/workerInfo List the worker information of the worker.
```
### Why are the changes needed?
Response message of invalid HTTP request could not help users with correct HTTP path.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`HttpUtilsSuite#CELEBORN-829: Improve response message of invalid HTTP request`
Closes #1986 from SteNicholas/CELEBORN-829.
Authored-by: SteNicholas <pr...@163.com>
Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
docs/monitoring.md | 2 +
.../celeborn/server/common/http/HttpEndpoint.scala | 219 +++++++++++++++++++++
.../server/common/http/HttpRequestHandler.scala | 98 +--------
.../celeborn/server/common/http/HttpUtils.scala | 50 ++++-
.../server/common/http/HttpUtilsSuite.scala | 35 +++-
5 files changed, 309 insertions(+), 95 deletions(-)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index cc2dc819f..54dfcf1ec 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -301,6 +301,7 @@ API path listed as below:
| /applications | List all running application's ids of the cluster. |
| /shuffles | List all running shuffle keys of the service. It will return all running shuffle's key of the cluster. |
| /listTopDiskUsedApps | List the top disk usage application ids. It will return the top disk usage application ids for the cluster. |
+| /help | List the available API providers of the master. |
#### Worker
@@ -317,3 +318,4 @@ API path listed as below:
| /isShutdown | Show if the worker is during the process of shutdown. |
| /isRegistered | Show if the worker is registered to the master success. |
| /exit?type=${TYPE} | Trigger this worker to exit. Legal `type`s are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY' |
+| /help | List the available API providers of the worker. |
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
new file mode 100644
index 000000000..0cbc3b975
--- /dev/null
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import org.apache.celeborn.server.common.{HttpService, Service}
+
+/**
+ * HTTP endpoints of Rest API providers.
+ */
+trait HttpEndpoint {
+ def path: String
+
+ def description(service: String): String
+
+ def handle(service: HttpService, parameters: Map[String, String]): String
+}
+
+case object Conf extends HttpEndpoint {
+ override def path: String = "/conf"
+
+ override def description(service: String): String = s"List the conf setting of the $service."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getConf
+}
+
+case object WorkerInfo extends HttpEndpoint {
+ override def path: String = "/workerInfo"
+
+ override def description(service: String): String = {
+ if (service == Service.MASTER)
+ "List worker information of the service. It will list all registered workers 's information."
+ else "List the worker information of the worker."
+ }
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getWorkerInfo
+}
+
+case object ThreadDump extends HttpEndpoint {
+ override def path: String = "/threadDump"
+
+ override def description(service: String): String =
+ s"List the current thread dump of the $service."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getThreadDump
+}
+
+case object Shuffles extends HttpEndpoint {
+ override def path: String = "/shuffles"
+
+ override def description(service: String): String = {
+ if (service == Service.MASTER)
+ "List all running shuffle keys of the service. It will return all running shuffle's key of the cluster."
+ else
+ "List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker."
+ }
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getShuffleList
+}
+
+case object ListTopDiskUsedApps extends HttpEndpoint {
+ override def path: String = "/listTopDiskUsedApps"
+
+ override def description(service: String): String = {
+ if (service == Service.MASTER)
+ "List the top disk usage application ids. It will return the top disk usage application ids for the cluster."
+ else
+ "List the top disk usage application ids. It only return application ids running in that worker."
+ }
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.listTopDiskUseApps
+}
+
+case object Help extends HttpEndpoint {
+ override def path: String = "/help"
+
+ override def description(service: String): String =
+ s"List the available API providers of the $service."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ HttpUtils.help(service.serviceName)
+}
+
+case object Invalid extends HttpEndpoint {
+
+ val invalid = "invalid"
+
+ override def path: String = None.toString
+
+ override def description(service: String): String = s"Invalid uri of the $service."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String = invalid
+}
+
+case object MasterGroupInfo extends HttpEndpoint {
+ override def path: String = "/masterGroupInfo"
+
+ override def description(service: String): String =
+ "List master group information of the service. It will list all master's LEADER, FOLLOWER information."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getMasterGroupInfo
+}
+
+case object LostWorkers extends HttpEndpoint {
+ override def path: String = "/lostWorkers"
+
+ override def description(service: String): String = "List all lost workers of the master."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getLostWorkers
+}
+
+case object ExcludedWorkers extends HttpEndpoint {
+ override def path: String = "/excludedWorkers"
+
+ override def description(service: String): String = "List all excluded workers of the master."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getExcludedWorkers
+}
+
+case object ShutdownWorkers extends HttpEndpoint {
+ override def path: String = "/shutdownWorkers"
+
+ override def description(service: String): String = "List all shutdown workers of the master."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getShutdownWorkers
+}
+
+case object Hostnames extends HttpEndpoint {
+ override def path: String = "/hostnames"
+
+ override def description(service: String): String =
+ "List all running application's LifecycleManager's hostnames of the cluster."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getHostnameList
+}
+
+case object Applications extends HttpEndpoint {
+ override def path: String = "/applications"
+
+ override def description(service: String): String =
+ "List all running application's ids of the cluster."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getApplicationList
+}
+
+case object ListPartitionLocationInfo extends HttpEndpoint {
+ override def path: String = "/listPartitionLocationInfo"
+
+ override def description(service: String): String =
+ "List all the living PartitionLocation information in that worker."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.listPartitionLocationInfo
+}
+
+case object UnavailablePeers extends HttpEndpoint {
+ override def path: String = "/unavailablePeers"
+
+ override def description(service: String): String =
+ "List the unavailable peers of the worker, this always means the worker connect to the peer failed."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.getUnavailablePeers
+}
+
+case object IsShutdown extends HttpEndpoint {
+ override def path: String = "/isShutdown"
+
+ override def description(service: String): String =
+ "Show if the worker is during the process of shutdown."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.isShutdown
+}
+
+case object IsRegistered extends HttpEndpoint {
+ override def path: String = "/isRegistered"
+
+ override def description(service: String): String =
+ "Show if the worker is registered to the master success."
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.isRegistered
+}
+
+case object Exit extends HttpEndpoint {
+ override def path: String = "/exit"
+
+ override def description(service: String): String =
+ "Trigger this worker to exit. Legal types are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'"
+
+ override def handle(service: HttpService, parameters: Map[String, String]): String =
+ service.exit(parameters.getOrElse("TYPE", ""))
+}
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index d574947ed..86409c6bd 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -25,7 +25,7 @@ import io.netty.util.CharsetUtil
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.sink.PrometheusHttpRequestHandler
-import org.apache.celeborn.server.common.{HttpService, Service}
+import org.apache.celeborn.server.common.HttpService
/**
* A handler for the REST API that defines how to handle the HTTP request given a message.
@@ -45,13 +45,14 @@ class HttpRequestHandler(
override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
val uri = req.uri()
- val msg = handleRequest(uri)
+ val (path, parameters) = HttpUtils.parseUri(uri)
+ val msg = HttpUtils.handleRequest(service, path, parameters)
val response = msg match {
- case "invalid" =>
+ case Invalid.invalid =>
if (prometheusHttpRequestHandler != null) {
prometheusHttpRequestHandler.handleRequest(uri)
} else {
- s"invalid uri $uri"
+ s"${Invalid.description(service.serviceName)} ${HttpUtils.help(service.serviceName)}"
}
case _ => msg
}
@@ -63,93 +64,4 @@ class HttpRequestHandler(
res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8")
ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE)
}
-
- private def handleRequest(uri: String): String = {
- if (service.serviceName == Service.MASTER) {
- new MasterRequestHandler(service, uri).handle()
- } else {
- new WorkerRequestHandler(service, uri).handle()
- }
- }
-}
-
-/**
- * A basic handler for the REST API that defines how to handle the HTTP request.
- *
- * @param service The service of HTTP server.
- * @param uri The uri of HTTP request.
- */
-class BaseRequestHandler(service: HttpService, uri: String) extends Logging {
-
- val (path, parameters) = HttpUtils.parseUrl(uri)
-
- def handle(): String = {
- path match {
- case "/conf" =>
- service.getConf
- case "/workerInfo" =>
- service.getWorkerInfo
- case "/threadDump" =>
- service.getThreadDump
- case "/shuffles" =>
- service.getShuffleList
- case "/listTopDiskUsedApps" =>
- service.listTopDiskUseApps
- case _ => "invalid"
- }
- }
-}
-
-/**
- * A handler for the REST API that defines how to handle the HTTP request from master.
- *
- * @param service The service of HTTP server.
- * @param uri The uri of HTTP request from master.
- */
-class MasterRequestHandler(service: HttpService, uri: String)
- extends BaseRequestHandler(service, uri) with Logging {
-
- override def handle(): String = {
- path match {
- case "/masterGroupInfo" =>
- service.getMasterGroupInfo
- case "/lostWorkers" =>
- service.getLostWorkers
- case "/excludedWorkers" =>
- service.getExcludedWorkers
- case "/shutdownWorkers" =>
- service.getShutdownWorkers
- case "/hostnames" =>
- service.getHostnameList
- case "/applications" =>
- service.getApplicationList
- case _ => super.handle()
- }
- }
-}
-
-/**
- * A handler for the REST API that defines how to handle the HTTP request from worker.
- *
- * @param service The service of HTTP server.
- * @param uri The uri of HTTP request from worker.
- */
-class WorkerRequestHandler(service: HttpService, uri: String)
- extends BaseRequestHandler(service, uri) with Logging {
-
- override def handle(): String = {
- path match {
- case "/listPartitionLocationInfo" =>
- service.listPartitionLocationInfo
- case "/unavailablePeers" =>
- service.getUnavailablePeers
- case "/isShutdown" =>
- service.isShutdown
- case "/isRegistered" =>
- service.isRegistered
- case "/exit" =>
- service.exit(parameters.getOrElse("TYPE", ""))
- case _ => super.handle()
- }
- }
}
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
index e212aa201..3fbcca5e5 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -20,8 +20,28 @@ package org.apache.celeborn.server.common.http
import java.net.URL
import java.util.Locale
+import org.apache.celeborn.server.common.{HttpService, Service}
+
object HttpUtils {
- def parseUrl(uri: String): (String, Map[String, String]) = {
+
+ private val baseEndpoints: List[HttpEndpoint] =
+ List(Conf, WorkerInfo, ThreadDump, Shuffles, ListTopDiskUsedApps, Help)
+ private val masterEndpoints: List[HttpEndpoint] = List(
+ MasterGroupInfo,
+ LostWorkers,
+ ExcludedWorkers,
+ ShutdownWorkers,
+ Hostnames,
+ Applications) ++ baseEndpoints
+ private val workerEndpoints: List[HttpEndpoint] =
+ List(
+ ListPartitionLocationInfo,
+ UnavailablePeers,
+ IsShutdown,
+ IsRegistered,
+ Exit) ++ baseEndpoints
+
+ def parseUri(uri: String): (String, Map[String, String]) = {
val url = new URL(s"https://127.0.0.1:9000$uri")
val parameter =
if (url.getQuery == null) {
@@ -34,4 +54,32 @@ object HttpUtils {
}
(url.getPath, parameter)
}
+
+ def handleRequest(
+ service: HttpService,
+ path: String,
+ parameters: Map[String, String]): String = {
+ endpoints(service.serviceName).find(endpoint => endpoint.path == path).orElse(
+ Some(Invalid)).get.handle(
+ service,
+ parameters)
+ }
+
+ def help(service: String): String = {
+ val sb = new StringBuilder
+ sb.append("Available API providers include:\n")
+ val httpEndpoints: List[HttpEndpoint] = endpoints(service)
+ val maxLength = httpEndpoints.map(_.path.length).max
+ httpEndpoints.sortBy(_.path).foreach(endpoint =>
+ sb.append(
+ s"${endpoint.path.padTo(maxLength, " ").mkString} ${endpoint.description(service)}\n"))
+ sb.toString
+ }
+
+ private def endpoints(service: String): List[HttpEndpoint] = {
+ if (service == Service.MASTER)
+ masterEndpoints
+ else
+ workerEndpoints
+ }
}
diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
index 01d6ae973..4133e67ea 100644
--- a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
+++ b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.server.common.http
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.server.common.Service
class HttpUtilsSuite extends AnyFunSuite with Logging {
@@ -27,7 +28,7 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
uri: String,
expectPath: String,
expectParameters: Map[String, String]): Unit = {
- val (path, parameters) = HttpUtils.parseUrl(uri)
+ val (path, parameters) = HttpUtils.parseUri(uri)
assert(path == expectPath)
assert(parameters == expectParameters)
}
@@ -40,4 +41,36 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
"/exit",
Map("TYPE" -> "DECOMMISSION", "FOO" -> "A"))
}
+
+ test("CELEBORN-829: Improve response message of invalid HTTP request") {
+ assert(HttpUtils.help(Service.MASTER) ==
+ s"""Available API providers include:
+ |/applications List all running application's ids of the cluster.
+ |/conf List the conf setting of the master.
+ |/excludedWorkers List all excluded workers of the master.
+ |/help List the available API providers of the master.
+ |/hostnames List all running application's LifecycleManager's hostnames of the cluster.
+ |/listTopDiskUsedApps List the top disk usage application ids. It will return the top disk usage application ids for the cluster.
+ |/lostWorkers List all lost workers of the master.
+ |/masterGroupInfo List master group information of the service. It will list all master's LEADER, FOLLOWER information.
+ |/shuffles List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.
+ |/shutdownWorkers List all shutdown workers of the master.
+ |/threadDump List the current thread dump of the master.
+ |/workerInfo List worker information of the service. It will list all registered workers 's information.
+ |""".stripMargin)
+ assert(HttpUtils.help(Service.WORKER) ==
+ s"""Available API providers include:
+ |/conf List the conf setting of the worker.
+ |/exit Trigger this worker to exit. Legal types are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'
+ |/help List the available API providers of the worker.
+ |/isRegistered Show if the worker is registered to the master success.
+ |/isShutdown Show if the worker is during the process of shutdown.
+ |/listPartitionLocationInfo List all the living PartitionLocation information in that worker.
+ |/listTopDiskUsedApps List the top disk usage application ids. It only return application ids running in that worker.
+ |/shuffles List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker.
+ |/threadDump List the current thread dump of the worker.
+ |/unavailablePeers List the unavailable peers of the worker, this always means the worker connect to the peer failed.
+ |/workerInfo List the worker information of the worker.
+ |""".stripMargin)
+ }
}