You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/12/02 23:53:20 UTC

[helix] branch master updated: Add rest API for take/free instance (#1917)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new c07c873  Add rest API for take/free instance (#1917)
c07c873 is described below

commit c07c8734a1b8970eb3d3f3fdca6295b0cfafc80d
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Thu Dec 2 15:53:12 2021 -0800

    Add rest API for take/free instance (#1917)
    
    * add rest API for take/free instance
---
 .../MaintenanceManagementInstanceInfo.java         |   2 +
 .../MaintenanceManagementService.java              |  22 +++-
 ...nAbstractClass.java => OperationInterface.java} |   2 +-
 .../resources/helix/PerInstanceAccessor.java       | 142 ++++++++++++++++++++-
 .../helix/rest/server/TestPerInstanceAccessor.java |  18 ++-
 5 files changed, 175 insertions(+), 11 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
index d50ad0d..8914173 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
@@ -37,11 +37,13 @@ public class MaintenanceManagementInstanceInfo {
   public MaintenanceManagementInstanceInfo(OperationalStatus status) {
     this.status = status;
     this.messages = new ArrayList<>();
+    this.operationResult = "";
   }
 
   public MaintenanceManagementInstanceInfo(OperationalStatus status, List<String> messages) {
     this.status = status;
     this.messages = messages;
+    this.operationResult = "";
   }
 
   public MaintenanceManagementInstanceInfo(OperationalStatus status, String newOperationResult) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 605e8f6..7785f9d 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -19,7 +19,6 @@ package org.apache.helix.rest.clusterMaintenanceService;
  * under the License.
  */
 
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,6 +36,7 @@ import java.util.stream.Collectors;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.SharedMetricRegistries;
 import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -55,7 +55,6 @@ import org.apache.helix.rest.common.HelixDataAccessorWrapper;
 import org.apache.helix.rest.server.json.instance.InstanceInfo;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
 import org.apache.helix.rest.server.service.InstanceService;
-import org.apache.helix.rest.server.service.InstanceServiceImpl;
 import org.apache.helix.util.InstanceValidationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@ import org.slf4j.LoggerFactory;
 
 public class MaintenanceManagementService {
 
-  private static final Logger LOG = LoggerFactory.getLogger(InstanceServiceImpl.class);
+  private static final Logger LOG = LoggerFactory.getLogger(MaintenanceManagementService.class);
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final ExecutorService POOL = Executors.newCachedThreadPool();
 
@@ -275,7 +274,7 @@ public class MaintenanceManagementService {
         batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
     // custom check
     batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
-        finalStoppableChecks, getCustomPayLoads(jsonContent));
+        finalStoppableChecks, getMapFromJsonPayload(jsonContent));
     return finalStoppableChecks;
   }
 
@@ -412,7 +411,7 @@ public class MaintenanceManagementService {
     return instanceStoppableChecks;
   }
 
-  private Map<String, String> getCustomPayLoads(String jsonContent) throws IOException {
+  public static Map<String, String> getMapFromJsonPayload(String jsonContent) throws IOException {
     Map<String, String> result = new HashMap<>();
     if (jsonContent == null) {
       return result;
@@ -423,6 +422,19 @@ public class MaintenanceManagementService {
     return result;
   }
 
+  /** Node as a string list, empty when null; IllegalArgumentException if not convertible. */
+  public static List<String> getListFromJsonPayload(JsonNode jsonContent) {
+    return (jsonContent == null) ? Collections.emptyList()
+        : OBJECT_MAPPER.convertValue(jsonContent, new TypeReference<List<String>>() { });
+  }
+
+  /** Node as a string-to-string map; a null node yields a new empty {@link HashMap}. */
+  public static Map<String, String> getMapFromJsonPayload(JsonNode jsonContent)
+      throws IllegalArgumentException {
+    return (jsonContent != null) ? OBJECT_MAPPER.convertValue(jsonContent,
+        new TypeReference<Map<String, String>>() { }) : new HashMap<>();
+  }
+
   @VisibleForTesting
   protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
       List<HealthCheck> healthChecks) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationAbstractClass.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationInterface.java
similarity index 98%
rename from helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationAbstractClass.java
rename to helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationInterface.java
index e77919b..2062ce9 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationAbstractClass.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/api/OperationInterface.java
@@ -26,7 +26,7 @@ import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementInst
 import org.apache.helix.rest.common.datamodel.RestSnapShot;
 
 
-interface OperationAbstractClass {
+public interface OperationInterface {
   // operation check
   MaintenanceManagementInstanceInfo operationCheckForTakeSingleInstance(String instanceName,
       Map<String, String> operationConfig, RestSnapShot sn);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index d42e8d9..92cd045 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -82,7 +82,22 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     total_message_count,
     read_message_count,
     healthreports,
-    instanceTags
+    instanceTags,
+    health_check_list,
+    health_check_config,
+    operation_list,
+    operation_config,
+    continueOnFailures,
+    skipZKRead
+    }
+
+  private static class MaintenanceOpInputFields { // parsed take/free instance payload
+    List<String> healthChecks; // health_check_list entries
+    Map<String, String> healthCheckConfig; // health_check_config entries
+    List<String> operations; // operation_list entries
+    Map<String, String> operationConfig; // operation_config entries
+    boolean continueOnFailures; // stays false unless set
+    boolean skipZKRead; // stays false unless set
   }
 
   @ResponseMetered(name = HttpConstants.READ_REQUEST)
@@ -185,6 +200,131 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     return OK(OBJECT_MAPPER.writeValueAsString(stoppableCheck));
   }
 
+  /**
+   * Runs the requested health checks plus the user defined operation checks and executions
+   * for taking a single instance.
+   *
+   * @param jsonContent json payload with the check/operation lists and their configs
+   * @param clusterId cluster id
+   * @param instanceName name of the instance to take
+   * @return JSON form of the maintenance service result; a bad request response when the
+   *         payload yields no input fields or any exception is raised along the way
+   */
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  @Path("takeInstance")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response takeSingleInstance(
+      String jsonContent,
+      @PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName){
+
+    try {
+      MaintenanceOpInputFields input = readMaintenanceInputFromJson(jsonContent);
+      if (input == null) {
+        return badRequest("Invalid input for content : " + jsonContent);
+      }
+
+      ZKHelixDataAccessor dataAccessor = (ZKHelixDataAccessor) getDataAccssor(clusterId);
+      MaintenanceManagementService service =
+          new MaintenanceManagementService(dataAccessor, getConfigAccessor(), input.skipZKRead,
+              input.continueOnFailures, getNamespace());
+
+      // NOTE(review): the final "true" presumably asks the service to perform the operation.
+      return JSONRepresentation(
+          service.takeInstance(clusterId, instanceName, input.healthChecks,
+              input.healthCheckConfig, input.operations, input.operationConfig, true));
+    } catch (Exception e) {
+      LOG.error("Failed to takeInstances:", e);
+      return badRequest("Failed to takeInstances: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Performs the user designed operation check and execution for freeing an instance.
+   * When the payload carries health checks, a warning is logged that they won't be performed.
+   *
+   * @param jsonContent json payload
+   * @param clusterId cluster id
+   * @param instanceName name of the instance to free
+   * @return JSON form of the service result, or a bad request response on invalid input/failure
+   */
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  @Path("freeInstance")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response freeSingleInstance(
+      String jsonContent,
+      @PathParam("clusterId") String clusterId,
+      @PathParam("instanceName") String instanceName){
+
+    try {
+      MaintenanceOpInputFields inputFields = readMaintenanceInputFromJson(jsonContent);
+      if (inputFields == null) {
+        return badRequest("Invalid input for content : " + jsonContent);
+      }
+      // Null-safe: the parsed list is not guaranteed to be non-null.
+      if (inputFields.healthChecks != null && !inputFields.healthChecks.isEmpty()) {
+        LOG.warn("freeSingleInstance won't perform user passed health check.");
+      }
+
+      MaintenanceManagementService maintenanceManagementService =
+          new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
+              getConfigAccessor(), inputFields.skipZKRead, inputFields.continueOnFailures,
+              getNamespace());
+
+      return JSONRepresentation(maintenanceManagementService
+          .freeInstance(clusterId, instanceName, inputFields.healthChecks,
+              inputFields.healthCheckConfig, inputFields.operations,
+              inputFields.operationConfig, true));
+    } catch (Exception e) {
+      // Name this endpoint (not takeInstance) so logs and clients can tell the two apart.
+      LOG.error("Failed to freeInstance:", e);
+      return badRequest("Failed to freeInstance: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Parses the take/free instance JSON payload into a {@link MaintenanceOpInputFields}.
+   *
+   * @param jsonContent raw request body
+   * @return the parsed fields, or {@code null} when the body is null, empty or yields no node
+   * @throws IOException if the body cannot be parsed as JSON
+   */
+  private MaintenanceOpInputFields readMaintenanceInputFromJson(String jsonContent) throws IOException {
+    if (jsonContent == null || jsonContent.isEmpty()) {
+      return null;
+    }
+    JsonNode node = OBJECT_MAPPER.readTree(jsonContent);
+    if (node == null) {
+      return null;
+    }
+    MaintenanceOpInputFields inputFields = new MaintenanceOpInputFields();
+    inputFields.healthChecks = MaintenanceManagementService
+        .getListFromJsonPayload(node.get(PerInstanceProperties.health_check_list.name()));
+    inputFields.healthCheckConfig = MaintenanceManagementService
+        .getMapFromJsonPayload(node.get(PerInstanceProperties.health_check_config.name()));
+    // continueOnFailures/skipZKRead are carried inside health_check_config, so they can only be
+    // read once that map is parsed; remove() reads the flag and strips it from the config in
+    // one step, and Boolean.parseBoolean(null) is false when the key is absent.
+    if (inputFields.healthCheckConfig != null) {
+      inputFields.continueOnFailures = Boolean.parseBoolean(
+          inputFields.healthCheckConfig.remove(PerInstanceProperties.continueOnFailures.name()));
+      inputFields.skipZKRead = Boolean.parseBoolean(
+          inputFields.healthCheckConfig.remove(PerInstanceProperties.skipZKRead.name()));
+    }
+    inputFields.operations = MaintenanceManagementService
+        .getListFromJsonPayload(node.get(PerInstanceProperties.operation_list.name()));
+    inputFields.operationConfig = MaintenanceManagementService
+        .getMapFromJsonPayload(node.get(PerInstanceProperties.operation_config.name()));
+    LOG.debug("Input fields for take/free Instance: healthChecks=" + inputFields.healthChecks
+        + ", operations=" + inputFields.operations + ", continueOnFailures="
+        + inputFields.continueOnFailures + ", skipZKRead=" + inputFields.skipZKRead);
+    return inputFields;
+  }
+
+
   @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
   @Timed(name = HttpConstants.WRITE_REQUEST)
   @PUT
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 6f9321a..58bccbf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -66,8 +66,9 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
     String stoppableCheckResult = response.readEntity(String.class);
 
     Map<String, Object> actualMap = OBJECT_MAPPER.readValue(stoppableCheckResult, Map.class);
-    List<String> failedChecks = Arrays.asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT",
-        "HELIX:INSTANCE_NOT_ENABLED", "HELIX:INSTANCE_NOT_STABLE");
+    List<String> failedChecks = Arrays
+        .asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT", "HELIX:INSTANCE_NOT_ENABLED",
+            "HELIX:INSTANCE_NOT_STABLE");
     Map<String, Object> expectedMap =
         ImmutableMap.of("stoppable", false, "failedChecks", failedChecks);
     Assert.assertEquals(actualMap, expectedMap);
@@ -75,6 +76,16 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
   }
 
   @Test(dependsOnMethods = "testIsInstanceStoppable")
+  public void testTakeInstanceNegInput() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // Posting an empty JSON body is expected to be rejected with 400 BAD_REQUEST.
+    Entity<String> body = Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
+    int expected = Response.Status.BAD_REQUEST.getStatusCode();
+    post("clusters/TestCluster_0/instances/instance1/takeInstance", null, body, expected, true);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testTakeInstanceNegInput")
   public void testGetAllMessages() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     String testInstance = CLUSTER_NAME + "localhost_12926"; //Non-live instance
@@ -91,8 +102,7 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
     HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
     helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().message(testInstance, messageId), message);
 
-    String body = new JerseyUriRequestBuilder("clusters/{}/instances/{}/messages")
-        .isBodyReturnExpected(true).format(CLUSTER_NAME, testInstance).get(this);
+    String body = new JerseyUriRequestBuilder("clusters/{}/instances/{}/messages").isBodyReturnExpected(true).format(CLUSTER_NAME, testInstance).get(this);
     JsonNode node = OBJECT_MAPPER.readTree(body);
     int newMessageCount =
         node.get(PerInstanceAccessor.PerInstanceProperties.total_message_count.name()).intValue();