You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/12/02 23:53:20 UTC
[helix] branch master updated: Add rest API for take/free instance (#1917)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new c07c873 Add rest API for take/free instance (#1917)
c07c873 is described below
commit c07c8734a1b8970eb3d3f3fdca6295b0cfafc80d
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Thu Dec 2 15:53:12 2021 -0800
Add rest API for take/free instance (#1917)
* add rest API for take/free instance
---
.../MaintenanceManagementInstanceInfo.java | 2 +
.../MaintenanceManagementService.java | 22 +++-
...nAbstractClass.java => OperationInterface.java} | 2 +-
.../resources/helix/PerInstanceAccessor.java | 142 ++++++++++++++++++++-
.../helix/rest/server/TestPerInstanceAccessor.java | 18 ++-
5 files changed, 175 insertions(+), 11 deletions(-)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
index d50ad0d..8914173 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
@@ -37,11 +37,13 @@ public class MaintenanceManagementInstanceInfo {
public MaintenanceManagementInstanceInfo(OperationalStatus status) {
this.status = status;
this.messages = new ArrayList<>();
+ this.operationResult = "";
}
public MaintenanceManagementInstanceInfo(OperationalStatus status, List<String> messages) {
this.status = status;
this.messages = messages;
+ this.operationResult = "";
}
public MaintenanceManagementInstanceInfo(OperationalStatus status, String newOperationResult) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 605e8f6..7785f9d 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -19,7 +19,6 @@ package org.apache.helix.rest.clusterMaintenanceService;
* under the License.
*/
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,6 +36,7 @@ import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@@ -55,7 +55,6 @@ import org.apache.helix.rest.common.HelixDataAccessorWrapper;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
import org.apache.helix.rest.server.service.InstanceService;
-import org.apache.helix.rest.server.service.InstanceServiceImpl;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@ import org.slf4j.LoggerFactory;
public class MaintenanceManagementService {
- private static final Logger LOG = LoggerFactory.getLogger(InstanceServiceImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MaintenanceManagementService.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ExecutorService POOL = Executors.newCachedThreadPool();
@@ -275,7 +274,7 @@ public class MaintenanceManagementService {
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
// custom check
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
- finalStoppableChecks, getCustomPayLoads(jsonContent));
+ finalStoppableChecks, getMapFromJsonPayload(jsonContent));
return finalStoppableChecks;
}
@@ -412,7 +411,7 @@ public class MaintenanceManagementService {
return instanceStoppableChecks;
}
- private Map<String, String> getCustomPayLoads(String jsonContent) throws IOException {
+ public static Map<String, String> getMapFromJsonPayload(String jsonContent) throws IOException {
Map<String, String> result = new HashMap<>();
if (jsonContent == null) {
return result;
@@ -423,6 +422,19 @@ public class MaintenanceManagementService {
return result;
}
+  /**
+   * Converts a JSON node into a list of strings.
+   *
+   * @param jsonContent JSON node to convert; may be {@code null}
+   * @return an empty immutable list when {@code jsonContent} is {@code null}, otherwise the
+   *         result of converting the node
+   * @throws IllegalArgumentException if the node cannot be converted to a list of strings
+   */
+  public static List<String> getListFromJsonPayload(JsonNode jsonContent)
+      throws IllegalArgumentException {
+    if (jsonContent == null) {
+      return Collections.emptyList();
+    }
+    // Bind through a typed reference rather than the raw List.class: the raw form is an
+    // unchecked conversion that can hand back non-String elements inside a List<String>,
+    // which only fails later with a ClassCastException at the point of use.
+    return OBJECT_MAPPER.convertValue(jsonContent, new TypeReference<List<String>>() {
+    });
+  }
+
+  /**
+   * Converts a JSON node into a string-to-string map.
+   *
+   * @param jsonContent JSON node to convert; may be {@code null}
+   * @return a new empty {@link HashMap} when {@code jsonContent} is {@code null}, otherwise the
+   *         result of converting the node
+   * @throws IllegalArgumentException if the node cannot be converted to the target map type
+   */
+  public static Map<String, String> getMapFromJsonPayload(JsonNode jsonContent)
+      throws IllegalArgumentException {
+    if (jsonContent == null) {
+      return new HashMap<>();
+    }
+    TypeReference<Map<String, String>> stringMapType = new TypeReference<Map<String, String>>() {
+    };
+    return OBJECT_MAPPER.convertValue(jsonContent, stringMapType);
+  }
+
@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationAbstractClass.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationInterface.java
similarity index 98%
rename from helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationAbstractClass.java
rename to helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationInterface.java
index e77919b..2062ce9 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationAbstractClass.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationInterface.java
@@ -26,7 +26,7 @@ import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementInst
import org.apache.helix.rest.common.datamodel.RestSnapShot;
-interface OperationAbstractClass {
+public interface OperationInterface {
// operation check
MaintenanceManagementInstanceInfo operationCheckForTakeSingleInstance(String instanceName,
Map<String, String> operationConfig, RestSnapShot sn);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index d42e8d9..92cd045 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -82,7 +82,22 @@ public class PerInstanceAccessor extends AbstractHelixResource {
total_message_count,
read_message_count,
healthreports,
- instanceTags
+ instanceTags,
+ health_check_list,
+ health_check_config,
+ operation_list,
+ operation_config,
+ continueOnFailures,
+ skipZKRead
+ }
+
+  /**
+   * Holder for the values parsed out of a take/free instance request body.
+   * All fields start out unset ({@code null} / {@code false}) and are filled in by the parser.
+   */
+  private static class MaintenanceOpInputFields {
+    List<String> healthChecks = null;
+    Map<String, String> healthCheckConfig = null;
+    List<String> operations = null;
+    Map<String, String> operationConfig = null;
+    boolean continueOnFailures = false;
+    boolean skipZKRead = false;
+
+    // The parsed request is written to the debug log via toString(); without this override
+    // the log line would only contain the default Object identity string.
+    @Override
+    public String toString() {
+      return "MaintenanceOpInputFields{"
+          + "healthChecks=" + healthChecks
+          + ", healthCheckConfig=" + healthCheckConfig
+          + ", operations=" + operations
+          + ", operationConfig=" + operationConfig
+          + ", continueOnFailures=" + continueOnFailures
+          + ", skipZKRead=" + skipZKRead
+          + '}';
+    }
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@@ -185,6 +200,131 @@ public class PerInstanceAccessor extends AbstractHelixResource {
return OK(OBJECT_MAPPER.writeValueAsString(stoppableCheck));
}
+  /**
+   * Parses the request body and asks the maintenance management service to take a single
+   * instance, passing along the requested health checks and operations with their configs.
+   *
+   * @param jsonContent json payload
+   * @param clusterId cluster id
+   * @param instanceName name of the instance to take
+   * @return json representation of the take-instance result; a bad-request response when the
+   *         payload yields no input fields or when any exception is thrown while processing
+   */
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  @Path("takeInstance")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response takeSingleInstance(String jsonContent,
+      @PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) {
+    try {
+      MaintenanceOpInputFields request = readMaintenanceInputFromJson(jsonContent);
+      if (request == null) {
+        return badRequest("Invalid input for content : " + jsonContent);
+      }
+
+      // A fresh service per request, configured from the flags carried in the payload.
+      ZKHelixDataAccessor dataAccessor = (ZKHelixDataAccessor) getDataAccssor(clusterId);
+      MaintenanceManagementService service =
+          new MaintenanceManagementService(dataAccessor, getConfigAccessor(), request.skipZKRead,
+              request.continueOnFailures, getNamespace());
+
+      return JSONRepresentation(
+          service.takeInstance(clusterId, instanceName, request.healthChecks,
+              request.healthCheckConfig, request.operations, request.operationConfig, true));
+    } catch (Exception e) {
+      LOG.error("Failed to takeInstances:", e);
+      return badRequest("Failed to takeInstances: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Parses the request body and asks the maintenance management service to free a single
+   * instance, passing along the requested operations with their config. When the payload
+   * carries health checks, a warning is logged stating that they will not be performed.
+   *
+   * @param jsonContent json payload
+   * @param clusterId cluster id
+   * @param instanceName name of the instance to free
+   * @return json representation of the free-instance result; a bad-request response when the
+   *         payload yields no input fields or when any exception is thrown while processing
+   */
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  @Path("freeInstance")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response freeSingleInstance(
+      String jsonContent,
+      @PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName) {
+
+    try {
+      MaintenanceOpInputFields inputFields = readMaintenanceInputFromJson(jsonContent);
+      if (inputFields == null) {
+        return badRequest("Invalid input for content : " + jsonContent);
+      }
+      // Null-safe: only warn when the caller actually supplied health checks.
+      if (inputFields.healthChecks != null && !inputFields.healthChecks.isEmpty()) {
+        LOG.warn("freeSingleInstance won't perform user passed health check.");
+      }
+
+      MaintenanceManagementService maintenanceManagementService =
+          new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
+              getConfigAccessor(), inputFields.skipZKRead, inputFields.continueOnFailures,
+              getNamespace());
+
+      return JSONRepresentation(maintenanceManagementService
+          .freeInstance(clusterId, instanceName, inputFields.healthChecks,
+              inputFields.healthCheckConfig,
+              inputFields.operations,
+              inputFields.operationConfig, true));
+    } catch (Exception e) {
+      // Report the failure under this endpoint's own name; the previous text blamed
+      // takeInstances, which sent anyone reading logs or responses to the wrong API.
+      LOG.error("Failed to freeInstance:", e);
+      return badRequest("Failed to freeInstance: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Parses the JSON body of a take/free instance request into its input fields.
+   *
+   * <p>The {@code continueOnFailures} and {@code skipZKRead} flags are read from the
+   * {@code health_check_config} object and removed from it, so the remaining map holds only
+   * the other health check settings.
+   *
+   * @param jsonContent raw request body
+   * @return the parsed fields, or {@code null} when the body is {@code null}, empty, or yields
+   *         no JSON content
+   * @throws IOException if the body cannot be parsed as JSON
+   */
+  private MaintenanceOpInputFields readMaintenanceInputFromJson(String jsonContent)
+      throws IOException {
+    // A missing body is treated like an empty one instead of failing with a NullPointerException.
+    if (jsonContent == null || jsonContent.isEmpty()) {
+      return null;
+    }
+    JsonNode node = OBJECT_MAPPER.readTree(jsonContent);
+    // NOTE(review): for content-free input readTree is expected to return either null or a
+    // "missing" node depending on the Jackson version; both are rejected here - confirm.
+    if (node == null || node.isMissingNode()) {
+      return null;
+    }
+
+    MaintenanceOpInputFields inputFields = new MaintenanceOpInputFields();
+    inputFields.healthChecks = MaintenanceManagementService
+        .getListFromJsonPayload(node.get(PerInstanceProperties.health_check_list.name()));
+    inputFields.healthCheckConfig = MaintenanceManagementService
+        .getMapFromJsonPayload(node.get(PerInstanceProperties.health_check_config.name()));
+    inputFields.operations = MaintenanceManagementService
+        .getListFromJsonPayload(node.get(PerInstanceProperties.operation_list.name()));
+    inputFields.operationConfig = MaintenanceManagementService
+        .getMapFromJsonPayload(node.get(PerInstanceProperties.operation_config.name()));
+
+    // The two flags live inside health_check_config, so they can only be read once that map has
+    // been parsed (reading them earlier always saw an unset map and left both flags false).
+    // Map.remove hands back the stored value, or null when absent, and parseBoolean(null) is
+    // false, so one call both reads the flag and strips it from the config.
+    if (inputFields.healthCheckConfig != null) {
+      inputFields.continueOnFailures = Boolean.parseBoolean(
+          inputFields.healthCheckConfig.remove(PerInstanceProperties.continueOnFailures.name()));
+      inputFields.skipZKRead = Boolean.parseBoolean(
+          inputFields.healthCheckConfig.remove(PerInstanceProperties.skipZKRead.name()));
+    }
+
+    // Log the individual values so the line is useful regardless of how the holder prints itself.
+    LOG.debug("Input fields for take/free Instance: healthChecks=" + inputFields.healthChecks
+        + ", healthCheckConfig=" + inputFields.healthCheckConfig
+        + ", operations=" + inputFields.operations
+        + ", operationConfig=" + inputFields.operationConfig
+        + ", continueOnFailures=" + inputFields.continueOnFailures
+        + ", skipZKRead=" + inputFields.skipZKRead);
+
+    return inputFields;
+  }
+
+
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@Timed(name = HttpConstants.WRITE_REQUEST)
@PUT
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 6f9321a..58bccbf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -66,8 +66,9 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
String stoppableCheckResult = response.readEntity(String.class);
Map<String, Object> actualMap = OBJECT_MAPPER.readValue(stoppableCheckResult, Map.class);
- List<String> failedChecks = Arrays.asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT",
- "HELIX:INSTANCE_NOT_ENABLED", "HELIX:INSTANCE_NOT_STABLE");
+ List<String> failedChecks = Arrays
+ .asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT", "HELIX:INSTANCE_NOT_ENABLED",
+ "HELIX:INSTANCE_NOT_STABLE");
Map<String, Object> expectedMap =
ImmutableMap.of("stoppable", false, "failedChecks", failedChecks);
Assert.assertEquals(actualMap, expectedMap);
@@ -75,6 +76,16 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
}
@Test(dependsOnMethods = "testIsInstanceStoppable")
+  // Negative case: posting an empty body to takeInstance must be answered with BAD_REQUEST.
+  public void testTakeInstanceNegInput() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    String takeInstanceUri = "clusters/TestCluster_0/instances/instance1/takeInstance";
+    Entity<String> emptyBody = Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
+    int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();
+
+    post(takeInstanceUri, null, emptyBody, expectedStatus, true);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+ @Test(dependsOnMethods = "testTakeInstanceNegInput")
public void testGetAllMessages() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
String testInstance = CLUSTER_NAME + "localhost_12926"; //Non-live instance
@@ -91,8 +102,7 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().message(testInstance, messageId), message);
- String body = new JerseyUriRequestBuilder("clusters/{}/instances/{}/messages")
- .isBodyReturnExpected(true).format(CLUSTER_NAME, testInstance).get(this);
+ String body = new JerseyUriRequestBuilder("clusters/{}/instances/{}/messages").isBodyReturnExpected(true).format(CLUSTER_NAME, testInstance).get(this);
JsonNode node = OBJECT_MAPPER.readTree(body);
int newMessageCount =
node.get(PerInstanceAccessor.PerInstanceProperties.total_message_count.name()).intValue();