You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/27 23:15:11 UTC
helix git commit: [HELIX-684] Add health status API in
ResourceAccessor
Repository: helix
Updated Branches:
refs/heads/master 24f2610f6 -> f4942c492
[HELIX-684] Add health status API in ResourceAccessor
The getResourceHealth method returns the status of all resources in a given cluster. The getPartitionHealth method returns the status of all partitions of a given resource.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f4942c49
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f4942c49
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f4942c49
Branch: refs/heads/master
Commit: f4942c492fd8ef0ca63631521b2f9e31bc510f64
Parents: 24f2610
Author: narendly <na...@gmail.com>
Authored: Mon Mar 26 14:12:11 2018 -0700
Committer: narendly <na...@gmail.com>
Committed: Tue Mar 27 16:14:22 2018 -0700
----------------------------------------------------------------------
.../resources/helix/ResourceAccessor.java | 155 +++++++++++++++--
.../helix/rest/server/TestResourceAccessor.java | 167 ++++++++++++++++++-
2 files changed, 303 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/f4942c49/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
index 04d3536..968122f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -20,9 +20,13 @@ package org.apache.helix.rest.server.resources.helix;
*/
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -41,12 +45,14 @@ import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
+
@Path("/clusters/{clusterId}/resources")
public class ResourceAccessor extends AbstractHelixResource {
private final static Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class);
@@ -58,6 +64,12 @@ public class ResourceAccessor extends AbstractHelixResource {
resourceConfig,
}
+ public enum HealthStatus {
+ HEALTHY,
+ PARTIAL_HEALTHY,
+ UNHEALTHY
+ }
+
@GET
public Response getResources(@PathParam("clusterId") String clusterId) {
ObjectNode root = JsonNodeFactory.instance.objectNode();
@@ -84,10 +96,66 @@ public class ResourceAccessor extends AbstractHelixResource {
return JSONRepresentation(root);
}
+ /**
+ * Returns health profile of all resources in the cluster
+ *
+ * @param clusterId
+ * @return JSON result
+ */
+ @GET
+ @Path("health")
+ public Response getResourceHealth(@PathParam("clusterId") String clusterId) {
+
+ ZkClient zkClient = getZkClient();
+
+ List<String> resourcesInIdealState = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
+ List<String> resourcesInExternalView = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
+
+ Map<String, String> resourceHealthResult = new HashMap<>();
+
+ for (String resourceName : resourcesInIdealState) {
+ if(resourcesInExternalView.contains(resourceName)) {
+ Map<String, String> partitionHealth = computePartitionHealth(clusterId, resourceName);
+
+ if (partitionHealth.isEmpty() || partitionHealth.values().contains(HealthStatus.UNHEALTHY.name())) {
+ // No partitions for a resource or there exists one or more UNHEALTHY partitions in this resource, UNHEALTHY
+ resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name());
+ } else if (partitionHealth.values().contains(HealthStatus.PARTIAL_HEALTHY.name())) {
+ // No UNHEALTHY partition, but one or more partially healthy partitions, resource is partially healthy
+ resourceHealthResult.put(resourceName, HealthStatus.PARTIAL_HEALTHY.name());
+ } else {
+ // No UNHEALTHY or partially healthy partitions and non-empty, resource is healthy
+ resourceHealthResult.put(resourceName, HealthStatus.HEALTHY.name());
+ }
+ } else {
+ // If a resource is not in ExternalView, then it is UNHEALTHY
+ resourceHealthResult.put(resourceName, HealthStatus.UNHEALTHY.name());
+ }
+ }
+
+ return JSONRepresentation(resourceHealthResult);
+ }
+
+ /**
+ * Returns health profile of all partitions for the corresponding resource in the cluster
+ *
+ * @param clusterId
+ * @param resourceName
+ * @return JSON result
+ * @throws IOException
+ */
+ @GET
+ @Path("{resourceName}/health")
+ public Response getPartitionHealth(@PathParam("clusterId") String clusterId,
+ @PathParam("resourceName") String resourceName) {
+
+ return JSONRepresentation(computePartitionHealth(clusterId, resourceName));
+ }
+
@GET
@Path("{resourceName}")
public Response getResource(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName) throws IOException {
+ @PathParam("resourceName") String resourceName) {
ConfigAccessor accessor = getConfigAccessor();
HelixAdmin admin = getHelixAdmin();
@@ -172,22 +240,22 @@ public class ResourceAccessor extends AbstractHelixResource {
HelixAdmin admin = getHelixAdmin();
try {
switch (cmd) {
- case enable:
- admin.enableResource(clusterId, resourceName, true);
- break;
- case disable:
- admin.enableResource(clusterId, resourceName, false);
- break;
- case rebalance:
- if (replicas == -1) {
- return badRequest("Number of replicas is needed for rebalancing!");
- }
- keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
- admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
- break;
- default:
- _logger.error("Unsupported command :" + command);
- return badRequest("Unsupported command :" + command);
+ case enable:
+ admin.enableResource(clusterId, resourceName, true);
+ break;
+ case disable:
+ admin.enableResource(clusterId, resourceName, false);
+ break;
+ case rebalance:
+ if (replicas == -1) {
+ return badRequest("Number of replicas is needed for rebalancing!");
+ }
+ keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
+ admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
+ break;
+ default:
+ _logger.error("Unsupported command :" + command);
+ return badRequest("Unsupported command :" + command);
}
} catch (Exception e) {
_logger.error("Failed in updating resource : " + resourceName, e);
@@ -275,4 +343,55 @@ public class ResourceAccessor extends AbstractHelixResource {
return notFound();
}
-}
+ private Map<String, String> computePartitionHealth(String clusterId, String resourceName) {
+ HelixAdmin admin = getHelixAdmin();
+ IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
+ ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
+ StateModelDefinition stateModelDef = admin.getStateModelDef(clusterId, idealState.getStateModelDefRef());
+ String initialState = stateModelDef.getInitialState();
+ List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+ statesPriorityList = statesPriorityList.subList(0, statesPriorityList.indexOf(initialState)); // Trim stateList to initialState and above
+ int minActiveReplicas = idealState.getMinActiveReplicas();
+
+ // Start the logic that determines the health status of each partition
+ Map<String, String> partitionHealthResult = new HashMap<>();
+ Set<String> allPartitionNames = idealState.getPartitionSet();
+ if (!allPartitionNames.isEmpty()) {
+ for (String partitionName : allPartitionNames) {
+ int replicaCount = idealState.getReplicaCount(idealState.getPreferenceList(partitionName).size());
+ // Simplify expectedStateCountMap by assuming that all instances are available to reduce computation load on this REST endpoint
+ LinkedHashMap<String, Integer> expectedStateCountMap =
+ stateModelDef.getStateCountMap(replicaCount, replicaCount);
+ // Extract all states into Collections from ExternalView
+ Map<String, String> stateMapInExternalView = externalView.getStateMap(partitionName);
+ Collection<String> allReplicaStatesInExternalView =
+ (stateMapInExternalView != null && !stateMapInExternalView.isEmpty()) ?
+ stateMapInExternalView.values() : Collections.<String>emptyList();
+ int numActiveReplicasInExternalView = 0;
+ HealthStatus status = HealthStatus.HEALTHY;
+
+ // Go through all states that are "active" states (higher priority than InitialState)
+ for (int statePriorityIndex = 0; statePriorityIndex < statesPriorityList.size(); statePriorityIndex++) {
+ String currentState = statesPriorityList.get(statePriorityIndex);
+ int currentStateCountInIdealState = expectedStateCountMap.get(currentState);
+ int currentStateCountInExternalView = Collections.frequency(allReplicaStatesInExternalView, currentState);
+ numActiveReplicasInExternalView += currentStateCountInExternalView;
+ // Top state counts must match, if not, unhealthy
+ if (statePriorityIndex == 0 && currentStateCountInExternalView != currentStateCountInIdealState) {
+ status = HealthStatus.UNHEALTHY;
+ break;
+ } else if (currentStateCountInExternalView < currentStateCountInIdealState) {
+ // For non-top states, if count in ExternalView is less than count in IdealState, partially healthy
+ status = HealthStatus.PARTIAL_HEALTHY;
+ }
+ }
+ if (numActiveReplicasInExternalView < minActiveReplicas) {
+ // If this partition does not satisfy the number of minimum active replicas, unhealthy
+ status = HealthStatus.UNHEALTHY;
+ }
+ partitionHealthResult.put(partitionName, status.name());
+ }
+ }
+ return partitionHealthResult;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/f4942c49/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
index dea4e06..e8888a2 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAccessor.java
@@ -21,10 +21,20 @@ package org.apache.helix.rest.server;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -32,12 +42,14 @@ import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.rest.server.resources.helix.ResourceAccessor;
import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.type.TypeReference;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestResourceAccessor extends AbstractTestClass {
private final static String CLUSTER_NAME = "TestCluster_0";
private final static String RESOURCE_NAME = CLUSTER_NAME + "_db_0";
+ private final static String ANY_INSTANCE = "ANY_LIVEINSTANCE";
@Test
public void testGetResources() throws IOException {
@@ -138,6 +150,159 @@ public class TestResourceAccessor extends AbstractTestClass {
.getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME));
}
-}
+ @Test
+ public void testPartitionHealth() throws Exception {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ String clusterName = "TestCluster_1";
+ String resourceName = clusterName + "_db_0";
+
+ // Use mock numbers for testing
+ Map<String, String> idealStateParams = new HashMap<>();
+ idealStateParams.put("MinActiveReplicas", "2");
+ idealStateParams.put("StateModelDefRef", "MasterSlave");
+ idealStateParams.put("MaxPartitionsPerInstance", "3");
+ idealStateParams.put("Replicas", "3");
+ idealStateParams.put("NumPartitions", "3");
+
+ // Create a mock state mapping for testing
+ Map<String, List<String>> partitionReplicaStates = new LinkedHashMap<>();
+ String[] p0 = {"MASTER", "SLAVE", "SLAVE"};
+ String[] p1 = {"MASTER", "SLAVE", "ERROR"};
+ String[] p2 = {"ERROR", "SLAVE", "SLAVE"};
+ partitionReplicaStates.put("p0", Arrays.asList(p0));
+ partitionReplicaStates.put("p1", Arrays.asList(p1));
+ partitionReplicaStates.put("p2", Arrays.asList(p2));
+
+ createDummyMapping(clusterName, resourceName, idealStateParams, partitionReplicaStates);
+
+ // Get the result of getPartitionHealth
+ String body = get("clusters/" + clusterName + "/resources/" + resourceName + "/health",
+ Response.Status.OK.getStatusCode(), true);
+
+ JsonNode node = OBJECT_MAPPER.readTree(body);
+ Map<String, String> healthStatus = OBJECT_MAPPER.readValue(node, new TypeReference<Map<String, String>>(){});
+
+ Assert.assertEquals(healthStatus.get("p0"), "HEALTHY");
+ Assert.assertEquals(healthStatus.get("p1"), "PARTIAL_HEALTHY");
+ Assert.assertEquals(healthStatus.get("p2"), "UNHEALTHY");
+ }
+
+ @Test(dependsOnMethods = "testPartitionHealth")
+ public void testResourceHealth() throws Exception {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ String clusterName = "TestCluster_1";
+ Map<String, String> idealStateParams = new HashMap<>();
+ idealStateParams.put("MinActiveReplicas", "2");
+ idealStateParams.put("StateModelDefRef", "MasterSlave");
+ idealStateParams.put("MaxPartitionsPerInstance", "3");
+ idealStateParams.put("Replicas", "3");
+ idealStateParams.put("NumPartitions", "3");
+
+ // Create a healthy resource
+ String resourceNameHealthy = clusterName + "_db_0";
+ Map<String, List<String>> partitionReplicaStates = new LinkedHashMap<>();
+ String[] p0 = {"MASTER", "SLAVE", "SLAVE"};
+ String[] p1 = {"MASTER", "SLAVE", "SLAVE"};
+ String[] p2 = {"MASTER", "SLAVE", "SLAVE"};
+ partitionReplicaStates.put("p0", Arrays.asList(p0));
+ partitionReplicaStates.put("p1", Arrays.asList(p1));
+ partitionReplicaStates.put("p2", Arrays.asList(p2));
+
+ createDummyMapping(clusterName, resourceNameHealthy, idealStateParams, partitionReplicaStates);
+ // Create a partially healthy resource
+ String resourceNamePartiallyHealthy = clusterName + "_db_1";
+ Map<String, List<String>> partitionReplicaStates_1 = new LinkedHashMap<>();
+ String[] p0_1 = {"MASTER", "SLAVE", "SLAVE"};
+ String[] p1_1 = {"MASTER", "SLAVE", "SLAVE"};
+ String[] p2_1 = {"MASTER", "SLAVE", "ERROR"};
+ partitionReplicaStates_1.put("p0", Arrays.asList(p0_1));
+ partitionReplicaStates_1.put("p1", Arrays.asList(p1_1));
+ partitionReplicaStates_1.put("p2", Arrays.asList(p2_1));
+ createDummyMapping(clusterName, resourceNamePartiallyHealthy, idealStateParams, partitionReplicaStates_1);
+
+ // Create a partially healthy resource
+ String resourceNameUnhealthy = clusterName + "_db_2";
+ Map<String, List<String>> partitionReplicaStates_2 = new LinkedHashMap<>();
+ String[] p0_2 = {"MASTER", "SLAVE", "SLAVE"};
+ String[] p1_2 = {"MASTER", "SLAVE", "SLAVE"};
+ String[] p2_2 = {"ERROR", "SLAVE", "ERROR"};
+ partitionReplicaStates_2.put("p0", Arrays.asList(p0_2));
+ partitionReplicaStates_2.put("p1", Arrays.asList(p1_2));
+ partitionReplicaStates_2.put("p2", Arrays.asList(p2_2));
+
+ createDummyMapping(clusterName, resourceNameUnhealthy, idealStateParams, partitionReplicaStates_2);
+
+ // Get the result of getResourceHealth
+ String body = get("clusters/" + clusterName + "/resources/health", Response.Status.OK.getStatusCode(),
+ true);
+
+ JsonNode node = OBJECT_MAPPER.readTree(body);
+ Map<String, String> healthStatus = OBJECT_MAPPER.readValue(node, new TypeReference<Map<String, String>>(){});
+
+ Assert.assertEquals(healthStatus.get(resourceNameHealthy), "HEALTHY");
+ Assert.assertEquals(healthStatus.get(resourceNamePartiallyHealthy), "PARTIAL_HEALTHY");
+ Assert.assertEquals(healthStatus.get(resourceNameUnhealthy), "UNHEALTHY");
+ }
+
+ /**
+ * Creates a setup where the health API can be tested.
+ *
+ * @param clusterName
+ * @param resourceName
+ * @param idealStateParams
+ * @param partitionReplicaStates maps partitionName to its replicas' states
+ * @throws Exception
+ */
+ private void createDummyMapping(String clusterName, String resourceName,
+ Map<String, String> idealStateParams,
+ Map<String, List<String>> partitionReplicaStates) throws Exception {
+ IdealState idealState = new IdealState(resourceName);
+ idealState.setMinActiveReplicas(Integer.parseInt(idealStateParams.get("MinActiveReplicas"))); // 2
+ idealState.setStateModelDefRef(idealStateParams.get("StateModelDefRef")); // MasterSlave
+ idealState.setMaxPartitionsPerInstance(Integer.parseInt(idealStateParams.get("MaxPartitionsPerInstance"))); // 3
+ idealState.setReplicas(idealStateParams.get("Replicas")); // 3
+ idealState.setNumPartitions(Integer.parseInt(idealStateParams.get("NumPartitions"))); // 3
+ idealState.enable(false);
+
+ Map<String, List<String>> partitionNames = new LinkedHashMap<>();
+ List<String> dummyPrefList = new ArrayList<>();
+
+ for (int i = 0; i < Integer.parseInt(idealStateParams.get("MaxPartitionsPerInstance")); i++) {
+ dummyPrefList.add(ANY_INSTANCE);
+ partitionNames.put("p" + i, dummyPrefList);
+ }
+ idealState.getRecord().getListFields().putAll(partitionNames);
+
+ if (!_gSetupTool.getClusterManagementTool().getClusters().contains(clusterName)) {
+ _gSetupTool.getClusterManagementTool().addCluster(clusterName);
+ }
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, resourceName, idealState);
+
+ // Set ExternalView's replica states for a given parameter map
+ ExternalView externalView = new ExternalView(resourceName);
+
+ Map<String, Map<String, String>> mappingCurrent = new LinkedHashMap<>();
+
+ List<String> partitionReplicaStatesList = new ArrayList<>(partitionReplicaStates.keySet());
+ for (int k = 0; k < partitionReplicaStatesList.size(); k++) {
+ Map<String, String> replicaStatesForPartition = new LinkedHashMap<>();
+ List<String> replicaStateList = partitionReplicaStates.get(partitionReplicaStatesList.get(k));
+ for (int i = 0; i < replicaStateList.size(); i++) {
+ replicaStatesForPartition.put("r" + i, replicaStateList.get(i));
+ }
+ mappingCurrent.put("p" + k, replicaStatesForPartition);
+ }
+
+ externalView.getRecord().getMapFields().putAll(mappingCurrent);
+
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(clusterName, "p1", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ helixManager.connect();
+ HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
+ helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().externalView(resourceName), externalView);
+ }
+}
\ No newline at end of file