You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/03/10 07:59:02 UTC
[incubator-uniffle] branch master updated: [#80][Part-3] feat: add REST API for decommisson (#684)
This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4b5bf103 [#80][Part-3] feat: add REST API for decommisson (#684)
4b5bf103 is described below
commit 4b5bf10379497b22a39914f1654e148bec66610a
Author: xianjingfeng <xi...@gmail.com>
AuthorDate: Fri Mar 10 15:58:55 2023 +0800
[#80][Part-3] feat: add REST API for decommisson (#684)
### What changes were proposed in this pull request?
Add REST API for decommission
### Why are the changes needed?
Support shuffle server decommission. It is a part of #80
### Does this PR introduce _any_ user-facing change?
Env:
* Server IP: 127.0.0.1
* HTTP port: 19998
* RPC port: 19999
Decommission example:
```shell
curl -XPOST -H "Content-type:application/json" "http://127.0.0.1:19998/api/server/decommission" -d '{"serverIds": ["127.0.0.1:19999"]}'
```
Cancel decommission example:
```shell
curl -XPOST -H "Content-type:application/json" "http://127.0.0.1:19998/api/server/cancelDecommission" -d '{"serverIds": ["127.0.0.1:19999"]}'
```
Get server list:
```shell
# path: /api/server/nodes[?id={serverId}][?status={serverStatus}]
curl "http://127.0.0.1:19998/api/server/nodes?status=DECOMMISSIONING"
curl "http://127.0.0.1:19998/api/server/nodes?status=ACTIVE"
```
### How was this patch tested?
UT
---
.../apache/uniffle/common/ServerStatusTest.java | 2 +-
.../apache/uniffle/common/metrics/TestUtils.java | 37 +++--
.../apache/uniffle/common/rpc/StatusCodeTest.java | 2 +-
coordinator/pom.xml | 1 +
.../uniffle/coordinator/CoordinatorServer.java | 17 +++
.../apache/uniffle/coordinator/web/Response.java | 71 ++++++++++
.../web/request/CancelDecommissionRequest.java | 29 ++--
.../web/request/DecommissionRequest.java | 29 ++--
.../coordinator/web/servlet/BaseServlet.java | 84 +++++++++++
.../web/servlet/CancelDecommissionServlet.java | 50 +++++++
.../web/servlet/DecommissionServlet.java | 50 +++++++
.../coordinator/web/servlet/NodesServlet.java | 56 ++++++++
.../coordinator/metric/CoordinatorMetricsTest.java | 8 +-
docs/coordinator_guide.md | 58 ++++++++
.../java/org/apache/uniffle/test/ServletTest.java | 157 +++++++++++++++++++++
.../uniffle/server/ShuffleServerMetricsTest.java | 8 +-
.../apache/uniffle/server/ShuffleServerTest.java | 6 +-
17 files changed, 602 insertions(+), 63 deletions(-)
diff --git a/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
index e449215a..2546cfa6 100644
--- a/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
@@ -55,7 +55,7 @@ public class ServerStatusTest {
fail(e.getMessage());
}
}
- for (int i = 0; i < serverStatuses.size() - 1; i++) {
+ for (int i = 0; i < serverStatuses.size(); i++) {
assertEquals(protoServerStatuses.get(i), serverStatuses.get(i).toProto());
assertEquals(ServerStatus.fromProto(protoServerStatuses.get(i)), serverStatuses.get(i));
}
diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
index 1233f9b5..ea623ea5 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common.metrics;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
@@ -28,18 +29,38 @@ public class TestUtils {
private TestUtils() {
}
- public static String httpGetMetrics(String urlString) throws IOException {
+ public static String httpGet(String urlString) throws IOException {
URL url = new URL(urlString);
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("GET");
- BufferedReader in = new BufferedReader(
- new InputStreamReader(con.getInputStream()));
- String inputLine;
- StringBuffer content = new StringBuffer();
- while ((inputLine = in.readLine()) != null) {
- content.append(inputLine);
+ StringBuilder content = new StringBuilder();
+ try (BufferedReader in = new BufferedReader(
+ new InputStreamReader(con.getInputStream()));) {
+ String inputLine;
+ while ((inputLine = in.readLine()) != null) {
+ content.append(inputLine);
+ }
}
- in.close();
+ return content.toString();
+ }
+
+ public static String httpPost(String urlString, String postData) throws IOException {
+ URL url = new URL(urlString);
+ HttpURLConnection con = (HttpURLConnection) url.openConnection();
+ con.setDoOutput(true);
+ con.setRequestMethod("POST");
+ StringBuilder content = new StringBuilder();
+ try (OutputStream outputStream = con.getOutputStream();) {
+ outputStream.write(postData.getBytes());
+ try (BufferedReader in = new BufferedReader(
+ new InputStreamReader(con.getInputStream()));) {
+ String inputLine;
+ while ((inputLine = in.readLine()) != null) {
+ content.append(inputLine);
+ }
+ }
+ }
+
return content.toString();
}
}
diff --git a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
index 5956c1d3..4e9c5b2f 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
@@ -55,7 +55,7 @@ public class StatusCodeTest {
fail(e.getMessage());
}
}
- for (int i = 0; i < statusCodes.size() - 1; i++) {
+ for (int i = 0; i < statusCodes.size(); i++) {
assertEquals(protoStatusCode.get(i), statusCodes.get(i).toProto());
assertEquals(StatusCode.fromProto(protoStatusCode.get(i)), statusCodes.get(i));
}
diff --git a/coordinator/pom.xml b/coordinator/pom.xml
index ac6b2ec5..df147d3f 100644
--- a/coordinator/pom.xml
+++ b/coordinator/pom.xml
@@ -107,6 +107,7 @@
<includes>
<include>com.google.protobuf:protobuf-java-util</include>
<include>com.google.guava:guava</include>
+ <include>com.google.guava:failureaccess</include>
<include>com.fasterxml.jackson.core:jackson-databind</include>
<include>com.fasterxml.jackson.core:jackson-core</include>
</includes>
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 0ac8f246..686f0596 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -41,6 +41,9 @@ import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
import org.apache.uniffle.coordinator.util.CoordinatorUtils;
+import org.apache.uniffle.coordinator.web.servlet.CancelDecommissionServlet;
+import org.apache.uniffle.coordinator.web.servlet.DecommissionServlet;
+import org.apache.uniffle.coordinator.web.servlet.NodesServlet;
import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
@@ -150,6 +153,7 @@ public class CoordinatorServer extends ReconfigurableBase {
id = ip + "-" + port;
LOG.info("Start to initialize coordinator {}", id);
jettyServer = new JettyServer(coordinatorConf);
+ registerRESTAPI();
// register metrics first to avoid NPE problem when add dynamic metrics
registerMetrics();
coordinatorConf.setString(CoordinatorUtils.COORDINATOR_ID, id);
@@ -182,6 +186,19 @@ public class CoordinatorServer extends ReconfigurableBase {
server = coordinatorFactory.getServer();
}
+ private void registerRESTAPI() throws Exception {
+ LOG.info("Register REST API");
+ jettyServer.addServlet(
+ new NodesServlet(this),
+ "/api/server/nodes");
+ jettyServer.addServlet(
+ new DecommissionServlet(this),
+ "/api/server/decommission");
+ jettyServer.addServlet(
+ new CancelDecommissionServlet(this),
+ "/api/server/cancelDecommission");
+ }
+
private void registerMetrics() throws Exception {
LOG.info("Register metrics");
CollectorRegistry coordinatorCollectorRegistry = new CollectorRegistry(true);
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java
new file mode 100644
index 00000000..b883d55f
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web;
+
+public class Response<T> {
+ private static final int SUCCESS_CODE = 0;
+ private static final int ERROR_CODE = -1;
+ private int code;
+ private T data;
+ private String errMsg;
+
+ public Response() {
+ }
+
+ public Response(int code, T data, String errMsg) {
+ this.code = code;
+ this.data = data;
+ this.errMsg = errMsg;
+ }
+
+ public static <T> Response<T> success(T data) {
+ return new Response<>(SUCCESS_CODE, data, null);
+ }
+
+ public static <T> Response<T> fail(String msg) {
+ return new Response<>(ERROR_CODE, null, msg);
+ }
+
+ public static <T> Response<T> fail(String msg, int code) {
+ return new Response<>(code, null, msg);
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public T getData() {
+ return data;
+ }
+
+ public void setData(T data) {
+ this.data = data;
+ }
+
+ public String getErrMsg() {
+ return errMsg;
+ }
+
+ public void setErrMsg(String errMsg) {
+ this.errMsg = errMsg;
+ }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java
similarity index 51%
copy from common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
copy to coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java
index 1233f9b5..997a135e 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java
@@ -15,31 +15,18 @@
* limitations under the License.
*/
-package org.apache.uniffle.common.metrics;
+package org.apache.uniffle.coordinator.web.request;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.util.Set;
-public class TestUtils {
+public class CancelDecommissionRequest {
+ private Set<String> serverIds;
- private TestUtils() {
+ public Set<String> getServerIds() {
+ return serverIds;
}
- public static String httpGetMetrics(String urlString) throws IOException {
- URL url = new URL(urlString);
- HttpURLConnection con = (HttpURLConnection) url.openConnection();
- con.setRequestMethod("GET");
- BufferedReader in = new BufferedReader(
- new InputStreamReader(con.getInputStream()));
- String inputLine;
- StringBuffer content = new StringBuffer();
- while ((inputLine = in.readLine()) != null) {
- content.append(inputLine);
- }
- in.close();
- return content.toString();
+ public void setServerIds(Set<String> serverIds) {
+ this.serverIds = serverIds;
}
}
diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java
similarity index 51%
copy from common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
copy to coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java
index 1233f9b5..c11a716c 100644
--- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java
@@ -15,31 +15,18 @@
* limitations under the License.
*/
-package org.apache.uniffle.common.metrics;
+package org.apache.uniffle.coordinator.web.request;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.util.Set;
-public class TestUtils {
+public class DecommissionRequest {
+ private Set<String> serverIds;
- private TestUtils() {
+ public Set<String> getServerIds() {
+ return serverIds;
}
- public static String httpGetMetrics(String urlString) throws IOException {
- URL url = new URL(urlString);
- HttpURLConnection con = (HttpURLConnection) url.openConnection();
- con.setRequestMethod("GET");
- BufferedReader in = new BufferedReader(
- new InputStreamReader(con.getInputStream()));
- String inputLine;
- StringBuffer content = new StringBuffer();
- while ((inputLine = in.readLine()) != null) {
- content.append(inputLine);
- }
- in.close();
- return content.toString();
+ public void setServerIds(Set<String> serverIds) {
+ this.serverIds = serverIds;
}
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
new file mode 100644
index 00000000..a67701f2
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.servlet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.uniffle.coordinator.web.Response;
+
+public abstract class BaseServlet extends HttpServlet {
+ public static final String JSON_MIME_TYPE = "application/json";
+ final ObjectMapper mapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ writeJSON(resp, handlerRequest(() -> handleGet(req, resp)));
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ writeJSON(resp, handlerRequest(() -> handlePost(req, resp)));
+ }
+
+ private Response handlerRequest(
+ Callable<Response> function) {
+ Response response;
+ try {
+ // todo: Do something for authentication
+ response = function.call();
+ } catch (Exception e) {
+ response = Response.fail(e.getMessage());
+ }
+ return response;
+ }
+
+ protected Response handleGet(
+ HttpServletRequest req,
+ HttpServletResponse resp) throws ServletException, IOException {
+ throw new IOException("Method not support!");
+ }
+
+ protected Response handlePost(
+ HttpServletRequest req,
+ HttpServletResponse resp) throws ServletException, IOException {
+ throw new IOException("Method not support!");
+ }
+
+ protected void writeJSON(final HttpServletResponse resp, final Object obj)
+ throws IOException {
+ if (obj == null) {
+ return;
+ }
+ resp.setContentType(JSON_MIME_TYPE);
+ final OutputStream stream = resp.getOutputStream();
+ mapper.writeValue(stream, obj);
+ }
+
+ protected <T> T parseParamsFromJson(HttpServletRequest req, Class<T> clazz) throws IOException {
+ return mapper.readValue(req.getInputStream(), clazz);
+ }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
new file mode 100644
index 00000000..b7411d4e
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.servlet;
+
+import java.io.IOException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
+
+public class CancelDecommissionServlet extends BaseServlet {
+ private final CoordinatorServer coordinator;
+
+ public CancelDecommissionServlet(CoordinatorServer coordinator) {
+ this.coordinator = coordinator;
+ }
+
+ @Override
+ protected Response handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ CancelDecommissionRequest params = parseParamsFromJson(req, CancelDecommissionRequest.class);
+ if (CollectionUtils.isEmpty(params.getServerIds())) {
+ return Response.fail("Parameter[serverIds] should not be null!");
+ }
+ ClusterManager clusterManager = coordinator.getClusterManager();
+ params.getServerIds().forEach((serverId) -> {
+ clusterManager.cancelDecommission(serverId);
+ });
+ return Response.success(null);
+ }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
new file mode 100644
index 00000000..96f06dd3
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.servlet;
+
+import java.io.IOException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
+
+public class DecommissionServlet extends BaseServlet {
+ private final CoordinatorServer coordinator;
+
+ public DecommissionServlet(CoordinatorServer coordinator) {
+ this.coordinator = coordinator;
+ }
+
+ @Override
+ protected Response handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ DecommissionRequest params = parseParamsFromJson(req, DecommissionRequest.class);
+ if (CollectionUtils.isEmpty(params.getServerIds())) {
+ return Response.fail("Parameter[serverIds] should not be null!");
+ }
+ ClusterManager clusterManager = coordinator.getClusterManager();
+ params.getServerIds().forEach((serverId) -> {
+ clusterManager.decommission(serverId);
+ });
+ return Response.success(null);
+ }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
new file mode 100644
index 00000000..04850235
--- /dev/null
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.coordinator.web.servlet;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.web.Response;
+
+
+public class NodesServlet extends BaseServlet {
+ private final CoordinatorServer coordinator;
+
+ public NodesServlet(CoordinatorServer coordinator) {
+ this.coordinator = coordinator;
+ }
+
+ @Override
+ protected Response handleGet(HttpServletRequest req, HttpServletResponse resp) {
+ List<ServerNode> serverList = coordinator.getClusterManager().getServerList(Collections.EMPTY_SET);
+ String id = req.getParameter("id");
+ String status = req.getParameter("status");
+ serverList = serverList.stream().filter((server) -> {
+ if (id != null && !id.equals(server.getId())) {
+ return false;
+ }
+ if (status != null && !server.getStatus().toString().equals(status)) {
+ return false;
+ }
+ return true;
+ }).collect(Collectors.toList());
+ Collections.sort(serverList, Comparator.comparing(ServerNode::getId));
+ return Response.success(serverList);
+ }
+}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
index 945cafda..454e6bb8 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
@@ -68,7 +68,7 @@ public class CoordinatorMetricsTest {
@Test
public void testDynamicMetrics() throws Exception {
- String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL);
+ String content = TestUtils.httpGet(SERVER_METRICS_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode metricsNode = mapper.readTree(content).get("metrics");
String remoteStorageMetricsName = CoordinatorMetrics.REMOTE_STORAGE_IN_USED_PREFIX + "path1";
@@ -85,7 +85,7 @@ public class CoordinatorMetricsTest {
@Test
public void testCoordinatorMetrics() throws Exception {
- String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL);
+ String content = TestUtils.httpGet(SERVER_METRICS_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
@@ -101,7 +101,7 @@ public class CoordinatorMetricsTest {
@Test
public void testJvmMetrics() throws Exception {
- String content = TestUtils.httpGetMetrics(SERVER_JVM_URL);
+ String content = TestUtils.httpGet(SERVER_JVM_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
@@ -109,7 +109,7 @@ public class CoordinatorMetricsTest {
@Test
public void testGrpcMetrics() throws Exception {
- String content = TestUtils.httpGetMetrics(SERVER_GRPC_URL);
+ String content = TestUtils.httpGet(SERVER_GRPC_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index 09593f55..d3815a04 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -136,3 +136,61 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). |
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. |
|rss.metrics.prometheus.pushgateway.report.interval.seconds|10| The interval in seconds for the reporter to report metrics. |
+
+## RESTful API (beta)
+
+### Fetch shuffle servers
+
+<details>
+ <summary><code>GET</code> <code><b>/api/server/nodes</b></code> </summary>
+
+##### Parameters
+
+> |name|type|data type|description|
+> |----|----|---------|-----------|
+> |id|optional|string|Shuffle server id, e.g. 127.0.0.1:19999|
+> |status|optional|string|Shuffle server status, e.g. ACTIVE, DECOMMISSIONING, DECOMMISSIONED|
+
+##### Example cURL
+
+> ```bash
+> curl -X GET http://localhost:19998/api/server/nodes
+> ```
+</details>
+
+### Decommission shuffle servers
+
+<details>
+ <summary><code>POST</code> <code><b>/api/server/decommission</b></code> </summary>
+
+##### Parameters
+
+> |name|type| data type |description|
+> |----|-------------------|---------|-----------|
+> |serverIds|required| array |Array of shuffle server ids, e.g. ["127.0.0.1:19999"]|
+>
+##### Example cURL
+
+> ```bash
+> curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/decommission -d '{"serverIds": ["127.0.0.1:19999"]}'
+> ```
+</details>
+
+
+### Cancel decommissioning of shuffle servers
+
+<details>
+ <summary><code>POST</code> <code><b>/api/server/cancelDecommission</b></code> </summary>
+
+##### Parameters
+
+> |name|type| data type |description|
+> |----|-------------------|---------|-----------|
+> |serverIds|required| array |Array of shuffle server ids, e.g. ["127.0.0.1:19999"]|
+>
+##### Example cURL
+
+> ```bash
+> curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/cancelDecommission -d '{"serverIds": ["127.0.0.1:19999"]}'
+> ```
+</details>
\ No newline at end of file
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
new file mode 100644
index 00000000..1963195e
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.metrics.TestUtils;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
+import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ServletTest extends IntegrationTestBase {
+ private static final String URL_PREFIX = "http://127.0.0.1:12345/api/";
+ private static final String NODES_URL = URL_PREFIX + "server/nodes";
+ private static final String DECOMMISSION_URL = URL_PREFIX + "server/decommission";
+ private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX + "server/cancelDecommission";
+ private static CoordinatorServer coordinatorServer;
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ @BeforeAll
+ public static void setUp(@TempDir File tmpDir) throws Exception {
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+ coordinatorConf.set(RssBaseConf.JETTY_HTTP_PORT, 12345);
+ coordinatorConf.set(RssBaseConf.JETTY_CORE_POOL_SIZE, 128);
+ coordinatorConf.set(RssBaseConf.RPC_SERVER_PORT, 12346);
+ createCoordinatorServer(coordinatorConf);
+
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ shuffleServerConf.set(RssBaseConf.RSS_COORDINATOR_QUORUM, "127.0.0.1:12346");
+ File dataDir1 = new File(tmpDir, "data1");
+ File dataDir2 = new File(tmpDir, "data2");
+ List<String> basePath = Lists.newArrayList(dataDir1.getAbsolutePath(), dataDir2.getAbsolutePath());
+ shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+ createShuffleServer(shuffleServerConf);
+ File dataDir3 = new File(tmpDir, "data3");
+ File dataDir4 = new File(tmpDir, "data4");
+ basePath = Lists.newArrayList(dataDir3.getAbsolutePath(), dataDir4.getAbsolutePath());
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT + 1);
+ shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18081);
+ createShuffleServer(shuffleServerConf);
+ startServers();
+ coordinatorServer = coordinators.get(0);
+ Awaitility.await().timeout(30, TimeUnit.SECONDS).until(() ->
+ coordinatorServer.getClusterManager().list().size() == 2);
+ }
+
+ @Test
+ public void testNodesServlet() throws Exception {
+ String content = TestUtils.httpGet(NODES_URL);
+ Response<List<HashMap>> response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {
+ });
+ List<HashMap> serverList = response.getData();
+ assertEquals(0, response.getCode());
+ assertEquals(2, serverList.size());
+ assertEquals(SHUFFLE_SERVER_PORT, Integer.parseInt(serverList.get(0).get("port").toString()));
+ assertEquals(ServerStatus.ACTIVE.toString(), serverList.get(0).get("status"));
+ assertEquals(SHUFFLE_SERVER_PORT + 1, Integer.parseInt(serverList.get(1).get("port").toString()));
+ assertEquals(ServerStatus.ACTIVE.toString(), serverList.get(1).get("status"));
+
+ // Only fetch one server.
+ ShuffleServer shuffleServer = shuffleServers.get(0);
+ content = TestUtils.httpGet(NODES_URL + "?id=" + shuffleServer.getId());
+ response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {
+ });
+ serverList = response.getData();
+ assertEquals(1, serverList.size());
+ assertEquals(shuffleServer.getId(), serverList.get(0).get("id"));
+
+ content = TestUtils.httpGet(NODES_URL + "?status=DECOMMISSIONED");
+ response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {});
+ serverList = response.getData();
+ assertEquals(0, serverList.size());
+ content = TestUtils.httpGet(NODES_URL + "?status=ACTIVE");
+ response = objectMapper.readValue(content, new TypeReference<Response<List<HashMap>>>() {});
+ serverList = response.getData();
+ assertEquals(2, serverList.size());
+ }
+
+ @Test
+ public void testDecommissionServlet() throws Exception {
+ ShuffleServer shuffleServer = shuffleServers.get(0);
+ assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+ DecommissionRequest decommissionRequest = new DecommissionRequest();
+ decommissionRequest.setServerIds(Sets.newHashSet("not_exist_serverId"));
+ String content = TestUtils.httpPost(CANCEL_DECOMMISSION_URL, objectMapper.writeValueAsString(decommissionRequest));
+ Response response = objectMapper.readValue(content, Response.class);
+ assertEquals(-1, response.getCode());
+ assertNotNull(response.getErrMsg());
+ CancelDecommissionRequest cancelDecommissionRequest = new CancelDecommissionRequest();
+ cancelDecommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
+ content = TestUtils.httpPost(CANCEL_DECOMMISSION_URL, objectMapper.writeValueAsString(cancelDecommissionRequest));
+ response = objectMapper.readValue(content, Response.class);
+ assertEquals(0, response.getCode());
+
+ // Register shuffle, avoid server exiting immediately.
+ ShuffleServerGrpcClient shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+ shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest("testDecommissionServlet_appId", 0,
+ Lists.newArrayList(new PartitionRange(0, 1)), ""));
+ decommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
+ content = TestUtils.httpPost(DECOMMISSION_URL, objectMapper.writeValueAsString(decommissionRequest));
+ response = objectMapper.readValue(content, Response.class);
+ assertEquals(0, response.getCode());
+ assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+
+ // Wait until shuffle server send heartbeat to coordinator.
+ Awaitility.await().timeout(10, TimeUnit.SECONDS).until(() ->
+ ServerStatus.DECOMMISSIONING.equals(
+ coordinatorServer.getClusterManager().getServerNodeById(shuffleServer.getId()).getStatus()));
+ // Cancel decommission.
+ content = TestUtils.httpPost(CANCEL_DECOMMISSION_URL, objectMapper.writeValueAsString(cancelDecommissionRequest));
+ response = objectMapper.readValue(content, Response.class);
+ assertEquals(0, response.getCode());
+ assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+ }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index b995a90f..99993eb4 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -77,7 +77,7 @@ public class ShuffleServerMetricsTest {
@Test
public void testJvmMetrics() throws Exception {
- String content = TestUtils.httpGetMetrics(SERVER_JVM_URL);
+ String content = TestUtils.httpGet(SERVER_JVM_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
@@ -85,7 +85,7 @@ public class ShuffleServerMetricsTest {
@Test
public void testServerMetrics() throws Exception {
- String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL);
+ String content = TestUtils.httpGet(SERVER_METRICS_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
@@ -145,7 +145,7 @@ public class ShuffleServerMetricsTest {
@Test
public void testGrpcMetrics() throws Exception {
- String content = TestUtils.httpGetMetrics(SERVER_GRPC_URL);
+ String content = TestUtils.httpGet(SERVER_GRPC_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
@@ -187,7 +187,7 @@ public class ShuffleServerMetricsTest {
f.get();
}
- String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL);
+ String content = TestUtils.httpGet(SERVER_METRICS_URL);
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index 19c9513e..6dc5e56e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -50,7 +50,6 @@ public class ShuffleServerTest {
ShuffleServerConf serverConf = createShuffleServerConf();
ShuffleServer ss1 = new ShuffleServer(serverConf);
ss1.start();
- ss1.stopServer();
ExitUtils.disableSystemExit();
ShuffleServer ss2 = new ShuffleServer(serverConf);
String expectMessage = "Fail to start jetty http server";
@@ -61,7 +60,6 @@ public class ShuffleServerTest {
assertEquals(expectMessage, e.getMessage());
assertEquals(expectStatus, ((ExitException) e).getStatus());
}
- ss2.stopServer();
serverConf.setInteger("rss.jetty.http.port", 9529);
ss2 = new ShuffleServer(serverConf);
@@ -72,7 +70,7 @@ public class ShuffleServerTest {
assertEquals(expectMessage, e.getMessage());
assertEquals(expectStatus, ((ExitException) e).getStatus());
}
- ss2.stopServer();
+ ss1.stopServer();
final Thread t = new Thread(null, () -> {
throw new AssertionError("TestUncaughtException");
@@ -92,6 +90,8 @@ public class ShuffleServerTest {
ShuffleServerConf serverConf = createShuffleServerConf();
serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L);
serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown);
+ serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 19527);
+ serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 19528);
ShuffleServer shuffleServer = new ShuffleServer(serverConf);
shuffleServer.start();
assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());