Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/10/16 08:38:00 UTC

[incubator-celeborn] branch main updated: [CELEBORN-829] Improve response message of invalid HTTP request

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f2d6cc752 [CELEBORN-829] Improve response message of invalid HTTP request
f2d6cc752 is described below

commit f2d6cc7525d309b6dd741e661ac0abce7c671a29
Author: SteNicholas <pr...@163.com>
AuthorDate: Mon Oct 16 16:37:51 2023 +0800

    [CELEBORN-829] Improve response message of invalid HTTP request
    
    ### What changes were proposed in this pull request?
    
    Improve the response message for an invalid HTTP request so that it lists the available API providers, as shown below:
    
    - master
    
    ```
    Invalid uri of the master. Available API providers include:
    /applications        List all running application's ids of the cluster.
    /conf                List the conf setting of the master.
    /excludedWorkers     List all excluded workers of the master.
    /help                List the available API providers of the master.
    /hostnames           List all running application's LifecycleManager's hostnames of the cluster.
    /listTopDiskUsedApps List the top disk usage application ids. It will return the top disk usage application ids for the cluster.
    /lostWorkers         List all lost workers of the master.
    /masterGroupInfo     List master group information of the service. It will list all master's LEADER, FOLLOWER information.
    /shuffles            List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.
    /shutdownWorkers     List all shutdown workers of the master.
    /threadDump          List the current thread dump of the master.
    /workerInfo          List worker information of the service. It will list all registered workers 's information.
    ```
    
    - worker
    
    ```
    Invalid uri of the worker. Available API providers include:
    /conf                      List the conf setting of the worker.
    /exit                      Trigger this worker to exit. Legal types are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'
    /help                      List the available API providers of the worker.
    /isRegistered              Show if the worker is registered to the master success.
    /isShutdown                Show if the worker is during the process of shutdown.
    /listPartitionLocationInfo List all the living PartitionLocation information in that worker.
    /listTopDiskUsedApps       List the top disk usage application ids. It only return application ids running in that worker.
    /shuffles                  List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker.
    /threadDump                List the current thread dump of the worker.
    /unavailablePeers          List the unavailable peers of the worker, this always means the worker connect to the peer failed.
    /workerInfo                List the worker information of the worker.
    ```
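    
    The listings above can be approximated with a small sketch. It is not the code added by this commit: `SketchEndpoint`, `HelpSketch`, `respond` and the three sample entries are hypothetical names chosen for illustration. It only shows the general idea of padding each path to the longest one and falling back to the help listing when a path is unknown.
    
    ```scala
    // Illustrative sketch only: SketchEndpoint and HelpSketch are hypothetical names,
    // not the classes added by this commit.
    final case class SketchEndpoint(path: String, description: String)
    
    object HelpSketch {
      // Hypothetical registry; the commit itself keeps separate master and worker lists.
      val endpoints: List[SketchEndpoint] = List(
        SketchEndpoint("/threadDump", "List the current thread dump."),
        SketchEndpoint("/conf", "List the conf setting."),
        SketchEndpoint("/help", "List the available API providers."))
    
      // Pad every path to the longest one so the descriptions line up in a column.
      def help(): String = {
        val width = endpoints.map(_.path.length).max
        val rows = endpoints.sortBy(_.path).map { e =>
          e.path + " " * (width - e.path.length) + " " + e.description
        }
        ("Available API providers include:" :: rows).mkString("", "\n", "\n")
      }
    
      // Known paths are echoed as a stand-in for a real handler;
      // any other path gets the help listing.
      def respond(path: String): String =
        endpoints.find(_.path == path) match {
          case Some(e) => s"handled ${e.path}"
          case None => s"Invalid uri. ${help()}"
        }
    
      def main(args: Array[String]): Unit =
        print(respond("/unknown"))
    }
    ```
    
    Sorting by path before formatting keeps the listing stable regardless of the order in which the entries are registered.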
    
    ### Why are the changes needed?
    
    The previous response to an invalid HTTP request only echoed `invalid uri $uri`, which did not point users to a correct HTTP path.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `HttpUtilsSuite#CELEBORN-829: Improve response message of invalid HTTP request`
    
    Closes #1986 from SteNicholas/CELEBORN-829.
    
    Authored-by: SteNicholas <pr...@163.com>
    Signed-off-by: zky.zhoukeyong <zk...@alibaba-inc.com>
---
 docs/monitoring.md                                 |   2 +
 .../celeborn/server/common/http/HttpEndpoint.scala | 219 +++++++++++++++++++++
 .../server/common/http/HttpRequestHandler.scala    |  98 +--------
 .../celeborn/server/common/http/HttpUtils.scala    |  50 ++++-
 .../server/common/http/HttpUtilsSuite.scala        |  35 +++-
 5 files changed, 309 insertions(+), 95 deletions(-)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index cc2dc819f..54dfcf1ec 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -301,6 +301,7 @@ API path listed as below:
 | /applications         | List all running application's ids of the cluster.                                                                                  |
 | /shuffles             | List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.                              |
 | /listTopDiskUsedApps  | List the top disk usage application ids. It will return the top disk usage application ids for the cluster.                         |
+| /help                 | List the available API providers of the master.                                                                                     |
 
 #### Worker
 
@@ -317,3 +318,4 @@ API path listed as below:
 | /isShutdown                | Show if the worker is during the process of shutdown.                                                                               |
 | /isRegistered              | Show if the worker is registered to the master success.                                                                             |
 | /exit?type=${TYPE}         | Trigger this worker to exit. Legal `type`s are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'                                         |
+| /help                      | List the available API providers of the worker.                                                                                     |
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
new file mode 100644
index 000000000..0cbc3b975
--- /dev/null
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import org.apache.celeborn.server.common.{HttpService, Service}
+
+/**
+ * HTTP endpoints of Rest API providers.
+ */
+trait HttpEndpoint {
+  def path: String
+
+  def description(service: String): String
+
+  def handle(service: HttpService, parameters: Map[String, String]): String
+}
+
+case object Conf extends HttpEndpoint {
+  override def path: String = "/conf"
+
+  override def description(service: String): String = s"List the conf setting of the $service."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getConf
+}
+
+case object WorkerInfo extends HttpEndpoint {
+  override def path: String = "/workerInfo"
+
+  override def description(service: String): String = {
+    if (service == Service.MASTER)
+      "List worker information of the service. It will list all registered workers 's information."
+    else "List the worker information of the worker."
+  }
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getWorkerInfo
+}
+
+case object ThreadDump extends HttpEndpoint {
+  override def path: String = "/threadDump"
+
+  override def description(service: String): String =
+    s"List the current thread dump of the $service."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getThreadDump
+}
+
+case object Shuffles extends HttpEndpoint {
+  override def path: String = "/shuffles"
+
+  override def description(service: String): String = {
+    if (service == Service.MASTER)
+      "List all running shuffle keys of the service. It will return all running shuffle's key of the cluster."
+    else
+      "List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker."
+  }
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getShuffleList
+}
+
+case object ListTopDiskUsedApps extends HttpEndpoint {
+  override def path: String = "/listTopDiskUsedApps"
+
+  override def description(service: String): String = {
+    if (service == Service.MASTER)
+      "List the top disk usage application ids. It will return the top disk usage application ids for the cluster."
+    else
+      "List the top disk usage application ids. It only return application ids running in that worker."
+  }
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.listTopDiskUseApps
+}
+
+case object Help extends HttpEndpoint {
+  override def path: String = "/help"
+
+  override def description(service: String): String =
+    s"List the available API providers of the $service."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    HttpUtils.help(service.serviceName)
+}
+
+case object Invalid extends HttpEndpoint {
+
+  val invalid = "invalid"
+
+  override def path: String = None.toString
+
+  override def description(service: String): String = s"Invalid uri of the $service."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String = invalid
+}
+
+case object MasterGroupInfo extends HttpEndpoint {
+  override def path: String = "/masterGroupInfo"
+
+  override def description(service: String): String =
+    "List master group information of the service. It will list all master's LEADER, FOLLOWER information."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getMasterGroupInfo
+}
+
+case object LostWorkers extends HttpEndpoint {
+  override def path: String = "/lostWorkers"
+
+  override def description(service: String): String = "List all lost workers of the master."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getLostWorkers
+}
+
+case object ExcludedWorkers extends HttpEndpoint {
+  override def path: String = "/excludedWorkers"
+
+  override def description(service: String): String = "List all excluded workers of the master."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getExcludedWorkers
+}
+
+case object ShutdownWorkers extends HttpEndpoint {
+  override def path: String = "/shutdownWorkers"
+
+  override def description(service: String): String = "List all shutdown workers of the master."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getShutdownWorkers
+}
+
+case object Hostnames extends HttpEndpoint {
+  override def path: String = "/hostnames"
+
+  override def description(service: String): String =
+    "List all running application's LifecycleManager's hostnames of the cluster."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getHostnameList
+}
+
+case object Applications extends HttpEndpoint {
+  override def path: String = "/applications"
+
+  override def description(service: String): String =
+    "List all running application's ids of the cluster."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getApplicationList
+}
+
+case object ListPartitionLocationInfo extends HttpEndpoint {
+  override def path: String = "/listPartitionLocationInfo"
+
+  override def description(service: String): String =
+    "List all the living PartitionLocation information in that worker."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.listPartitionLocationInfo
+}
+
+case object UnavailablePeers extends HttpEndpoint {
+  override def path: String = "/unavailablePeers"
+
+  override def description(service: String): String =
+    "List the unavailable peers of the worker, this always means the worker connect to the peer failed."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.getUnavailablePeers
+}
+
+case object IsShutdown extends HttpEndpoint {
+  override def path: String = "/isShutdown"
+
+  override def description(service: String): String =
+    "Show if the worker is during the process of shutdown."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.isShutdown
+}
+
+case object IsRegistered extends HttpEndpoint {
+  override def path: String = "/isRegistered"
+
+  override def description(service: String): String =
+    "Show if the worker is registered to the master success."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.isRegistered
+}
+
+case object Exit extends HttpEndpoint {
+  override def path: String = "/exit"
+
+  override def description(service: String): String =
+    "Trigger this worker to exit. Legal types are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'"
+
+  override def handle(service: HttpService, parameters: Map[String, String]): String =
+    service.exit(parameters.getOrElse("TYPE", ""))
+}
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index d574947ed..86409c6bd 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -25,7 +25,7 @@ import io.netty.util.CharsetUtil
 
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.metrics.sink.PrometheusHttpRequestHandler
-import org.apache.celeborn.server.common.{HttpService, Service}
+import org.apache.celeborn.server.common.HttpService
 
 /**
  * A handler for the REST API that defines how to handle the HTTP request given a message.
@@ -45,13 +45,14 @@ class HttpRequestHandler(
 
   override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
     val uri = req.uri()
-    val msg = handleRequest(uri)
+    val (path, parameters) = HttpUtils.parseUri(uri)
+    val msg = HttpUtils.handleRequest(service, path, parameters)
     val response = msg match {
-      case "invalid" =>
+      case Invalid.invalid =>
         if (prometheusHttpRequestHandler != null) {
           prometheusHttpRequestHandler.handleRequest(uri)
         } else {
-          s"invalid uri $uri"
+          s"${Invalid.description(service.serviceName)} ${HttpUtils.help(service.serviceName)}"
         }
       case _ => msg
     }
@@ -63,93 +64,4 @@ class HttpRequestHandler(
     res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8")
     ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE)
   }
-
-  private def handleRequest(uri: String): String = {
-    if (service.serviceName == Service.MASTER) {
-      new MasterRequestHandler(service, uri).handle()
-    } else {
-      new WorkerRequestHandler(service, uri).handle()
-    }
-  }
-}
-
-/**
- * A basic handler for the REST API that defines how to handle the HTTP request.
- *
- * @param service The service of HTTP server.
- * @param uri The uri of HTTP request.
- */
-class BaseRequestHandler(service: HttpService, uri: String) extends Logging {
-
-  val (path, parameters) = HttpUtils.parseUrl(uri)
-
-  def handle(): String = {
-    path match {
-      case "/conf" =>
-        service.getConf
-      case "/workerInfo" =>
-        service.getWorkerInfo
-      case "/threadDump" =>
-        service.getThreadDump
-      case "/shuffles" =>
-        service.getShuffleList
-      case "/listTopDiskUsedApps" =>
-        service.listTopDiskUseApps
-      case _ => "invalid"
-    }
-  }
-}
-
-/**
- * A handler for the REST API that defines how to handle the HTTP request from master.
- *
- * @param service The service of HTTP server.
- * @param uri The uri of HTTP request from master.
- */
-class MasterRequestHandler(service: HttpService, uri: String)
-  extends BaseRequestHandler(service, uri) with Logging {
-
-  override def handle(): String = {
-    path match {
-      case "/masterGroupInfo" =>
-        service.getMasterGroupInfo
-      case "/lostWorkers" =>
-        service.getLostWorkers
-      case "/excludedWorkers" =>
-        service.getExcludedWorkers
-      case "/shutdownWorkers" =>
-        service.getShutdownWorkers
-      case "/hostnames" =>
-        service.getHostnameList
-      case "/applications" =>
-        service.getApplicationList
-      case _ => super.handle()
-    }
-  }
-}
-
-/**
- * A handler for the REST API that defines how to handle the HTTP request from worker.
- *
- * @param service The service of HTTP server.
- * @param uri The uri of HTTP request from worker.
- */
-class WorkerRequestHandler(service: HttpService, uri: String)
-  extends BaseRequestHandler(service, uri) with Logging {
-
-  override def handle(): String = {
-    path match {
-      case "/listPartitionLocationInfo" =>
-        service.listPartitionLocationInfo
-      case "/unavailablePeers" =>
-        service.getUnavailablePeers
-      case "/isShutdown" =>
-        service.isShutdown
-      case "/isRegistered" =>
-        service.isRegistered
-      case "/exit" =>
-        service.exit(parameters.getOrElse("TYPE", ""))
-      case _ => super.handle()
-    }
-  }
 }
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
index e212aa201..3fbcca5e5 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -20,8 +20,28 @@ package org.apache.celeborn.server.common.http
 import java.net.URL
 import java.util.Locale
 
+import org.apache.celeborn.server.common.{HttpService, Service}
+
 object HttpUtils {
-  def parseUrl(uri: String): (String, Map[String, String]) = {
+
+  private val baseEndpoints: List[HttpEndpoint] =
+    List(Conf, WorkerInfo, ThreadDump, Shuffles, ListTopDiskUsedApps, Help)
+  private val masterEndpoints: List[HttpEndpoint] = List(
+    MasterGroupInfo,
+    LostWorkers,
+    ExcludedWorkers,
+    ShutdownWorkers,
+    Hostnames,
+    Applications) ++ baseEndpoints
+  private val workerEndpoints: List[HttpEndpoint] =
+    List(
+      ListPartitionLocationInfo,
+      UnavailablePeers,
+      IsShutdown,
+      IsRegistered,
+      Exit) ++ baseEndpoints
+
+  def parseUri(uri: String): (String, Map[String, String]) = {
     val url = new URL(s"https://127.0.0.1:9000$uri")
     val parameter =
       if (url.getQuery == null) {
@@ -34,4 +54,32 @@ object HttpUtils {
       }
     (url.getPath, parameter)
   }
+
+  def handleRequest(
+      service: HttpService,
+      path: String,
+      parameters: Map[String, String]): String = {
+    endpoints(service.serviceName)
+      .find(endpoint => endpoint.path == path)
+      .getOrElse(Invalid)
+      .handle(service, parameters)
+  }
+
+  def help(service: String): String = {
+    val sb = new StringBuilder
+    sb.append("Available API providers include:\n")
+    val httpEndpoints: List[HttpEndpoint] = endpoints(service)
+    val maxLength = httpEndpoints.map(_.path.length).max
+    httpEndpoints.sortBy(_.path).foreach(endpoint =>
+      sb.append(
+        s"${endpoint.path.padTo(maxLength, " ").mkString} ${endpoint.description(service)}\n"))
+    sb.toString
+  }
+
+  private def endpoints(service: String): List[HttpEndpoint] = {
+    if (service == Service.MASTER)
+      masterEndpoints
+    else
+      workerEndpoints
+  }
 }
diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
index 01d6ae973..4133e67ea 100644
--- a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
+++ b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.server.common.http
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.server.common.Service
 
 class HttpUtilsSuite extends AnyFunSuite with Logging {
 
@@ -27,7 +28,7 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
       uri: String,
       expectPath: String,
       expectParameters: Map[String, String]): Unit = {
-    val (path, parameters) = HttpUtils.parseUrl(uri)
+    val (path, parameters) = HttpUtils.parseUri(uri)
     assert(path == expectPath)
     assert(parameters == expectParameters)
   }
@@ -40,4 +41,36 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
       "/exit",
       Map("TYPE" -> "DECOMMISSION", "FOO" -> "A"))
   }
+
+  test("CELEBORN-829: Improve response message of invalid HTTP request") {
+    assert(HttpUtils.help(Service.MASTER) ==
+      s"""Available API providers include:
+         |/applications        List all running application's ids of the cluster.
+         |/conf                List the conf setting of the master.
+         |/excludedWorkers     List all excluded workers of the master.
+         |/help                List the available API providers of the master.
+         |/hostnames           List all running application's LifecycleManager's hostnames of the cluster.
+         |/listTopDiskUsedApps List the top disk usage application ids. It will return the top disk usage application ids for the cluster.
+         |/lostWorkers         List all lost workers of the master.
+         |/masterGroupInfo     List master group information of the service. It will list all master's LEADER, FOLLOWER information.
+         |/shuffles            List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.
+         |/shutdownWorkers     List all shutdown workers of the master.
+         |/threadDump          List the current thread dump of the master.
+         |/workerInfo          List worker information of the service. It will list all registered workers 's information.
+         |""".stripMargin)
+    assert(HttpUtils.help(Service.WORKER) ==
+      s"""Available API providers include:
+         |/conf                      List the conf setting of the worker.
+         |/exit                      Trigger this worker to exit. Legal types are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'
+         |/help                      List the available API providers of the worker.
+         |/isRegistered              Show if the worker is registered to the master success.
+         |/isShutdown                Show if the worker is during the process of shutdown.
+         |/listPartitionLocationInfo List all the living PartitionLocation information in that worker.
+         |/listTopDiskUsedApps       List the top disk usage application ids. It only return application ids running in that worker.
+         |/shuffles                  List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker.
+         |/threadDump                List the current thread dump of the worker.
+         |/unavailablePeers          List the unavailable peers of the worker, this always means the worker connect to the peer failed.
+         |/workerInfo                List the worker information of the worker.
+         |""".stripMargin)
+  }
 }