You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/27 23:15:11 UTC

helix git commit: [HELIX-684] Add health status API in ResourceAccessor

Repository: helix
Updated Branches:
  refs/heads/master 24f2610f6 -> f4942c492


[HELIX-684] Add health status API in ResourceAccessor

The getResourceHealth method returns the status of all resources in a given cluster. The getPartitionHealth method returns the status of all partitions of a given resource.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f4942c49
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f4942c49
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f4942c49

Branch: refs/heads/master
Commit: f4942c492fd8ef0ca63631521b2f9e31bc510f64
Parents: 24f2610
Author: narendly <na...@gmail.com>
Authored: Mon Mar 26 14:12:11 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Tue Mar 27 16:14:22 2018 -0700

----------------------------------------------------------------------
 .../resources/helix/ResourceAccessor.java       | 155 +++++++++++++++--
 .../helix/rest/server/TestResourceAccessor.java | 167 ++++++++++++++++++-
 2 files changed, 303 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f4942c49/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
index 04d3536..968122f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -20,9 +20,13 @@ package org.apache.helix.rest.server.resources.helix;
  */
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -41,12 +45,14 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.JsonNodeFactory;
 import org.codehaus.jackson.node.ObjectNode;
 
+
 @Path("/clusters/{clusterId}/resources")
 public class ResourceAccessor extends AbstractHelixResource {
   private final static Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class);
@@ -58,6 +64,12 @@ public class ResourceAccessor extends AbstractHelixResource {
     resourceConfig,
   }
 
+  public enum HealthStatus {
+    HEALTHY,
+    PARTIAL_HEALTHY,
+    UNHEALTHY
+  }
+
   @GET
   public Response getResources(@PathParam("clusterId") String clusterId) {
     ObjectNode root = JsonNodeFactory.instance.objectNode();
@@ -84,10 +96,66 @@ public class ResourceAccessor extends AbstractHelixResource {
     return JSONRepresentation(root);
   }
 
+  /**
+   * Returns health profile of all resources in the cluster
+   *
+   * @param clusterId
+   * @return JSON result
+   */
+  @GET
+  @Path("health")
+  public Response getResourceHealth(@PathParam("clusterId") String clusterId) {
+
+    ZkClient zkClient = getZkClient();
+
+    List<String> resourcesInIdealState = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
+    List<String> resourcesInExternalView = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
+
+    Map<String, String> resourceHealthResult = new HashMap<>();
+
+    for (String resourceName : resourcesInIdealState) {
+      if(resourcesInExternalView.contains(resourceName)) {
+        Map<String, String> partitionHealth = computePartitionHealth(clusterId, resourceName);
+
+        if (partitionHealth.isEmpty() || partitionHealth.values().contains(HealthStatus.UNHEALTHY.name())) {
+          // A resource with no partitions, or with at least one UNHEALTHY partition, is UNHEALTHY
+          resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name());
+        } else if (partitionHealth.values().contains(HealthStatus.PARTIAL_HEALTHY.name())) {
+          // No UNHEALTHY partition, but one or more partially healthy partitions, resource is partially healthy
+          resourceHealthResult.put(resourceName, HealthStatus.PARTIAL_HEALTHY.name());
+        } else {
+          // No UNHEALTHY or partially healthy partitions and non-empty, resource is healthy
+          resourceHealthResult.put(resourceName, HealthStatus.HEALTHY.name());
+        }
+      } else {
+        // If a resource is not in ExternalView, then it is UNHEALTHY
+        resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name());
+      }
+    }
+
+    return JSONRepresentation(resourceHealthResult);
+  }
+
+  /**
+   * Returns health profile of all partitions for the corresponding resource in the cluster
+   *
+   * @param clusterId
+   * @param resourceName
+   * @return JSON result mapping each partition name to the name of its
+   *         {@link HealthStatus}
+   */
+  @GET
+  @Path("{resourceName}/health")
+  public Response getPartitionHealth(@PathParam("clusterId") String clusterId,
+      @PathParam("resourceName") String resourceName) {
+
+    return JSONRepresentation(computePartitionHealth(clusterId, resourceName));
+  }
+
   @GET
   @Path("{resourceName}")
   public Response getResource(@PathParam("clusterId") String clusterId,
-      @PathParam("resourceName") String resourceName) throws IOException {
+      @PathParam("resourceName") String resourceName) {
     ConfigAccessor accessor = getConfigAccessor();
     HelixAdmin admin = getHelixAdmin();
 
@@ -172,22 +240,22 @@ public class ResourceAccessor extends AbstractHelixResource {
     HelixAdmin admin = getHelixAdmin();
     try {
       switch (cmd) {
-      case enable:
-        admin.enableResource(clusterId, resourceName, true);
-        break;
-      case disable:
-        admin.enableResource(clusterId, resourceName, false);
-        break;
-      case rebalance:
-        if (replicas == -1) {
-          return badRequest("Number of replicas is needed for rebalancing!");
-        }
-        keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
-        admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
-        break;
-      default:
-        _logger.error("Unsupported command :" + command);
-        return badRequest("Unsupported command :" + command);
+        case enable:
+          admin.enableResource(clusterId, resourceName, true);
+          break;
+        case disable:
+          admin.enableResource(clusterId, resourceName, false);
+          break;
+        case rebalance:
+          if (replicas == -1) {
+            return badRequest("Number of replicas is needed for rebalancing!");
+          }
+          keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
+          admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
+          break;
+        default:
+          _logger.error("Unsupported command :" + command);
+          return badRequest("Unsupported command :" + command);
       }
     } catch (Exception e) {
       _logger.error("Failed in updating resource : " + resourceName, e);
@@ -275,4 +343,55 @@ public class ResourceAccessor extends AbstractHelixResource {
     return notFound();
   }
 
-}
+  private Map<String, String> computePartitionHealth(String clusterId, String resourceName) {
+    HelixAdmin admin = getHelixAdmin();
+    IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
+    ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
+    StateModelDefinition stateModelDef = admin.getStateModelDef(clusterId, idealState.getStateModelDefRef());
+    String initialState = stateModelDef.getInitialState();
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+    statesPriorityList = statesPriorityList.subList(0, statesPriorityList.indexOf(initialState)); // Keep only the states ranked strictly above initialState
+    int minActiveReplicas = idealState.getMinActiveReplicas();
+
+    // Start the logic that determines the health status of each partition
+    Map<String, String> partitionHealthResult = new HashMap<>();
+    Set<String> allPartitionNames = idealState.getPartitionSet();
+    if (!allPartitionNames.isEmpty()) {
+      for (String partitionName : allPartitionNames) {
+        int replicaCount = idealState.getReplicaCount(idealState.getPreferenceList(partitionName).size());
+        // Simplify expectedStateCountMap by assuming that all instances are available to reduce computation load on this REST endpoint
+        LinkedHashMap<String, Integer> expectedStateCountMap =
+            stateModelDef.getStateCountMap(replicaCount, replicaCount);
+        // Extract all states into Collections from ExternalView
+        Map<String, String> stateMapInExternalView = externalView.getStateMap(partitionName);
+        Collection<String> allReplicaStatesInExternalView =
+            (stateMapInExternalView != null && !stateMapInExternalView.isEmpty()) ?
+                stateMapInExternalView.values() : Collections.<String>emptyList();
+        int numActiveReplicasInExternalView = 0;
+        HealthStatus status = HealthStatus.HEALTHY;
+
+        // Go through all states that are "active" states (higher priority than InitialState)
+        for (int statePriorityIndex = 0; statePriorityIndex < statesPriorityList.size(); statePriorityIndex++) {
+          String currentState = statesPriorityList.get(statePriorityIndex);
+          int currentStateCountInIdealState = expectedStateCountMap.get(currentState);
+          int currentStateCountInExternalView = Collections.frequency(allReplicaStatesInExternalView, currentState);
+          numActiveReplicasInExternalView += currentStateCountInExternalView;
+          // Top state counts must match, if not, unhealthy
+          if (statePriorityIndex == 0 && currentStateCountInExternalView != currentStateCountInIdealState) {
+            status = HealthStatus.UNHEALTHY;
+            break;
+          } else if (currentStateCountInExternalView < currentStateCountInIdealState) {
+            // For non-top states, if count in ExternalView is less than count in IdealState, partially healthy
+            status = HealthStatus.PARTIAL_HEALTHY;
+          }
+        }
+        if (numActiveReplicasInExternalView < minActiveReplicas) {
+          // If this partition does not satisfy the number of minimum active replicas, unhealthy
+          status = HealthStatus.UNHEALTHY;
+        }
+        partitionHealthResult.put(partitionName, status.name());
+      }
+    }
+    return partitionHealthResult;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/f4942c49/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
index dea4e06..e8888a2 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
@@ -21,10 +21,20 @@ package org.apache.helix.rest.server;
 
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -32,12 +42,14 @@ import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.FullAutoModeISBuilder;
 import org.apache.helix.rest.server.resources.helix.ResourceAccessor;
 import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.type.TypeReference;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestResourceAccessor extends AbstractTestClass {
   private final static String CLUSTER_NAME = "TestCluster_0";
   private final static String RESOURCE_NAME = CLUSTER_NAME + "_db_0";
+  private final static String ANY_INSTANCE = "ANY_LIVEINSTANCE";
 
   @Test
   public void testGetResources() throws IOException {
@@ -138,6 +150,159 @@ public class TestResourceAccessor extends AbstractTestClass {
         .getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME));
   }
 
-}
+  @Test
+  public void testPartitionHealth() throws Exception {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    String clusterName = "TestCluster_1";
+    String resourceName = clusterName + "_db_0";
+
+    // Use mock numbers for testing
+    Map<String, String> idealStateParams = new HashMap<>();
+    idealStateParams.put("MinActiveReplicas", "2");
+    idealStateParams.put("StateModelDefRef", "MasterSlave");
+    idealStateParams.put("MaxPartitionsPerInstance", "3");
+    idealStateParams.put("Replicas", "3");
+    idealStateParams.put("NumPartitions", "3");
+
+    // Create a mock state mapping for testing
+    Map<String, List<String>> partitionReplicaStates = new LinkedHashMap<>();
+    String[] p0 = {"MASTER", "SLAVE", "SLAVE"};
+    String[] p1 = {"MASTER", "SLAVE", "ERROR"};
+    String[] p2 = {"ERROR", "SLAVE", "SLAVE"};
+    partitionReplicaStates.put("p0", Arrays.asList(p0));
+    partitionReplicaStates.put("p1", Arrays.asList(p1));
+    partitionReplicaStates.put("p2", Arrays.asList(p2));
+
+    createDummyMapping(clusterName, resourceName, idealStateParams, partitionReplicaStates);
+
+    // Get the result of getPartitionHealth
+    String body = get("clusters/" + clusterName + "/resources/" + resourceName + "/health",
+        Response.Status.OK.getStatusCode(), true);
+
+    JsonNode node = OBJECT_MAPPER.readTree(body);
+    Map<String, String> healthStatus = OBJECT_MAPPER.readValue(node, new TypeReference<Map<String, String>>(){});
+
+    Assert.assertEquals(healthStatus.get("p0"), "HEALTHY");
+    Assert.assertEquals(healthStatus.get("p1"), "PARTIAL_HEALTHY");
+    Assert.assertEquals(healthStatus.get("p2"), "UNHEALTHY");
+  }
+
+  @Test(dependsOnMethods = "testPartitionHealth")
+  public void testResourceHealth() throws Exception {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    String clusterName = "TestCluster_1";
+    Map<String, String> idealStateParams = new HashMap<>();
+    idealStateParams.put("MinActiveReplicas", "2");
+    idealStateParams.put("StateModelDefRef", "MasterSlave");
+    idealStateParams.put("MaxPartitionsPerInstance", "3");
+    idealStateParams.put("Replicas", "3");
+    idealStateParams.put("NumPartitions", "3");
+
+    // Create a healthy resource
+    String resourceNameHealthy = clusterName + "_db_0";
+    Map<String, List<String>> partitionReplicaStates = new LinkedHashMap<>();
+    String[] p0 = {"MASTER", "SLAVE", "SLAVE"};
+    String[] p1 = {"MASTER", "SLAVE", "SLAVE"};
+    String[] p2 = {"MASTER", "SLAVE", "SLAVE"};
+    partitionReplicaStates.put("p0", Arrays.asList(p0));
+    partitionReplicaStates.put("p1", Arrays.asList(p1));
+    partitionReplicaStates.put("p2", Arrays.asList(p2));
+
+    createDummyMapping(clusterName, resourceNameHealthy, idealStateParams, partitionReplicaStates);
 
+    // Create a partially healthy resource
+    String resourceNamePartiallyHealthy = clusterName + "_db_1";
+    Map<String, List<String>> partitionReplicaStates_1 = new LinkedHashMap<>();
+    String[] p0_1 = {"MASTER", "SLAVE", "SLAVE"};
+    String[] p1_1 = {"MASTER", "SLAVE", "SLAVE"};
+    String[] p2_1 = {"MASTER", "SLAVE", "ERROR"};
+    partitionReplicaStates_1.put("p0", Arrays.asList(p0_1));
+    partitionReplicaStates_1.put("p1", Arrays.asList(p1_1));
+    partitionReplicaStates_1.put("p2", Arrays.asList(p2_1));
 
+    createDummyMapping(clusterName, resourceNamePartiallyHealthy, idealStateParams, partitionReplicaStates_1);
+
+    // Create an unhealthy resource
+    String resourceNameUnhealthy = clusterName + "_db_2";
+    Map<String, List<String>> partitionReplicaStates_2 = new LinkedHashMap<>();
+    String[] p0_2 = {"MASTER", "SLAVE", "SLAVE"};
+    String[] p1_2 = {"MASTER", "SLAVE", "SLAVE"};
+    String[] p2_2 = {"ERROR", "SLAVE", "ERROR"};
+    partitionReplicaStates_2.put("p0", Arrays.asList(p0_2));
+    partitionReplicaStates_2.put("p1", Arrays.asList(p1_2));
+    partitionReplicaStates_2.put("p2", Arrays.asList(p2_2));
+
+    createDummyMapping(clusterName, resourceNameUnhealthy, idealStateParams, partitionReplicaStates_2);
+
+    // Get the result of getResourceHealth
+    String body = get("clusters/" + clusterName + "/resources/health", Response.Status.OK.getStatusCode(),
+        true);
+
+    JsonNode node = OBJECT_MAPPER.readTree(body);
+    Map<String, String> healthStatus = OBJECT_MAPPER.readValue(node, new TypeReference<Map<String, String>>(){});
+
+    Assert.assertEquals(healthStatus.get(resourceNameHealthy), "HEALTHY");
+    Assert.assertEquals(healthStatus.get(resourceNamePartiallyHealthy), "PARTIAL_HEALTHY");
+    Assert.assertEquals(healthStatus.get(resourceNameUnhealthy), "UNHEALTHY");
+  }
+
+  /**
+   * Creates a setup where the health API can be tested.
+   *
+   * @param clusterName
+   * @param resourceName
+   * @param idealStateParams
+   * @param partitionReplicaStates maps partitionName to its replicas' states
+   * @throws Exception
+   */
+  private void createDummyMapping(String clusterName, String resourceName,
+      Map<String, String> idealStateParams,
+      Map<String, List<String>> partitionReplicaStates) throws Exception {
+    IdealState idealState = new IdealState(resourceName);
+    idealState.setMinActiveReplicas(Integer.parseInt(idealStateParams.get("MinActiveReplicas"))); // 2
+    idealState.setStateModelDefRef(idealStateParams.get("StateModelDefRef")); // MasterSlave
+    idealState.setMaxPartitionsPerInstance(Integer.parseInt(idealStateParams.get("MaxPartitionsPerInstance"))); // 3
+    idealState.setReplicas(idealStateParams.get("Replicas")); // 3
+    idealState.setNumPartitions(Integer.parseInt(idealStateParams.get("NumPartitions"))); // 3
+    idealState.enable(false);
+
+    Map<String, List<String>> partitionNames = new LinkedHashMap<>();
+    List<String> dummyPrefList = new ArrayList<>();
+
+    for (int i = 0; i < Integer.parseInt(idealStateParams.get("MaxPartitionsPerInstance")); i++) {
+      dummyPrefList.add(ANY_INSTANCE);
+      partitionNames.put("p" + i, dummyPrefList);
+    }
+    idealState.getRecord().getListFields().putAll(partitionNames);
+
+    if (!_gSetupTool.getClusterManagementTool().getClusters().contains(clusterName)) {
+      _gSetupTool.getClusterManagementTool().addCluster(clusterName);
+    }
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, resourceName, idealState);
+
+    // Set ExternalView's replica states for a given parameter map
+    ExternalView externalView = new ExternalView(resourceName);
+
+    Map<String, Map<String, String>> mappingCurrent = new LinkedHashMap<>();
+
+    List<String> partitionReplicaStatesList = new ArrayList<>(partitionReplicaStates.keySet());
+    for (int k = 0; k < partitionReplicaStatesList.size(); k++) {
+      Map<String, String> replicaStatesForPartition = new LinkedHashMap<>();
+      List<String> replicaStateList = partitionReplicaStates.get(partitionReplicaStatesList.get(k));
+      for (int i = 0; i < replicaStateList.size(); i++) {
+        replicaStatesForPartition.put("r" + i, replicaStateList.get(i));
+      }
+      mappingCurrent.put("p" + k, replicaStatesForPartition);
+    }
+
+    externalView.getRecord().getMapFields().putAll(mappingCurrent);
+
+    HelixManager helixManager = HelixManagerFactory
+        .getZKHelixManager(clusterName, "p1", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    helixManager.connect();
+    HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
+    helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().externalView(resourceName), externalView);
+  }
+}
\ No newline at end of file