You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/03/10 07:59:02 UTC

[incubator-uniffle] branch master updated: [#80][Part-3] feat: add REST API for decommisson (#684)

This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b5bf103 [#80][Part-3] feat: add REST API for decommisson (#684)
4b5bf103 is described below

commit 4b5bf10379497b22a39914f1654e148bec66610a
Author: xianjingfeng <xi...@gmail.com>
AuthorDate: Fri Mar 10 15:58:55 2023 +0800

    [#80][Part-3] feat: add REST API for decommisson (#684)
    
    ### What changes were proposed in this pull request?
    
    Add REST API for decommission
    
    ### Why are the changes needed?
    
    Support shuffle server decommission. It is a part of #80
    
    ### Does this PR introduce _any_ user-facing change?
    
    Env:
    
    * Server IP: 127.0.0.1
    * HTTP port: 19998
    * RPC port: 19999
    
    Decommission example:
    
    ```shell
    curl -XPOST -H "Content-type:application/json" "http://127.0.0.1:19998/api/server/decommission" -d '{"serverIds": ["127.0.0.1:19999"]}'
    ```
    
    Cancel decommission example:
    
    ```shell
    curl -XPOST -H "Content-type:application/json" "http://127.0.0.1:19998/api/server/cancelDecommission" -d '{"serverIds": ["127.0.0.1:19999"]}'
    ```
    
    Get server list:
    
    ```shell
    # path: /api/server/nodes[?id={serverId}&status={serverStatus}]  (both query parameters are optional)
    curl  "http://127.0.0.1:19998/api/server/nodes?status=DECOMMISSIONING"
    curl  "http://127.0.0.1:19998/api/server/nodes?status=ACTIVE"
    ```
    
    ### How was this patch tested?
    
    UT
---
 .../apache/uniffle/common/ServerStatusTest.java    |   2 +-
 .../apache/uniffle/common/metrics/TestUtils.java   |  37 +++--
 .../apache/uniffle/common/rpc/StatusCodeTest.java  |   2 +-
 coordinator/pom.xml                                |   1 +
 .../uniffle/coordinator/CoordinatorServer.java     |  17 +++
 .../apache/uniffle/coordinator/web/Response.java   |  71 ++++++++++
 .../web/request/CancelDecommissionRequest.java     |  29 ++--
 .../web/request/DecommissionRequest.java           |  29 ++--
 .../coordinator/web/servlet/BaseServlet.java       |  84 +++++++++++
 .../web/servlet/CancelDecommissionServlet.java     |  50 +++++++
 .../web/servlet/DecommissionServlet.java           |  50 +++++++
 .../coordinator/web/servlet/NodesServlet.java      |  56 ++++++++
 .../coordinator/metric/CoordinatorMetricsTest.java |   8 +-
 docs/coordinator_guide.md                          |  58 ++++++++
 .../java/org/apache/uniffle/test/ServletTest.java  | 157 +++++++++++++++++++++
 .../uniffle/server/ShuffleServerMetricsTest.java   |   8 +-
 .../apache/uniffle/server/ShuffleServerTest.java   |   6 +-
 17 files changed, 602 insertions(+), 63 deletions(-)

diff --git a/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
index e449215a..2546cfa6 100644
--- a/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
@@ -55,7 +55,7 @@ public class ServerStatusTest {
         fail(e.getMessage());
       }
     }
-    for (int i = 0; i < serverStatuses.size() - 1; i++) {
+    for (int i = 0; i < serverStatuses.size(); i++) {
       assertEquals(protoServerStatuses.get(i), serverStatuses.get(i).toProto());
       assertEquals(ServerStatus.fromProto(protoServerStatuses.get(i)), serverStatuses.get(i));
     }
diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
index 1233f9b5..ea623ea5 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common.metrics;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 
@@ -28,18 +29,38 @@ public class TestUtils {
   private TestUtils() {
   }
 
-  public static String httpGetMetrics(String urlString) throws IOException {
+  public static String httpGet(String urlString) throws IOException {
     URL url = new URL(urlString);
     HttpURLConnection con = (HttpURLConnection) url.openConnection();
     con.setRequestMethod("GET");
-    BufferedReader in = new BufferedReader(
-        new InputStreamReader(con.getInputStream()));
-    String inputLine;
-    StringBuffer content = new StringBuffer();
-    while ((inputLine = in.readLine()) != null) {
-      content.append(inputLine);
+    StringBuilder content = new StringBuilder();
+    try (BufferedReader in = new BufferedReader(
+        new InputStreamReader(con.getInputStream()));) {
+      String inputLine;
+      while ((inputLine = in.readLine()) != null) {
+        content.append(inputLine);
+      }
     }
-    in.close();
+    return content.toString();
+  }
+
+  public static String httpPost(String urlString, String postData) throws IOException {
+    URL url = new URL(urlString);
+    HttpURLConnection con = (HttpURLConnection) url.openConnection();
+    con.setDoOutput(true);
+    con.setRequestMethod("POST");
+    StringBuilder content = new StringBuilder();
+    try (OutputStream outputStream = con.getOutputStream();) {
+      outputStream.write(postData.getBytes());
+      try (BufferedReader in = new BufferedReader(
+          new InputStreamReader(con.getInputStream()));) {
+        String inputLine;
+        while ((inputLine = in.readLine()) != null) {
+          content.append(inputLine);
+        }
+      }
+    }
+
     return content.toString();
   }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
index 5956c1d3..4e9c5b2f 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
@@ -55,7 +55,7 @@ public class StatusCodeTest {
         fail(e.getMessage());
       }
     }
-    for (int i = 0; i < statusCodes.size() - 1; i++) {
+    for (int i = 0; i < statusCodes.size(); i++) {
       assertEquals(protoStatusCode.get(i), statusCodes.get(i).toProto());
       assertEquals(StatusCode.fromProto(protoStatusCode.get(i)), statusCodes.get(i));
     }
diff --git a/coordinator/pom.xml b/coordinator/pom.xml
index ac6b2ec5..df147d3f 100644
--- a/coordinator/pom.xml
+++ b/coordinator/pom.xml
@@ -107,6 +107,7 @@
                 <includes>
                   <include>com.google.protobuf:protobuf-java-util</include>
                   <include>com.google.guava:guava</include>
+                  <include>com.google.guava:failureaccess</include>
                   <include>com.fasterxml.jackson.core:jackson-databind</include>
                   <include>com.fasterxml.jackson.core:jackson-core</include>
                 </includes>
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 0ac8f246..686f0596 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -41,6 +41,9 @@ import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
 import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
 import org.apache.uniffle.coordinator.util.CoordinatorUtils;
+import org.apache.uniffle.coordinator.web.servlet.CancelDecommissionServlet;
+import org.apache.uniffle.coordinator.web.servlet.DecommissionServlet;
+import org.apache.uniffle.coordinator.web.servlet.NodesServlet;
 
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
@@ -150,6 +153,7 @@ public class CoordinatorServer extends ReconfigurableBase {
     id = ip + "-" + port;
     LOG.info("Start to initialize coordinator {}", id);
     jettyServer = new JettyServer(coordinatorConf);
+    registerRESTAPI();
     // register metrics first to avoid NPE problem when add dynamic metrics
     registerMetrics();
     coordinatorConf.setString(CoordinatorUtils.COORDINATOR_ID, id);
@@ -182,6 +186,19 @@ public class CoordinatorServer extends ReconfigurableBase {
     server = coordinatorFactory.getServer();
   }
 
+  private void registerRESTAPI() throws Exception {
+    LOG.info("Register REST API");
+    jettyServer.addServlet(
+        new NodesServlet(this),
+        "/api/server/nodes");
+    jettyServer.addServlet(
+        new DecommissionServlet(this),
+        "/api/server/decommission");
+    jettyServer.addServlet(
+        new CancelDecommissionServlet(this),
+        "/api/server/cancelDecommission");
+  }
+
   private void registerMetrics() throws Exception {
     LOG.info("Register metrics");
     CollectorRegistry coordinatorCollectorRegistry = new CollectorRegistry(true);
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java
new file mode 100644
index 00000000..b883d55f
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web;
+
+public class Response<T> {
+  private static final int SUCCESS_CODE = 0;
+  private static final int ERROR_CODE = -1;
+  private int code;
+  private T data;
+  private String errMsg;
+
+  public Response() {
+  }
+
+  public Response(int code, T data, String errMsg) {
+    this.code = code;
+    this.data = data;
+    this.errMsg = errMsg;
+  }
+
+  public static <T> Response<T> success(T data) {
+    return new Response<>(SUCCESS_CODE, data, null);
+  }
+
+  public static <T> Response<T> fail(String msg) {
+    return new Response<>(ERROR_CODE, null, msg);
+  }
+
+  public static <T> Response<T> fail(String msg, int code) {
+    return new Response<>(code, null, msg);
+  }
+
+  public int getCode() {
+    return code;
+  }
+
+  public void setCode(int code) {
+    this.code = code;
+  }
+
+  public T getData() {
+    return data;
+  }
+
+  public void setData(T data) {
+    this.data = data;
+  }
+
+  public String getErrMsg() {
+    return errMsg;
+  }
+
+  public void setErrMsg(String errMsg) {
+    this.errMsg = errMsg;
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java
similarity index 51%
copy from common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
copy to coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java
index 1233f9b5..997a135e 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java
@@ -15,31 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common.metrics;
+package org.apache.uniffle.coordinator.web.request;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.util.Set;
 
-public class TestUtils {
+public class CancelDecommissionRequest {
+  private Set<String> serverIds;
 
-  private TestUtils() {
+  public Set<String> getServerIds() {
+    return serverIds;
   }
 
-  public static String httpGetMetrics(String urlString) throws IOException {
-    URL url = new URL(urlString);
-    HttpURLConnection con = (HttpURLConnection) url.openConnection();
-    con.setRequestMethod("GET");
-    BufferedReader in = new BufferedReader(
-        new InputStreamReader(con.getInputStream()));
-    String inputLine;
-    StringBuffer content = new StringBuffer();
-    while ((inputLine = in.readLine()) != null) {
-      content.append(inputLine);
-    }
-    in.close();
-    return content.toString();
+  public void setServerIds(Set<String> serverIds) {
+    this.serverIds = serverIds;
   }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java
similarity index 51%
copy from common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
copy to coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java
index 1233f9b5..c11a716c 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java
@@ -15,31 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common.metrics;
+package org.apache.uniffle.coordinator.web.request;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.util.Set;
 
-public class TestUtils {
+public class DecommissionRequest {
+  private Set<String> serverIds;
 
-  private TestUtils() {
+  public Set<String> getServerIds() {
+    return serverIds;
   }
 
-  public static String httpGetMetrics(String urlString) throws IOException {
-    URL url = new URL(urlString);
-    HttpURLConnection con = (HttpURLConnection) url.openConnection();
-    con.setRequestMethod("GET");
-    BufferedReader in = new BufferedReader(
-        new InputStreamReader(con.getInputStream()));
-    String inputLine;
-    StringBuffer content = new StringBuffer();
-    while ((inputLine = in.readLine()) != null) {
-      content.append(inputLine);
-    }
-    in.close();
-    return content.toString();
+  public void setServerIds(Set<String> serverIds) {
+    this.serverIds = serverIds;
   }
 }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
new file mode 100644
index 00000000..a67701f2
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.servlet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.uniffle.coordinator.web.Response;
+
+public abstract class BaseServlet extends HttpServlet {
+  public static final String JSON_MIME_TYPE = "application/json";
+  final ObjectMapper mapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+    writeJSON(resp, handlerRequest(() -> handleGet(req, resp)));
+  }
+
+  @Override
+  protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+    writeJSON(resp, handlerRequest(() -> handlePost(req, resp)));
+  }
+
+  private Response handlerRequest(
+      Callable<Response> function) {
+    Response response;
+    try {
+      // todo: Do something for authentication
+      response = function.call();
+    } catch (Exception e) {
+      response = Response.fail(e.getMessage());
+    }
+    return response;
+  }
+
+  protected Response handleGet(
+      HttpServletRequest req,
+      HttpServletResponse resp) throws ServletException, IOException {
+    throw new IOException("Method not support!");
+  }
+
+  protected Response handlePost(
+      HttpServletRequest req,
+      HttpServletResponse resp) throws ServletException, IOException {
+    throw new IOException("Method not support!");
+  }
+
+  protected void writeJSON(final HttpServletResponse resp, final Object obj)
+      throws IOException {
+    if (obj == null) {
+      return;
+    }
+    resp.setContentType(JSON_MIME_TYPE);
+    final OutputStream stream = resp.getOutputStream();
+    mapper.writeValue(stream, obj);
+  }
+
+  protected <T> T parseParamsFromJson(HttpServletRequest req, Class<T> clazz) throws IOException {
+    return mapper.readValue(req.getInputStream(), clazz);
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
new file mode 100644
index 00000000..b7411d4e
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.servlet;
+
+import java.io.IOException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
+
+public class CancelDecommissionServlet extends BaseServlet {
+  private final CoordinatorServer coordinator;
+
+  public CancelDecommissionServlet(CoordinatorServer coordinator) {
+    this.coordinator = coordinator;
+  }
+
+  @Override
+  protected Response handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+    CancelDecommissionRequest params = parseParamsFromJson(req, CancelDecommissionRequest.class);
+    if (CollectionUtils.isEmpty(params.getServerIds())) {
+      return Response.fail("Parameter[serverIds] should not be null!");
+    }
+    ClusterManager clusterManager = coordinator.getClusterManager();
+    params.getServerIds().forEach((serverId) -> {
+      clusterManager.cancelDecommission(serverId);
+    });
+    return Response.success(null);
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
new file mode 100644
index 00000000..96f06dd3
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.servlet;
+
+import java.io.IOException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
+
+public class DecommissionServlet extends BaseServlet {
+  private final CoordinatorServer coordinator;
+
+  public DecommissionServlet(CoordinatorServer coordinator) {
+    this.coordinator = coordinator;
+  }
+
+  @Override
+  protected Response handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+    DecommissionRequest params = parseParamsFromJson(req, DecommissionRequest.class);
+    if (CollectionUtils.isEmpty(params.getServerIds())) {
+      return Response.fail("Parameter[serverIds] should not be null!");
+    }
+    ClusterManager clusterManager = coordinator.getClusterManager();
+    params.getServerIds().forEach((serverId) -> {
+      clusterManager.decommission(serverId);
+    });
+    return Response.success(null);
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
new file mode 100644
index 00000000..04850235
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.servlet;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.web.Response;
+
+
+public class NodesServlet extends BaseServlet {
+  private final CoordinatorServer coordinator;
+
+  public NodesServlet(CoordinatorServer coordinator) {
+    this.coordinator = coordinator;
+  }
+
+  @Override
+  protected Response handleGet(HttpServletRequest req, HttpServletResponse resp) {
+    List<ServerNode> serverList = coordinator.getClusterManager().getServerList(Collections.EMPTY_SET);
+    String id = req.getParameter("id");
+    String status = req.getParameter("status");
+    serverList = serverList.stream().filter((server) -> {
+      if (id != null && !id.equals(server.getId())) {
+        return false;
+      }
+      if (status != null && !server.getStatus().toString().equals(status)) {
+        return false;
+      }
+      return true;
+    }).collect(Collectors.toList());
+    Collections.sort(serverList, Comparator.comparing(ServerNode::getId));
+    return Response.success(serverList);
+  }
+}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
index 945cafda..454e6bb8 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
@@ -68,7 +68,7 @@ public class CoordinatorMetricsTest {
 
   @Test
   public void testDynamicMetrics() throws Exception {
-    String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL);
+    String content = TestUtils.httpGet(SERVER_METRICS_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode metricsNode = mapper.readTree(content).get("metrics");
     String remoteStorageMetricsName = CoordinatorMetrics.REMOTE_STORAGE_IN_USED_PREFIX + "path1";
@@ -85,7 +85,7 @@ public class CoordinatorMetricsTest {
 
   @Test
   public void testCoordinatorMetrics() throws Exception {
-    String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL);
+    String content = TestUtils.httpGet(SERVER_METRICS_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
@@ -101,7 +101,7 @@ public class CoordinatorMetricsTest {
 
   @Test
   public void testJvmMetrics() throws Exception {
-    String content = TestUtils.httpGetMetrics(SERVER_JVM_URL);
+    String content = TestUtils.httpGet(SERVER_JVM_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
@@ -109,7 +109,7 @@ public class CoordinatorMetricsTest {
 
   @Test
   public void testGrpcMetrics() throws Exception {
-    String content = TestUtils.httpGetMetrics(SERVER_GRPC_URL);
+    String content = TestUtils.httpGet(SERVER_GRPC_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index 09593f55..d3815a04 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -136,3 +136,61 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi
 |rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). |
 |rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed.                                                                                                                                                                                                                                                                                      |
 |rss.metrics.prometheus.pushgateway.report.interval.seconds|10| The interval in seconds for the reporter to report metrics.                                                                                                                                                                                                                                                                                     |
+
+## RESTful API(beta)
+
+### Fetch Shuffle servers
+
+<details>
+ <summary><code>GET</code> <code><b>/api/server/nodes</b></code> </summary>
+
+##### Parameters
+
+> |name|type|data type|description|
+> |----|----|---------|-----------|
+> |id|optional|string|shuffle server id, eg:127.0.0.1:19999|
+> |status|optional|string|Shuffle server status, eg:ACTIVE, DECOMMISSIONING, DECOMMISSIONED|
+
+##### Example cURL
+
+> ```bash
+>  curl -X GET http://localhost:19998/api/server/nodes
+> ```
+</details>
+
+### Decommission shuffle servers
+
+<details>
+ <summary><code>POST</code> <code><b>/api/server/decommission</b></code> </summary>
+
+##### Parameters
+
+> |name|type| data type         |description|
+> |----|-------------------|---------|-----------|
+> |serverIds|required| array |Shuffle server array, eg:["127.0.0.1:19999"]|
+> 
+##### Example cURL
+
+> ```bash
+>  curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/decommission  -d '{"serverIds": ["127.0.0.1:19999"]}'
+> ```
+</details>
+
+
+### Cancel decommission shuffle servers
+
+<details>
+ <summary><code>POST</code> <code><b>/api/server/cancelDecommission</b></code> </summary>
+
+##### Parameters
+
+> |name|type| data type         |description|
+> |----|-------------------|---------|-----------|
+> |serverIds|required| array |Shuffle server array, eg:["127.0.0.1:19999"]|
+>
+##### Example cURL
+
+> ```bash
+>  curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/cancelDecommission  -d '{"serverIds": ["127.0.0.1:19999"]}'
+> ```
+</details>
\ No newline at end of file
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
new file mode 100644
index 00000000..1963195e
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.metrics.TestUtils;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
+import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ServletTest extends IntegrationTestBase { // hits the coordinator's /api/server/* HTTP endpoints
+  private static final String URL_PREFIX = "http://127.0.0.1:12345/api/"; // port = JETTY_HTTP_PORT set in setUp
+  private static final String NODES_URL = URL_PREFIX + "server/nodes";
+  private static final String DECOMMISSION_URL = URL_PREFIX + "server/decommission";
+  private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX + "server/cancelDecommission";
+  private static CoordinatorServer coordinatorServer;
+  private ObjectMapper objectMapper = new ObjectMapper();
+
+  @BeforeAll
+  public static void setUp(@TempDir File tmpDir) throws Exception { // one coordinator, two shuffle servers
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    coordinatorConf.set(RssBaseConf.JETTY_HTTP_PORT, 12345); // must match URL_PREFIX
+    coordinatorConf.set(RssBaseConf.JETTY_CORE_POOL_SIZE, 128);
+    coordinatorConf.set(RssBaseConf.RPC_SERVER_PORT, 12346);
+    createCoordinatorServer(coordinatorConf);
+
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.set(RssBaseConf.RSS_COORDINATOR_QUORUM, "127.0.0.1:12346"); // coordinator RPC port from above
+    File dataDir1 = new File(tmpDir, "data1");
+    File dataDir2 = new File(tmpDir, "data2");
+    List<String> basePath = Lists.newArrayList(dataDir1.getAbsolutePath(), dataDir2.getAbsolutePath());
+    shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+    createShuffleServer(shuffleServerConf); // server 1: ports as returned by getShuffleServerConf()
+    File dataDir3 = new File(tmpDir, "data3");
+    File dataDir4 = new File(tmpDir, "data4");
+    basePath = Lists.newArrayList(dataDir3.getAbsolutePath(), dataDir4.getAbsolutePath());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath); // same conf object reused for server 2
+    shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT + 1); // distinct ports for server 2
+    shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18081);
+    createShuffleServer(shuffleServerConf);
+    startServers();
+    coordinatorServer = coordinators.get(0);
+    Awaitility.await().timeout(30, TimeUnit.SECONDS).until(() ->
+        coordinatorServer.getClusterManager().list().size() == 2); // both servers are listed by the coordinator
+  }
+
+  @Test
+  public void testNodesServlet() throws Exception { // GET /api/server/nodes, with and without filters
+    String content = TestUtils.httpGet(NODES_URL);
+    Response<List<HashMap>> response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {
+    });
+    List<HashMap> serverList = response.getData();
+    assertEquals(0, response.getCode()); // presumably 0 = success (the rejected request below gets -1)
+    assertEquals(2, serverList.size()); // NOTE(review): index checks below assume a stable server order
+    assertEquals(SHUFFLE_SERVER_PORT, Integer.parseInt(serverList.get(0).get("port").toString()));
+    assertEquals(ServerStatus.ACTIVE.toString(), serverList.get(0).get("status"));
+    assertEquals(SHUFFLE_SERVER_PORT + 1, Integer.parseInt(serverList.get(1).get("port").toString()));
+    assertEquals(ServerStatus.ACTIVE.toString(), serverList.get(1).get("status"));
+
+    // Filter by id: only the matching server is expected in the response.
+    ShuffleServer shuffleServer = shuffleServers.get(0);
+    content = TestUtils.httpGet(NODES_URL + "?id=" + shuffleServer.getId());
+    response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {
+    });
+    serverList = response.getData();
+    assertEquals(1, serverList.size());
+    assertEquals(shuffleServer.getId(), serverList.get(0).get("id"));
+
+    content = TestUtils.httpGet(NODES_URL + "?status=DECOMMISSIONED"); // filter by status: none expected
+    response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {});
+    serverList = response.getData();
+    assertEquals(0, serverList.size());
+    content = TestUtils.httpGet(NODES_URL + "?status=ACTIVE"); // filter by status: both servers expected
+    response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {});
+    serverList = response.getData();
+    assertEquals(2, serverList.size());
+  }
+
+  @Test
+  public void testDecommissionServlet() throws Exception { // decommission / cancelDecommission round trip
+    ShuffleServer shuffleServer = shuffleServers.get(0);
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    DecommissionRequest decommissionRequest = new DecommissionRequest();
+    decommissionRequest.setServerIds(Sets.newHashSet("not_exist_serverId")); // unknown id must be rejected
+    String content = TestUtils.httpPost(CANCEL_DECOMMISSION_URL, objectMapper.writeValueAsString(decommissionRequest));
+    Response response = objectMapper.readValue(content, Response.class);
+    assertEquals(-1, response.getCode()); // NOTE(review): request above hit the cancel URL - intended?
+    assertNotNull(response.getErrMsg());
+    CancelDecommissionRequest cancelDecommissionRequest = new CancelDecommissionRequest();
+    cancelDecommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
+    content = TestUtils.httpPost(CANCEL_DECOMMISSION_URL, objectMapper.writeValueAsString(cancelDecommissionRequest));
+    response = objectMapper.readValue(content, Response.class);
+    assertEquals(0, response.getCode()); // cancelling on a server that is still ACTIVE succeeds
+
+    // Register a shuffle first so that the server does not exit immediately once it is decommissioned.
+    ShuffleServerGrpcClient shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+    shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest("testDecommissionServlet_appId", 0,
+        Lists.newArrayList(new PartitionRange(0, 1)), ""));
+    decommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId())); // reuse the request, now with a real id
+    content = TestUtils.httpPost(DECOMMISSION_URL, objectMapper.writeValueAsString(decommissionRequest));
+    response = objectMapper.readValue(content, Response.class);
+    assertEquals(0, response.getCode());
+    assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus()); // already set when the call returns
+
+    // The coordinator's view is refreshed by the shuffle server's heartbeat, so poll until it catches up.
+    Awaitility.await().timeout(10, TimeUnit.SECONDS).until(() ->
+        ServerStatus.DECOMMISSIONING.equals(
+            coordinatorServer.getClusterManager().getServerNodeById(shuffleServer.getId()).getStatus()));
+    // Cancel the decommission; the server is expected to be ACTIVE again right after the call.
+    content = TestUtils.httpPost(CANCEL_DECOMMISSION_URL, objectMapper.writeValueAsString(cancelDecommissionRequest));
+    response = objectMapper.readValue(content, Response.class);
+    assertEquals(0, response.getCode());
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+  }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index b995a90f..99993eb4 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -77,7 +77,7 @@ public class ShuffleServerMetricsTest {
 
   @Test
   public void testJvmMetrics() throws Exception {
-    String content = TestUtils.httpGetMetrics(SERVER_JVM_URL);
+    String content = TestUtils.httpGet(SERVER_JVM_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
@@ -85,7 +85,7 @@ public class ShuffleServerMetricsTest {
 
   @Test
   public void testServerMetrics() throws Exception {
-    String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL);
+    String content = TestUtils.httpGet(SERVER_METRICS_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
@@ -145,7 +145,7 @@ public class ShuffleServerMetricsTest {
 
   @Test
   public void testGrpcMetrics() throws Exception {
-    String content = TestUtils.httpGetMetrics(SERVER_GRPC_URL);
+    String content = TestUtils.httpGet(SERVER_GRPC_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
@@ -187,7 +187,7 @@ public class ShuffleServerMetricsTest {
       f.get();
     }
 
-    String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL);
+    String content = TestUtils.httpGet(SERVER_METRICS_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
 
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index 19c9513e..6dc5e56e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -50,7 +50,6 @@ public class ShuffleServerTest {
       ShuffleServerConf serverConf = createShuffleServerConf();
       ShuffleServer ss1 = new ShuffleServer(serverConf);
       ss1.start();
-      ss1.stopServer();
       ExitUtils.disableSystemExit();
       ShuffleServer ss2 = new ShuffleServer(serverConf);
       String expectMessage = "Fail to start jetty http server";
@@ -61,7 +60,6 @@ public class ShuffleServerTest {
         assertEquals(expectMessage, e.getMessage());
         assertEquals(expectStatus, ((ExitException) e).getStatus());
       }
-      ss2.stopServer();
 
       serverConf.setInteger("rss.jetty.http.port", 9529);
       ss2 = new ShuffleServer(serverConf);
@@ -72,7 +70,7 @@ public class ShuffleServerTest {
         assertEquals(expectMessage, e.getMessage());
         assertEquals(expectStatus, ((ExitException) e).getStatus());
       }
-      ss2.stopServer();
+      ss1.stopServer();
 
       final Thread t = new Thread(null, () -> {
         throw new AssertionError("TestUncaughtException");
@@ -92,6 +90,8 @@ public class ShuffleServerTest {
     ShuffleServerConf serverConf = createShuffleServerConf();
     serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
     serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+    serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 19527);
+    serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 19528);
     ShuffleServer shuffleServer = new ShuffleServer(serverConf);
     shuffleServer.start();
     assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());