You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/05/25 01:19:41 UTC
[helix] 07/44: Refactor InstanceAccessor to InstancesAccessor and
PerInstanceAccessor
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit e9fb49652d37b7f5eec5e0924056804112a43b96
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Fri Mar 29 16:40:51 2019 -0700
Refactor InstanceAccessor to InstancesAccessor and PerInstanceAccessor
RB=1614063
BUG=HELIX-1725
G=helix-reviewers
A=hulee
Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
.../rest/server/resources/AbstractResource.java | 1 +
.../resources/exceptions/HelixHealthException.java | 16 ++
.../server/resources/helix/InstancesAccessor.java | 244 ++++++++++++++++
...tanceAccessor.java => PerInstanceAccessor.java} | 315 +++------------------
.../helix/rest/server/service/InstanceService.java | 4 +
.../rest/server/service/InstanceServiceImpl.java | 21 +-
.../helix/rest/server/AbstractTestClass.java | 36 +++
.../helix/rest/server/TestInstancesAccessor.java | 124 ++++++++
...eAccessor.java => TestPerInstanceAccessor.java} | 164 +----------
9 files changed, 504 insertions(+), 421 deletions(-)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 3b7d995..5128493 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -62,6 +62,7 @@ public class AbstractResource {
disablePartitions,
update,
delete,
+ stoppable,
rebalance,
reset,
resetPartitions,
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/exceptions/HelixHealthException.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/exceptions/HelixHealthException.java
new file mode 100644
index 0000000..238a5f0
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/exceptions/HelixHealthException.java
@@ -0,0 +1,16 @@
+package org.apache.helix.rest.server.resources.exceptions;
+
+public class HelixHealthException extends RuntimeException {
+
+ public HelixHealthException(String message) {
+ super(message);
+ }
+
+ public HelixHealthException(Throwable cause) {
+ super(cause);
+ }
+
+ public HelixHealthException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
new file mode 100644
index 0000000..a697e9b
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -0,0 +1,244 @@
+package org.apache.helix.rest.server.resources.helix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.apache.helix.rest.server.resources.exceptions.HelixHealthException;
+import org.apache.helix.rest.server.service.ClusterService;
+import org.apache.helix.rest.server.service.ClusterServiceImpl;
+import org.apache.helix.rest.server.service.InstanceService;
+import org.apache.helix.rest.server.service.InstanceServiceImpl;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.codehaus.jackson.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("/clusters/{clusterId}/instances")
+public class InstancesAccessor extends AbstractHelixResource {
+ private final static Logger _logger = LoggerFactory.getLogger(InstancesAccessor.class);
+
+ public enum InstancesProperties {
+ instances,
+ online,
+ disabled,
+ selection_base,
+ zone_order,
+ customized_values,
+ instance_stoppable_parallel,
+ instance_not_stoppable_with_reasons
+ }
+
+ public enum InstanceHealthSelectionBase {
+ instance_based,
+ zone_based
+ }
+
+
+ @GET
+ public Response getAllInstances(@PathParam("clusterId") String clusterId) {
+ HelixDataAccessor accessor = getDataAccssor(clusterId);
+ List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs());
+
+ if (instances == null) {
+ return notFound();
+ }
+
+ ObjectNode root = JsonNodeFactory.instance.objectNode();
+ root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
+
+ ArrayNode instancesNode = root.putArray(InstancesAccessor.InstancesProperties.instances.name());
+ instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
+ ArrayNode onlineNode = root.putArray(InstancesAccessor.InstancesProperties.online.name());
+ ArrayNode disabledNode = root.putArray(InstancesAccessor.InstancesProperties.disabled.name());
+
+ List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
+ ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+
+ for (String instanceName : instances) {
+ InstanceConfig instanceConfig =
+ accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
+ if (instanceConfig != null) {
+ if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
+ && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
+ disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
+ }
+
+ if (liveInstances.contains(instanceName)){
+ onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
+ }
+ }
+ }
+
+ return JSONRepresentation(root);
+ }
+
+ @POST
+ public Response instancesOperations(@PathParam("clusterId") String clusterId,
+ @QueryParam("command") String command, String content) {
+ Command cmd;
+ try {
+ cmd = Command.valueOf(command);
+ } catch (Exception e) {
+ return badRequest("Invalid command : " + command);
+ }
+
+ HelixAdmin admin = getHelixAdmin();
+ try {
+ JsonNode node = null;
+ if (content.length() != 0) {
+ node = OBJECT_MAPPER.readTree(content);
+ }
+ if (node == null) {
+ return badRequest("Invalid input for content : " + content);
+ }
+ List<String> enableInstances = OBJECT_MAPPER
+ .readValue(node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
+ switch (cmd) {
+ case enable:
+ admin.enableInstance(clusterId, enableInstances, true);
+ break;
+ case disable:
+ admin.enableInstance(clusterId, enableInstances, false);
+ break;
+ case stoppable:
+ return getParallelStoppableInstances(clusterId, node);
+ default:
+ _logger.error("Unsupported command :" + command);
+ return badRequest("Unsupported command :" + command);
+ }
+ } catch (HelixHealthException e) {
+ _logger
+ .error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
+ return serverError(e);
+ } catch (Exception e) {
+ _logger.error("Failed in updating instances : " + content, e);
+ return badRequest(e.getMessage());
+ }
+ return OK();
+ }
+
+ private Response getParallelStoppableInstances(String clusterId, JsonNode node)
+ throws IOException {
+ try {
+ // TODO: Process input data from the content
+ InstancesAccessor.InstanceHealthSelectionBase selectionBase =
+ InstancesAccessor.InstanceHealthSelectionBase.valueOf(
+ node.get(InstancesAccessor.InstancesProperties.selection_base.name())
+ .getValueAsText());
+ List<String> instances = OBJECT_MAPPER
+ .readValue(node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
+
+ List<String> orderOfZone = null;
+ String customizedInput = null;
+ if (node.get(InstancesAccessor.InstancesProperties.customized_values.name()) != null) {
+ customizedInput = node.get(InstancesAccessor.InstancesProperties.customized_values.name()).getTextValue();
+ }
+
+ if (node.get(InstancesAccessor.InstancesProperties.zone_order.name()) != null) {
+ orderOfZone = OBJECT_MAPPER
+ .readValue(node.get(InstancesAccessor.InstancesProperties.zone_order.name()).toString(),
+ OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
+ }
+
+ // Prepare output result
+ ObjectNode result = JsonNodeFactory.instance.objectNode();
+ ArrayNode stoppableInstances =
+ result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+ ObjectNode failedStoppableInstances = result.putObject(
+ InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+ InstanceService instanceService =
+ new InstanceServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+ switch (selectionBase) {
+ case zone_based:
+ List<String> zoneBasedInstance = getZoneBasedInstances(clusterId, instances, orderOfZone);
+ for (String instance : zoneBasedInstance) {
+ StoppableCheck stoppableCheck =
+ instanceService.checkSingleInstanceStoppable(clusterId, instance, customizedInput);
+ if (!stoppableCheck.isStoppable()) {
+ ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
+ for (String failedReason : stoppableCheck.getFailedChecks()) {
+ failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
+ }
+ } else {
+ stoppableInstances.add(instance);
+ }
+ }
+ break;
+ case instance_based:
+ default:
+ throw new NotImplementedException("instance_based selection is not supported yet!");
+ }
+ return JSONRepresentation(result);
+ } catch (HelixException e) {
+ _logger
+ .error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
+ throw new HelixHealthException(e);
+ } catch (Exception e) {
+ _logger.error(String.format(
+ "Failed to get parallel stoppable instances for cluster %s with a HelixException!",
+ clusterId), e);
+ throw e;
+ }
+ }
+
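+ /**
+ * Get the requested instances that belong to the first zone. If none of them is in that zone,
+ * Helix iterates over the zones in order until it finds a zone that contains some of them.
+ *
+ * The zones can come directly from user input; if the user did not specify them, all zones of the
+ * cluster topology are used. Note that the zone list is sorted alphabetically in either case.
+ *
+ * @param clusterId name of the cluster whose topology supplies the zone mapping
+ * @param instances names of the instances to select from
+ * @param orderedZones names of the zones to try, or null to use every zone in the topology
+ * @return the sorted instances of the first matching zone, or an empty list if no zone matches
+ */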
+ private List<String> getZoneBasedInstances(String clusterId, List<String> instances,
+ List<String> orderedZones) {
+ ClusterService clusterService =
+ new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+ Map<String, Set<String>> zoneMapping =
+ clusterService.getClusterTopology(clusterId).toZoneMapping();
+ if (orderedZones == null) {
+ orderedZones = new ArrayList<>(zoneMapping.keySet());
+ }
+ Collections.sort(orderedZones);
+ if (orderedZones.isEmpty()) {
+ return orderedZones;
+ }
+
+ Set<String> instanceSet = null;
+ for (String zone : orderedZones) {
+ instanceSet = new TreeSet<>(instances);
+ Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
+ instanceSet.retainAll(currentZoneInstanceSet);
+ if (instanceSet.size() > 0) {
+ return new ArrayList<>(instanceSet);
+ }
+ }
+
+ return Collections.EMPTY_LIST;
+ }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
similarity index 59%
rename from helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
rename to helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index d07fb4e..e95f0a2 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -21,13 +21,7 @@ package org.apache.helix.rest.server.resources.helix;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
@@ -38,14 +32,12 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Error;
import org.apache.helix.model.HealthStat;
@@ -54,13 +46,9 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.rest.client.CustomRestClient;
-import org.apache.helix.rest.client.CustomRestClientFactory;
import org.apache.helix.rest.common.HelixDataAccessorWrapper;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
-import org.apache.helix.rest.server.service.ClusterService;
-import org.apache.helix.rest.server.service.ClusterServiceImpl;
import org.apache.helix.rest.server.service.InstanceService;
import org.apache.helix.rest.server.service.InstanceServiceImpl;
import org.codehaus.jackson.JsonNode;
@@ -72,14 +60,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Path("/clusters/{clusterId}/instances")
-public class InstanceAccessor extends AbstractHelixResource {
- private final static Logger _logger = LoggerFactory.getLogger(InstanceAccessor.class);
+@Path("/clusters/{clusterId}/instances/{instanceName}")
+public class PerInstanceAccessor extends AbstractHelixResource {
+ private final static Logger _logger = LoggerFactory.getLogger(PerInstanceAccessor.class);
- public enum InstanceProperties {
- instances,
- online,
- disabled,
+ public enum PerInstanceProperties {
config,
liveInstance,
resource,
@@ -91,181 +76,18 @@ public class InstanceAccessor extends AbstractHelixResource {
total_message_count,
read_message_count,
healthreports,
- instanceTags,
- selection_base,
- zone_order,
- customized_values,
- instance_stoppable_parallel,
- instance_not_stoppable_with_reasons
- }
-
- public enum InstanceHealthSelectionBase {
- instance_based,
- zone_based
+ instanceTags
}
@GET
- public Response getAllInstances(@PathParam("clusterId") String clusterId) {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- ObjectNode root = JsonNodeFactory.instance.objectNode();
- root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
-
- ArrayNode instancesNode = root.putArray(InstanceProperties.instances.name());
- ArrayNode onlineNode = root.putArray(InstanceProperties.online.name());
- ArrayNode disabledNode = root.putArray(InstanceProperties.disabled.name());
-
- List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs());
-
- if (instances != null) {
- instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
- } else {
- return notFound();
- }
-
- List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
- ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
-
- for (String instanceName : instances) {
- InstanceConfig instanceConfig =
- accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
- if (instanceConfig != null) {
- if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
- && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
- disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
- }
-
- if (liveInstances.contains(instanceName)){
- onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
- }
- }
- }
-
- return JSONRepresentation(root);
- }
-
- @POST
- public Response updateInstances(@PathParam("clusterId") String clusterId,
- @QueryParam("command") String command, String content) {
- Command cmd;
- try {
- cmd = Command.valueOf(command);
- } catch (Exception e) {
- return badRequest("Invalid command : " + command);
- }
-
- HelixAdmin admin = getHelixAdmin();
- try {
- JsonNode node = null;
- if (content.length() != 0) {
- node = OBJECT_MAPPER.readTree(content);
- }
- if (node == null) {
- return badRequest("Invalid input for content : " + content);
- }
- List<String> enableInstances = OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.instances.name()).toString(),
- OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
- switch (cmd) {
- case enable:
- admin.enableInstance(clusterId, enableInstances, true);
-
- break;
- case disable:
- admin.enableInstance(clusterId, enableInstances, false);
- break;
- default:
- _logger.error("Unsupported command :" + command);
- return badRequest("Unsupported command :" + command);
- }
- } catch (Exception e) {
- _logger.error("Failed in updating instances : " + content, e);
- return badRequest(e.getMessage());
- }
- return OK();
- }
-
- @POST
- @Path("stoppable")
- @Consumes(MediaType.APPLICATION_JSON)
- public Response getParallelStoppableInstances(@PathParam("clusterId") String clusterId,
- String content) {
- try {
- JsonNode node = null;
- if (content.length() != 0) {
- node = OBJECT_MAPPER.readTree(content);
- }
- if (node == null) {
- return badRequest("Invalid input for content : " + content);
- }
-
- // TODO: Process input data from the content
- InstanceHealthSelectionBase selectionBase = InstanceHealthSelectionBase
- .valueOf(node.get(InstanceProperties.selection_base.name()).getValueAsText());
- List<String> instances = OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.instances.name()).toString(),
- OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
-
- List<String> orderOfZone = null;
- String customizedInput = null;
- if (node.get(InstanceProperties.customized_values.name()) != null) {
- customizedInput = node.get(InstanceProperties.customized_values.name()).getTextValue();
- }
-
- if (node.get(InstanceProperties.zone_order.name()) != null) {
- orderOfZone = OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.zone_order.name()).toString(),
- OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
- }
-
- // Prepare output result
- ObjectNode result = JsonNodeFactory.instance.objectNode();
- ArrayNode stoppableInstances =
- result.putArray(InstanceProperties.instance_stoppable_parallel.name());
- ObjectNode failedStoppableInstances =
- result.putObject(InstanceProperties.instance_not_stoppable_with_reasons.name());
-
- switch (selectionBase) {
- case zone_based:
- List<String> zoneBasedInstance = getZoneBasedInstance(clusterId, instances, orderOfZone);
- for (String instance : zoneBasedInstance) {
- StoppableCheck stoppableCheck =
- checkSingleInstanceStoppable(clusterId, instance, customizedInput);
- if (!stoppableCheck.isStoppable()) {
- ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
- for (String failedReason : stoppableCheck.getFailedChecks()) {
- failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
- }
- } else {
- stoppableInstances.add(instance);
- }
- }
- break;
- case instance_based:
- default:
- throw new NotImplementedException("instance_based selection is not support now!");
- }
- return JSONRepresentation(result);
- } catch (HelixException e) {
- _logger
- .error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
- return serverError(e);
- } catch (Exception e) {
- _logger.error(
- String.format("Failed to get parallel stoppable instances for cluster %s!", clusterId),
- e);
- return serverError(e);
- }
- }
-
- @GET
- @Path("{instanceName}")
public Response getInstanceById(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
// TODO reduce GC by dependency injection
- InstanceService instanceService = new InstanceServiceImpl(
- new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor());
+ InstanceService instanceService =
+ new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor),
+ getConfigAccessor());
InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName,
InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
@@ -273,14 +95,15 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@POST
- @Path("{instanceName}/stoppable")
+ @Path("stoppable")
@Consumes(MediaType.APPLICATION_JSON)
- public Response isInstanceStoppable(String jsonContent,
- @PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName) throws IOException {
+ public Response isInstanceStoppable(String jsonContent, @PathParam("clusterId") String clusterId,
+ @PathParam("instanceName") String instanceName) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
StoppableCheck stoppableCheck = null;
try {
- stoppableCheck = checkSingleInstanceStoppable(clusterId, instanceName, jsonContent);
+ stoppableCheck = new InstanceServiceImpl(getDataAccssor(clusterId), getConfigAccessor())
+ .checkSingleInstanceStoppable(clusterId, instanceName, jsonContent);
} catch (HelixException e) {
_logger
.error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
@@ -290,7 +113,6 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@PUT
- @Path("{instanceName}")
public Response addInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName, String content) {
HelixAdmin admin = getHelixAdmin();
@@ -313,7 +135,6 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@POST
- @Path("{instanceName}")
public Response updateInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName, @QueryParam("command") String command,
String content) {
@@ -345,8 +166,8 @@ public class InstanceAccessor extends AbstractHelixResource {
return badRequest("Instance names are not match!");
}
admin.resetPartition(clusterId, instanceName,
- node.get(InstanceProperties.resource.name()).getTextValue(), (List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.partitions.name()).toString(),
+ node.get(PerInstanceProperties.resource.name()).getTextValue(), (List<String>) OBJECT_MAPPER
+ .readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
OBJECT_MAPPER.getTypeFactory()
.constructCollectionType(List.class, String.class)));
break;
@@ -355,7 +176,7 @@ public class InstanceAccessor extends AbstractHelixResource {
return badRequest("Instance names are not match!");
}
for (String tag : (List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.instanceTags.name()).toString(),
+ .readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
admin.addInstanceTag(clusterId, instanceName, tag);
}
@@ -365,24 +186,24 @@ public class InstanceAccessor extends AbstractHelixResource {
return badRequest("Instance names are not match!");
}
for (String tag : (List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.instanceTags.name()).toString(),
+ .readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
admin.removeInstanceTag(clusterId, instanceName, tag);
}
break;
case enablePartitions:
admin.enablePartition(true, clusterId, instanceName,
- node.get(InstanceProperties.resource.name()).getTextValue(),
+ node.get(PerInstanceProperties.resource.name()).getTextValue(),
(List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.partitions.name()).toString(),
+ .readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
OBJECT_MAPPER.getTypeFactory()
.constructCollectionType(List.class, String.class)));
break;
case disablePartitions:
admin.enablePartition(false, clusterId, instanceName,
- node.get(InstanceProperties.resource.name()).getTextValue(),
+ node.get(PerInstanceProperties.resource.name()).getTextValue(),
(List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.partitions.name()).toString(),
+ .readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)));
break;
default:
@@ -397,7 +218,6 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@DELETE
- @Path("{instanceName}")
public Response deleteInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName) {
HelixAdmin admin = getHelixAdmin();
@@ -412,7 +232,7 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@GET
- @Path("{instanceName}/configs")
+ @Path("configs")
public Response getInstanceConfig(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName) throws IOException {
HelixDataAccessor accessor = getDataAccssor(clusterId);
@@ -427,7 +247,7 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@POST
- @Path("{instanceName}/configs")
+ @Path("configs")
public Response updateInstanceConfig(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName, @QueryParam("command") String commandStr,
String content) {
@@ -476,14 +296,14 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@GET
- @Path("{instanceName}/resources")
+ @Path("resources")
public Response getResourcesOnInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName) throws IOException {
HelixDataAccessor accessor = getDataAccssor(clusterId);
ObjectNode root = JsonNodeFactory.instance.objectNode();
root.put(Properties.id.name(), instanceName);
- ArrayNode resourcesNode = root.putArray(InstanceProperties.resources.name());
+ ArrayNode resourcesNode = root.putArray(PerInstanceProperties.resources.name());
List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName));
if (sessionIds == null || sessionIds.size() == 0) {
@@ -502,8 +322,7 @@ public class InstanceAccessor extends AbstractHelixResource {
return JSONRepresentation(root);
}
- @GET
- @Path("{instanceName}/resources/{resourceName}")
+ @GET @Path("resources/{resourceName}")
public Response getResourceOnInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName,
@PathParam("resourceName") String resourceName) throws IOException {
@@ -515,8 +334,8 @@ public class InstanceAccessor extends AbstractHelixResource {
// Only get resource list from current session id
String currentSessionId = sessionIds.get(0);
- CurrentState resourceCurrentState = accessor
- .getProperty(accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName));
+ CurrentState resourceCurrentState = accessor.getProperty(
+ accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName));
if (resourceCurrentState != null) {
return JSONRepresentation(resourceCurrentState.getRecord());
}
@@ -524,8 +343,7 @@ public class InstanceAccessor extends AbstractHelixResource {
return notFound();
}
- @GET
- @Path("{instanceName}/errors")
+ @GET @Path("errors")
public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName) throws IOException {
HelixDataAccessor accessor = getDataAccssor(clusterId);
@@ -534,8 +352,7 @@ public class InstanceAccessor extends AbstractHelixResource {
root.put(Properties.id.name(), instanceName);
ObjectNode errorsNode = JsonNodeFactory.instance.objectNode();
- List<String> sessionIds =
- accessor.getChildNames(accessor.keyBuilder().errors(instanceName));
+ List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().errors(instanceName));
if (sessionIds == null || sessionIds.size() == 0) {
return notFound();
@@ -557,13 +374,13 @@ public class InstanceAccessor extends AbstractHelixResource {
errorsNode.put(sessionId, resourcesNode);
}
}
- root.put(InstanceProperties.errors.name(), errorsNode);
+ root.put(PerInstanceProperties.errors.name(), errorsNode);
return JSONRepresentation(root);
}
@GET
- @Path("{instanceName}/errors/{sessionId}/{resourceName}/{partitionName}")
+ @Path("errors/{sessionId}/{resourceName}/{partitionName}")
public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName, @PathParam("sessionId") String sessionId,
@PathParam("resourceName") String resourceName,
@@ -579,7 +396,7 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@GET
- @Path("{instanceName}/history")
+ @Path("history")
public Response getHistoryOnInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName) throws IOException {
HelixDataAccessor accessor = getDataAccssor(clusterId);
@@ -592,7 +409,7 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@GET
- @Path("{instanceName}/messages")
+ @Path("messages")
public Response getMessagesOnInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName,
@QueryParam("stateModelDef") String stateModelDef) {
@@ -600,9 +417,8 @@ public class InstanceAccessor extends AbstractHelixResource {
ObjectNode root = JsonNodeFactory.instance.objectNode();
root.put(Properties.id.name(), instanceName);
- ArrayNode newMessages = root.putArray(InstanceProperties.new_messages.name());
- ArrayNode readMessages = root.putArray(InstanceProperties.read_messages.name());
-
+ ArrayNode newMessages = root.putArray(PerInstanceProperties.new_messages.name());
+ ArrayNode readMessages = root.putArray(PerInstanceProperties.read_messages.name());
List<String> messageNames =
accessor.getChildNames(accessor.keyBuilder().messages(instanceName));
@@ -629,15 +445,15 @@ public class InstanceAccessor extends AbstractHelixResource {
}
}
- root.put(InstanceProperties.total_message_count.name(),
+ root.put(PerInstanceProperties.total_message_count.name(),
newMessages.size() + readMessages.size());
- root.put(InstanceProperties.read_message_count.name(), readMessages.size());
+ root.put(PerInstanceProperties.read_message_count.name(), readMessages.size());
return JSONRepresentation(root);
}
@GET
- @Path("{instanceName}/messages/{messageId}")
+ @Path("messages/{messageId}")
public Response getMessageOnInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName,
@PathParam("messageId") String messageId) throws IOException {
@@ -651,14 +467,14 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@GET
- @Path("{instanceName}/healthreports")
+ @Path("healthreports")
public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId,
@PathParam("instanceName") String instanceName) throws IOException {
HelixDataAccessor accessor = getDataAccssor(clusterId);
ObjectNode root = JsonNodeFactory.instance.objectNode();
root.put(Properties.id.name(), instanceName);
- ArrayNode healthReportsNode = root.putArray(InstanceProperties.healthreports.name());
+ ArrayNode healthReportsNode = root.putArray(PerInstanceProperties.healthreports.name());
List<String> healthReports =
accessor.getChildNames(accessor.keyBuilder().healthReports(instanceName));
@@ -671,9 +487,9 @@ public class InstanceAccessor extends AbstractHelixResource {
}
@GET
- @Path("{instanceName}/healthreports/{reportName}")
- public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName,
+ @Path("healthreports/{reportName}")
+ public Response getHealthReportsOnInstance(
+ @PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName,
@PathParam("reportName") String reportName) throws IOException {
HelixDataAccessor accessor = getDataAccssor(clusterId);
HealthStat healthStat =
@@ -688,47 +504,4 @@ public class InstanceAccessor extends AbstractHelixResource {
private boolean validInstance(JsonNode node, String instanceName) {
return instanceName.equals(node.get(Properties.id.name()).getValueAsText());
}
-
- private List<String> getZoneBasedInstance(String clusterId, List<String> instances, List<String> orderOfZone) {
- ClusterService
- clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
- Map<String, Set<String>> zoneMapping = clusterService.getClusterTopology(clusterId).toZoneMapping();
- if (orderOfZone == null) {
- orderOfZone = new ArrayList<>(zoneMapping.keySet());
- }
- Collections.sort(orderOfZone);
- if (orderOfZone.isEmpty()) {
- return orderOfZone;
- }
-
- Set<String> instanceSet = null;
- for (String zone : orderOfZone) {
- instanceSet = new TreeSet<>(instances);
- Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
- instanceSet.retainAll(currentZoneInstanceSet);
- if (instanceSet.size() > 0) {
- return new ArrayList<>(instanceSet);
- }
- }
-
- return Collections.EMPTY_LIST;
- }
-
- private StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName,
- String jsonContent) {
- HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
- // TODO reduce GC by dependency injection
- InstanceService instanceService = new InstanceServiceImpl(
- new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor());
-
- Map<String, Boolean> helixStoppableCheck = instanceService.getInstanceHealthStatus(clusterId,
- instanceName, InstanceService.HealthCheck.STOPPABLE_CHECK_LIST);
- CustomRestClient customClient = CustomRestClientFactory.get(jsonContent);
- // TODO add the json content parse logic
- Map<String, Boolean> customStoppableCheck =
- customClient.getInstanceStoppableCheck(Collections.<String, String> emptyMap());
- StoppableCheck stoppableCheck =
- StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck);
- return stoppableCheck;
- }
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
index 42bfe67..f32551b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import com.google.common.collect.ImmutableList;
+import org.apache.helix.rest.server.json.instance.StoppableCheck;
public interface InstanceService {
enum HealthCheck {
@@ -89,4 +90,7 @@ public interface InstanceService {
*/
InstanceInfo getInstanceInfo(String clusterId, String instanceName,
List<HealthCheck> healthChecks);
+
+ StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName,
+ String jsonContent);
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index c7ef015..22ac30b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -20,16 +20,19 @@ package org.apache.helix.rest.server.service;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.rest.client.CustomRestClient;
+import org.apache.helix.rest.client.CustomRestClientFactory;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
+import org.apache.helix.rest.server.json.instance.StoppableCheck;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,4 +128,20 @@ public class InstanceServiceImpl implements InstanceService {
return instanceInfoBuilder.build();
}
+
+
+ @Override
+ public StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName,
+ String jsonContent) {
+ // TODO reduce GC by dependency injection
+ Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId,
+ instanceName, InstanceService.HealthCheck.STOPPABLE_CHECK_LIST);
+ CustomRestClient customClient = CustomRestClientFactory.get(jsonContent);
+ // TODO add the json content parse logic
+ Map<String, Boolean> customStoppableCheck =
+ customClient.getInstanceStoppableCheck(Collections.<String, String> emptyMap());
+ StoppableCheck stoppableCheck =
+ StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck);
+ return stoppableCheck;
+ }
}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 9caed00..51a64e8 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.logging.Level;
import javax.ws.rs.client.Entity;
@@ -52,6 +53,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.rest.common.ContextPropertyKeys;
import org.apache.helix.rest.common.HelixRestNamespace;
import org.apache.helix.rest.server.auditlog.AuditLog;
@@ -105,6 +107,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
protected static final String TEST_NAMESPACE = "test-namespace";
protected static HelixZkClient _gZkClientTestNS;
protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
+ protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
+ protected static final List<String> STOPPABLE_INSTANCES =
+ Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5");
protected static Set<String> _clusters;
protected static String _superCluster = "superCluster";
@@ -276,6 +281,8 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
_resourcesMap.put(cluster, resources);
startController(cluster);
}
+ preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES);
+ _clusters.add(STOPPABLE_CLUSTER);
}
protected Set<String> createInstances(String cluster, int numInstances) throws Exception {
@@ -468,4 +475,33 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
protected TaskDriver getTaskDriver(String clusterName) {
return new TaskDriver(_gZkClient, clusterName);
}
+
+ private void preSetupForParallelInstancesStoppableTest(String clusterName, List<String> instances) {
+ _gSetupTool.addCluster(clusterName, true);
+ ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setFaultZoneType("helixZoneId");
+ clusterConfig.setPersistIntermediateAssignment(true);
+ _configAccessor.setClusterConfig(clusterName, clusterConfig);
+ // Create instance configs
+ List<InstanceConfig> instanceConfigs = new ArrayList<>();
+ for (int i = 0; i < instances.size() - 1; i++) {
+ InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
+ instanceConfig.setDomain("helixZoneId=zone1,host=instance" + i);
+ instanceConfigs.add(instanceConfig);
+ }
+ instanceConfigs.add(new InstanceConfig(instances.get(instances.size() - 1)));
+ instanceConfigs.get(instanceConfigs.size() - 1).setDomain("helixZoneId=zone2,host=instance5");
+
+ instanceConfigs.get(1).setInstanceEnabled(false);
+ instanceConfigs.get(3).setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
+
+ for (InstanceConfig instanceConfig : instanceConfigs) {
+ _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig);
+ }
+
+ // Start participant
+ startInstances(clusterName, new TreeSet<>(instances), 3);
+ createResources(clusterName, 1);
+ startController(clusterName);
+ }
}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
new file mode 100644
index 0000000..2d9b420
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -0,0 +1,124 @@
+package org.apache.helix.rest.server;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
+import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.codehaus.jackson.JsonNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestInstancesAccessor extends AbstractTestClass {
+ private final static String CLUSTER_NAME = "TestCluster_0";
+
+ @Test
+ public void testEndToEndChecks() {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ // Select instances with zone based
+ String content = String
+ .format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"]}",
+ InstancesAccessor.InstancesProperties.selection_base.name(),
+ InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
+ InstancesAccessor.InstancesProperties.instances.name(), "instance0", "instance1",
+ "instance2", "instance3", "instance4", "instance5");
+ Response response =
+ new JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable").format(STOPPABLE_CLUSTER)
+ .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+ String checkResult = response.readEntity(String.class);
+ Assert.assertEquals(checkResult,
+ "{\n \"instance_stoppable_parallel\" : [ \"instance0\", \"instance2\" ],\n"
+ + " \"instance_not_stoppable_with_reasons\" : {\n \"instance1\" : [ \"Helix:EMPTY_RESOURCE_ASSIGNMENT\", \"Helix:INSTANCE_NOT_ENABLED\", \"Helix:INSTANCE_NOT_STABLE\" ],\n"
+ + " \"instance3\" : [ \"Helix:HAS_DISABLED_PARTITION\" ],\n"
+ + " \"instance4\" : [ \"Helix:EMPTY_RESOURCE_ASSIGNMENT\", \"Helix:INSTANCE_NOT_STABLE\", \"Helix:INSTANCE_NOT_ALIVE\" ]\n }\n}\n");
+
+ // Disable one selected instance0, it should failed to check
+ String instance = "instance0";
+ InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER, instance);
+ instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig);
+
+ Entity entity =
+ Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
+ response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
+ .format(STOPPABLE_CLUSTER, instance).post(this, entity);
+ checkResult = response.readEntity(String.class);
+ Assert.assertEquals(checkResult,
+ "{\"stoppable\":false,\"failedChecks\":[\"Helix:HAS_DISABLED_PARTITION\",\"Helix:INSTANCE_NOT_ENABLED\",\"Helix:INSTANCE_NOT_STABLE\"]}");
+
+ // Reenable instance0, it should passed the check
+ instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", true);
+ _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig);
+ HelixClusterVerifier
+ verifier = new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(((BestPossibleExternalViewVerifier) verifier).verifyByPolling());
+
+ entity =
+ Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
+ response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
+ .format(STOPPABLE_CLUSTER, instance).post(this, entity);
+ checkResult = response.readEntity(String.class);
+ Assert.assertEquals(checkResult,
+ "{\"stoppable\":true,\"failedChecks\":[]}");
+ }
+
+ @Test(dependsOnMethods = "testEndToEndChecks")
+ public void testGetAllInstances() throws IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ String body = new JerseyUriRequestBuilder("clusters/{}/instances").isBodyReturnExpected(true)
+ .format(CLUSTER_NAME).get(this);
+
+ JsonNode node = OBJECT_MAPPER.readTree(body);
+ String instancesStr = node.get(InstancesAccessor.InstancesProperties.instances.name()).toString();
+ Assert.assertNotNull(instancesStr);
+
+ Set<String> instances = OBJECT_MAPPER.readValue(instancesStr,
+ OBJECT_MAPPER.getTypeFactory().constructCollectionType(Set.class, String.class));
+ Assert.assertEquals(instances, _instancesMap.get(CLUSTER_NAME), "Instances from response: "
+ + instances + " vs instances actually: " + _instancesMap.get(CLUSTER_NAME));
+ }
+
+ @Test(enabled = false)
+ public void testUpdateInstances() throws IOException {
+ // TODO: Reenable the test after storage node fix the problem
+ // Batch disable instances
+
+ List<String> instancesToDisable = Arrays.asList(
+ new String[] { CLUSTER_NAME + "localhost_12918", CLUSTER_NAME + "localhost_12919",
+ CLUSTER_NAME + "localhost_12920"
+ });
+ Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(
+ ImmutableMap.of(InstancesAccessor.InstancesProperties.instances.name(), instancesToDisable)),
+ MediaType.APPLICATION_JSON_TYPE);
+ post("clusters/" + CLUSTER_NAME + "/instances", ImmutableMap.of("command", "disable"), entity,
+ Response.Status.OK.getStatusCode());
+ ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+ Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(),
+ new HashSet<>(instancesToDisable));
+
+ instancesToDisable = Arrays
+ .asList(new String[] { CLUSTER_NAME + "localhost_12918", CLUSTER_NAME + "localhost_12920"
+ });
+ entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(
+ ImmutableMap.of(InstancesAccessor.InstancesProperties.instances.name(), instancesToDisable)),
+ MediaType.APPLICATION_JSON_TYPE);
+ post("clusters/" + CLUSTER_NAME + "/instances", ImmutableMap.of("command", "enable"), entity,
+ Response.Status.OK.getStatusCode());
+ clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+ Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(),
+ new HashSet<>(Arrays.asList(CLUSTER_NAME + "localhost_12919")));
+ }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
similarity index 66%
rename from helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
rename to helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index a4d0156..1a9a6da 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -26,9 +26,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -38,14 +36,11 @@ import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.rest.server.resources.AbstractResource;
-import org.apache.helix.rest.server.resources.helix.InstanceAccessor;
+import org.apache.helix.rest.server.resources.helix.PerInstanceAccessor;
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
-import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
import org.apache.helix.util.InstanceValidationUtil;
import org.codehaus.jackson.JsonNode;
import org.testng.Assert;
@@ -54,79 +49,22 @@ import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-public class TestInstanceAccessor extends AbstractTestClass {
+public class TestPerInstanceAccessor extends AbstractTestClass {
private final static String CLUSTER_NAME = "TestCluster_0";
private final static String INSTANCE_NAME = CLUSTER_NAME + "localhost_12918";
-
@Test
- public void testEndToEndChecks() {
- System.out.println("Start test :" + TestHelper.getTestMethodName());
- String clusterName = TestHelper.getTestMethodName();
- List<String> instances =
- Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5");
- preSetupForParallelInstancesStoppableTest(clusterName, instances);
-
- // Select instances with zone based
- String content = String
- .format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"]}",
- InstanceAccessor.InstanceProperties.selection_base.name(),
- InstanceAccessor.InstanceHealthSelectionBase.zone_based.name(),
- InstanceAccessor.InstanceProperties.instances.name(), "instance0", "instance1",
- "instance2", "instance3", "instance4", "instance5");
- Response response =
- new JerseyUriRequestBuilder("clusters/{}/instances/stoppable").format(clusterName)
- .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
- String checkResult = response.readEntity(String.class);
- Assert.assertEquals(checkResult,
- "{\n \"instance_stoppable_parallel\" : [ \"instance0\", \"instance2\" ],\n"
- + " \"instance_not_stoppable_with_reasons\" : {\n \"instance1\" : [ \"Helix:INSTANCE_NOT_STABLE\", \"Helix:INSTANCE_NOT_ENABLED\", \"Helix:EMPTY_RESOURCE_ASSIGNMENT\" ],\n"
- + " \"instance3\" : [ \"Helix:HAS_DISABLED_PARTITION\" ],\n"
- + " \"instance4\" : [ \"Helix:INSTANCE_NOT_STABLE\", \"Helix:INSTANCE_NOT_ALIVE\", \"Helix:EMPTY_RESOURCE_ASSIGNMENT\" ]\n }\n}\n");
-
- // Disable one selected instance0, it should failed to check
- String instance = "instance0";
- InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance);
- instanceConfig.setInstanceEnabled(false);
- instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
- _configAccessor.setInstanceConfig(clusterName, instance, instanceConfig);
-
- Entity entity =
- Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
- response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
- .format(clusterName, instance).post(this, entity);
- checkResult = response.readEntity(String.class);
- Assert.assertEquals(checkResult,
- "{\"stoppable\":false,\"failedChecks\":[\"Helix:INSTANCE_NOT_STABLE\",\"Helix:HAS_DISABLED_PARTITION\",\"Helix:INSTANCE_NOT_ENABLED\"]}");
-
- // Reenable instance0, it should passed the check
- instanceConfig.setInstanceEnabled(true);
- instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", true);
- _configAccessor.setInstanceConfig(clusterName, instance, instanceConfig);
- HelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(clusterName).setZkAddr(ZK_ADDR).build();
- Assert.assertTrue(((BestPossibleExternalViewVerifier) verifier).verifyByPolling());
-
- entity =
- Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
- response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
- .format(clusterName, instance).post(this, entity);
- checkResult = response.readEntity(String.class);
- Assert.assertEquals(checkResult,
- "{\"stoppable\":true,\"failedChecks\":[]}");
- }
-
- @Test(dependsOnMethods = "testEndToEndChecks")
public void testIsInstanceStoppable() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
Map<String, String> params = ImmutableMap.of("client", "espresso");
Entity entity =
Entity.entity(OBJECT_MAPPER.writeValueAsString(params), MediaType.APPLICATION_JSON_TYPE);
Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
- .format("testEndToEndChecks", "instance1").post(this, entity);
+ .format(STOPPABLE_CLUSTER, "instance1").post(this, entity);
String stoppableCheckResult = response.readEntity(String.class);
Assert.assertEquals(stoppableCheckResult,
- "{\"stoppable\":false,\"failedChecks\":[\"Helix:INSTANCE_NOT_STABLE\","
- + "\"Helix:INSTANCE_NOT_ENABLED\",\"Helix:EMPTY_RESOURCE_ASSIGNMENT\"]}");
+ "{\"stoppable\":false,\"failedChecks\":[\"Helix:EMPTY_RESOURCE_ASSIGNMENT\","
+ + "\"Helix:INSTANCE_NOT_ENABLED\",\"Helix:INSTANCE_NOT_STABLE\"]}");
}
@Test (dependsOnMethods = "testIsInstanceStoppable")
@@ -150,7 +88,7 @@ public class TestInstanceAccessor extends AbstractTestClass {
.isBodyReturnExpected(true).format(CLUSTER_NAME, INSTANCE_NAME).get(this);
JsonNode node = OBJECT_MAPPER.readTree(body);
int newMessageCount =
- node.get(InstanceAccessor.InstanceProperties.total_message_count.name()).getIntValue();
+ node.get(PerInstanceAccessor.PerInstanceProperties.total_message_count.name()).getIntValue();
Assert.assertEquals(newMessageCount, 1);
}
@@ -177,7 +115,7 @@ public class TestInstanceAccessor extends AbstractTestClass {
.isBodyReturnExpected(true).format(CLUSTER_NAME, INSTANCE_NAME).get(this);
JsonNode node = OBJECT_MAPPER.readTree(body);
int newMessageCount =
- node.get(InstanceAccessor.InstanceProperties.total_message_count.name()).getIntValue();
+ node.get(PerInstanceAccessor.PerInstanceProperties.total_message_count.name()).getIntValue();
Assert.assertEquals(newMessageCount, 1);
@@ -186,34 +124,18 @@ public class TestInstanceAccessor extends AbstractTestClass {
.isBodyReturnExpected(true).format(CLUSTER_NAME, INSTANCE_NAME).get(this);
node = OBJECT_MAPPER.readTree(body);
newMessageCount =
- node.get(InstanceAccessor.InstanceProperties.total_message_count.name()).getIntValue();
+ node.get(PerInstanceAccessor.PerInstanceProperties.total_message_count.name()).getIntValue();
Assert.assertEquals(newMessageCount, 0);
}
@Test(dependsOnMethods = "testGetMessagesByStateModelDef")
- public void testGetAllInstances() throws IOException {
- System.out.println("Start test :" + TestHelper.getTestMethodName());
- String body = new JerseyUriRequestBuilder("clusters/{}/instances").isBodyReturnExpected(true)
- .format(CLUSTER_NAME).get(this);
-
- JsonNode node = OBJECT_MAPPER.readTree(body);
- String instancesStr = node.get(InstanceAccessor.InstanceProperties.instances.name()).toString();
- Assert.assertNotNull(instancesStr);
-
- Set<String> instances = OBJECT_MAPPER.readValue(instancesStr,
- OBJECT_MAPPER.getTypeFactory().constructCollectionType(Set.class, String.class));
- Assert.assertEquals(instances, _instancesMap.get(CLUSTER_NAME), "Instances from response: "
- + instances + " vs instances actually: " + _instancesMap.get(CLUSTER_NAME));
- }
-
- @Test
public void testGetInstanceById() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
String body = new JerseyUriRequestBuilder("clusters/{}/instances/{}").isBodyReturnExpected(true)
.format(CLUSTER_NAME, INSTANCE_NAME).get(this);
JsonNode node = OBJECT_MAPPER.readTree(body);
- String instancesCfg = node.get(InstanceAccessor.InstanceProperties.config.name()).toString();
+ String instancesCfg = node.get(PerInstanceAccessor.PerInstanceProperties.config.name()).toString();
Assert.assertNotNull(instancesCfg);
boolean isHealth = node.get("health").getBooleanValue();
Assert.assertFalse(isHealth);
@@ -268,7 +190,7 @@ public class TestInstanceAccessor extends AbstractTestClass {
List<String> tagList = ImmutableList.of("tag3", "tag1", "tag2");
entity = Entity.entity(
OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(),
- INSTANCE_NAME, InstanceAccessor.InstanceProperties.instanceTags.name(), tagList)),
+ INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.instanceTags.name(), tagList)),
MediaType.APPLICATION_JSON_TYPE);
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=addInstanceTag")
@@ -282,7 +204,7 @@ public class TestInstanceAccessor extends AbstractTestClass {
removeList.remove("tag2");
entity = Entity.entity(
OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(),
- INSTANCE_NAME, InstanceAccessor.InstanceProperties.instanceTags.name(), removeList)),
+ INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.instanceTags.name(), removeList)),
MediaType.APPLICATION_JSON_TYPE);
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=removeInstanceTag")
@@ -291,35 +213,6 @@ public class TestInstanceAccessor extends AbstractTestClass {
Assert.assertEquals(_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME).getTags(),
ImmutableList.of("tag2"));
- // TODO: Reenable the test after storage node fix the problem
- // Batch disable instances
- /*
- List<String> instancesToDisable = Arrays.asList(
- new String[] { CLUSTER_NAME + "localhost_12918", CLUSTER_NAME + "localhost_12919",
- CLUSTER_NAME + "localhost_12920"
- });
- entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(
- ImmutableMap.of(InstanceAccessor.InstanceProperties.instances.name(), instancesToDisable)),
- MediaType.APPLICATION_JSON_TYPE);
- post("clusters/" + CLUSTER_NAME + "/instances", ImmutableMap.of("command", "disable"), entity,
- Response.Status.OK.getStatusCode());
- ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
- Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(),
- new HashSet<>(instancesToDisable));
-
- instancesToDisable = Arrays
- .asList(new String[] { CLUSTER_NAME + "localhost_12918", CLUSTER_NAME + "localhost_12920"
- });
- entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(
- ImmutableMap.of(InstanceAccessor.InstanceProperties.instances.name(), instancesToDisable)),
- MediaType.APPLICATION_JSON_TYPE);
- post("clusters/" + CLUSTER_NAME + "/instances", ImmutableMap.of("command", "enable"), entity,
- Response.Status.OK.getStatusCode());
- clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
- Assert.assertEquals(clusterConfig.getDisabledInstances().keySet(),
- new HashSet<>(Arrays.asList(CLUSTER_NAME + "localhost_12919")));
- */
-
// Test enable disable partitions
String dbName = "_db_0_";
List<String> partitionsToDisable = Arrays.asList(CLUSTER_NAME + dbName + "0",
@@ -327,9 +220,9 @@ public class TestInstanceAccessor extends AbstractTestClass {
entity = Entity.entity(
OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(),
- INSTANCE_NAME, InstanceAccessor.InstanceProperties.resource.name(),
+ INSTANCE_NAME, PerInstanceAccessor.PerInstanceProperties.resource.name(),
CLUSTER_NAME + dbName.substring(0, dbName.length() - 1),
- InstanceAccessor.InstanceProperties.partitions.name(), partitionsToDisable)),
+ PerInstanceAccessor.PerInstanceProperties.partitions.name(), partitionsToDisable)),
MediaType.APPLICATION_JSON_TYPE);
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=disablePartitions")
@@ -342,9 +235,9 @@ public class TestInstanceAccessor extends AbstractTestClass {
new HashSet<>(partitionsToDisable));
entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(ImmutableMap
.of(AbstractResource.Properties.id.name(), INSTANCE_NAME,
- InstanceAccessor.InstanceProperties.resource.name(),
+ PerInstanceAccessor.PerInstanceProperties.resource.name(),
CLUSTER_NAME + dbName.substring(0, dbName.length() - 1),
- InstanceAccessor.InstanceProperties.partitions.name(),
+ PerInstanceAccessor.PerInstanceProperties.partitions.name(),
ImmutableList.of(CLUSTER_NAME + dbName + "1"))), MediaType.APPLICATION_JSON_TYPE);
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=enablePartitions")
@@ -494,32 +387,5 @@ public class TestInstanceAccessor extends AbstractTestClass {
Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP));
}
- private void preSetupForParallelInstancesStoppableTest(String clusterName, List<String> instances) {
- _gSetupTool.addCluster(clusterName, true);
- ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
- clusterConfig.setFaultZoneType("helixZoneId");
- clusterConfig.setPersistIntermediateAssignment(true);
- _configAccessor.setClusterConfig(clusterName, clusterConfig);
- // Create instance configs
- List<InstanceConfig> instanceConfigs = new ArrayList<>();
- for (int i = 0; i < instances.size() - 1; i++) {
- InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
- instanceConfig.setDomain("helixZoneId=zone1,host=instance" + i);
- instanceConfigs.add(instanceConfig);
- }
- instanceConfigs.add(new InstanceConfig(instances.get(instances.size() - 1)));
- instanceConfigs.get(instanceConfigs.size() - 1).setDomain("helixZoneId=zone2,host=instance5");
-
- instanceConfigs.get(1).setInstanceEnabled(false);
- instanceConfigs.get(3).setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
- for (InstanceConfig instanceConfig : instanceConfigs) {
- _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig);
- }
-
- // Start participant
- startInstances(clusterName, new TreeSet<>(instances), 3);
- createResources(clusterName, 1);
- startController(clusterName);
- }
}