You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/12/15 21:41:45 UTC
[helix] branch master updated: Add take/free instance implementation and test (#1918)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new e481dee Add take/free instance implementation and test (#1918)
e481dee is described below
commit e481dee697d90fc60c3be27d63b57f99f591599d
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Dec 15 13:41:36 2021 -0800
Add take/free instance implementation and test (#1918)
* take & free single instance impl
---
.../MaintenanceManagementInstanceInfo.java | 16 +-
.../MaintenanceManagementService.java | 274 ++++++++++++++++++---
.../rest/common/HelixDataAccessorWrapper.java | 4 +-
.../resources/helix/PerInstanceAccessor.java | 47 ++--
.../helix/rest/server/service/InstanceService.java | 4 +-
.../TestMaintenanceManagementService.java | 18 +-
.../helix/rest/server/TestOperationImpl.java | 148 +++++++++++
.../helix/rest/server/TestPerInstanceAccessor.java | 150 ++++++++++-
8 files changed, 602 insertions(+), 59 deletions(-)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
index 8914173..83b7fbc 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
@@ -68,18 +68,26 @@ public class MaintenanceManagementInstanceInfo {
operationResult = result;
}
- public void addFailureMessage(List<String> msg) {
+ public void addMessages(List<String> msg) {
messages.addAll(msg);
}
+ public void addMessage(String meg) {
+ messages.add(meg);
+ }
public boolean isSuccessful() {
return status.equals(OperationalStatus.SUCCESS);
}
- public void mergeResultStatus(MaintenanceManagementInstanceInfo info) {
+ public void mergeResult(MaintenanceManagementInstanceInfo info) {
+ mergeResult(info, false);
+ }
+
+ public void mergeResult(MaintenanceManagementInstanceInfo info, boolean nonBlockingFailure) {
messages.addAll(info.getMessages());
- status = info.isSuccessful() && isSuccessful() ? OperationalStatus.SUCCESS
- : OperationalStatus.FAILURE;
+ status =
+ (info.isSuccessful() || nonBlockingFailure) && isSuccessful() ? OperationalStatus.SUCCESS
+ : OperationalStatus.FAILURE;
if (info.hasOperationResult()) {
operationResult =
this.hasOperationResult() ? operationResult + "," + info.getOperationResult()
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 7785f9d..082c54b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -24,8 +24,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -36,7 +38,7 @@ import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
-import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@@ -51,10 +53,14 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.rest.client.CustomRestClient;
import org.apache.helix.rest.client.CustomRestClientFactory;
+import org.apache.helix.rest.clusterMaintenanceService.api.OperationInterface;
import org.apache.helix.rest.common.HelixDataAccessorWrapper;
+import org.apache.helix.rest.common.datamodel.RestSnapShot;
import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.apache.helix.rest.server.resources.helix.PerInstanceAccessor;
import org.apache.helix.rest.server.service.InstanceService;
+import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,35 +77,45 @@ public class MaintenanceManagementService {
MetricRegistry.name(InstanceService.class, "custom_instance_check_http_requests_error_total");
private static final String CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_DURATION =
MetricRegistry.name(InstanceService.class, "custom_instance_check_http_requests_duration");
+ public static final String ALL_HEALTH_CHECK_NONBLOCK = "allHealthCheckNonBlock";
private final ConfigAccessor _configAccessor;
private final CustomRestClient _customRestClient;
private final String _namespace;
private final boolean _skipZKRead;
- private final boolean _continueOnFailures;
private final HelixDataAccessorWrapper _dataAccessor;
+ private final Set<String> _nonBlockingHealthChecks;
+ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+ ConfigAccessor configAccessor, boolean skipZKRead, String namespace) {
+ this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
+ Collections.emptySet(), namespace);
+ }
- public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
- boolean skipZKRead, String namespace) {
- this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead, false, namespace);
+ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+ ConfigAccessor configAccessor, boolean skipZKRead, Set<String> nonBlockingHealthChecks,
+ String namespace) {
+ this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
+ nonBlockingHealthChecks, namespace);
}
- public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
- boolean skipZKRead, boolean continueOnFailures, String namespace) {
+ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+ ConfigAccessor configAccessor, boolean skipZKRead, boolean continueOnFailure,
+ String namespace) {
this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
- continueOnFailures, namespace);
+ continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
+ : Collections.emptySet(), namespace);
}
@VisibleForTesting
MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
- CustomRestClient customRestClient, boolean skipZKRead, boolean continueOnFailures,
+ CustomRestClient customRestClient, boolean skipZKRead, Set<String> nonBlockingHealthChecks,
String namespace) {
_dataAccessor = new HelixDataAccessorWrapper(dataAccessor, customRestClient, namespace);
_configAccessor = configAccessor;
_customRestClient = customRestClient;
_skipZKRead = skipZKRead;
- _continueOnFailures = continueOnFailures;
+ _nonBlockingHealthChecks = nonBlockingHealthChecks;
_namespace = namespace;
}
@@ -116,6 +132,8 @@ public class MaintenanceManagementService {
* @param healthChecks A list of healthChecks to perform
* @param healthCheckConfig The input for health Checks
* @param operations A list of operation checks or operations to execute
+ * @param operationConfig A map of config. Key is the operation name; value is a JSON
+ * representation of a map
* @param performOperation If this param is set to false, the function will only do a dry run
* @return MaintenanceManagementInstanceInfo
* @throws IOException in case of network failure
@@ -123,7 +141,17 @@ public class MaintenanceManagementService {
public MaintenanceManagementInstanceInfo takeInstance(String clusterId, String instanceName,
List<String> healthChecks, Map<String, String> healthCheckConfig, List<String> operations,
Map<String, String> operationConfig, boolean performOperation) throws IOException {
- return null;
+
+ if ((healthChecks == null || healthChecks.isEmpty()) && (operations == null || operations
+ .isEmpty())) {
+ MaintenanceManagementInstanceInfo result = new MaintenanceManagementInstanceInfo(
+ MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE);
+ result.addMessage("Invalid input. Please provide at least one health check or operation.");
+ return result;
+ }
+
+ return takeFreeSingleInstanceHelper(clusterId, instanceName, healthChecks, healthCheckConfig,
+ operations, operationConfig, performOperation, true);
}
/**
@@ -139,6 +167,8 @@ public class MaintenanceManagementService {
* @param healthChecks A list of healthChecks to perform
* @param healthCheckConfig The input for health Checks
* @param operations A list of operation checks or operations to execute
+ * @param operationConfig A map of config. Key is the operation name; value is a JSON
+ * representation of a map.
* @param performOperation If this param is set to false, the function will only do a dry run
* @return A list of MaintenanceManagementInstanceInfo
* @throws IOException in case of network failure
@@ -163,6 +193,8 @@ public class MaintenanceManagementService {
* @param healthChecks A list of healthChecks to perform
* @param healthCheckConfig The input for health Checks
* @param operations A list of operation checks or operations to execute
+ * @param operationConfig A map of config. Key is the operation name; value is a JSON
+ * representation of a map
* @param performOperation If this param is set to false, the function will only do a dry run
* @return MaintenanceManagementInstanceInfo
* @throws IOException in case of network failure
@@ -170,7 +202,9 @@ public class MaintenanceManagementService {
public MaintenanceManagementInstanceInfo freeInstance(String clusterId, String instanceName,
List<String> healthChecks, Map<String, String> healthCheckConfig, List<String> operations,
Map<String, String> operationConfig, boolean performOperation) throws IOException {
- return null;
+
+ return takeFreeSingleInstanceHelper(clusterId, instanceName, healthChecks, healthCheckConfig,
+ operations, operationConfig, performOperation, false);
}
/**
@@ -186,6 +220,8 @@ public class MaintenanceManagementService {
* @param healthChecks A list of healthChecks to perform
* @param healthCheckConfig The input for health Checks
* @param operations A list of operation checks or operations to execute
+ * @param operationConfig A map of config. Key is the operation name; value is a JSON
+ * representation of a map
* @param performOperation If this param is set to false, the function will only do a dry run
* @return A list of MaintenanceManagementInstanceInfo
* @throws IOException in case of network failure
@@ -243,10 +279,27 @@ public class MaintenanceManagementService {
clusterId, instanceName, ex);
instanceInfoBuilder.healthStatus(false);
}
-
return instanceInfoBuilder.build();
}
+ private List<OperationInterface> getAllOperationClasses(List<String> operations) {
+ List<OperationInterface> operationAbstractClassList = new ArrayList<>();
+ for (String operationClassName : operations) {
+ try {
+ LOG.info("Loading class: " + operationClassName);
+ OperationInterface userOperation =
+ (OperationInterface) HelixUtil.loadClass(getClass(), operationClassName)
+ .newInstance();
+ operationAbstractClassList.add(userOperation);
+ } catch (Exception e) {
+ LOG.error("No operation class found for: {}. message: ", operationClassName, e);
+ throw new HelixException(
+ String.format("No operation class found for: %s. message: %s", operationClassName, e));
+ }
+ }
+ return operationAbstractClassList;
+ }
+
/**
* {@inheritDoc}
* Single instance stoppable check implementation is a special case of
@@ -269,29 +322,106 @@ public class MaintenanceManagementService {
public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent) throws IOException {
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
- // helix instance check
+ // helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
- // custom check
+ // custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
finalStoppableChecks, getMapFromJsonPayload(jsonContent));
return finalStoppableChecks;
}
+ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String clusterId,
+ String instanceName, List<String> healthChecks, Map<String, String> healthCheckConfig,
+ List<String> operations, Map<String, String> operationConfig, boolean performOperation,
+ boolean isTakeInstance) {
+
+ if (operations == null) {
+ operations = new ArrayList<>();
+ }
+ if (healthChecks == null) {
+ healthChecks = new ArrayList<>();
+ }
+
+ try {
+ MaintenanceManagementInstanceInfo instanceInfo;
+ instanceInfo =
+ batchInstanceHealthCheck(clusterId, ImmutableList.of(instanceName), healthChecks,
+ healthCheckConfig).getOrDefault(instanceName, new MaintenanceManagementInstanceInfo(
+ MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS));
+ if (!instanceInfo.isSuccessful()) {
+ return instanceInfo;
+ }
+
+ List<OperationInterface> operationAbstractClassList = getAllOperationClasses(operations);
+
+ _dataAccessor.populateCache(OperationInterface.PROPERTY_TYPE_LIST);
+ RestSnapShot sp = _dataAccessor.getRestSnapShot();
+ String continueOnFailuresName =
+ PerInstanceAccessor.PerInstanceProperties.continueOnFailures.name();
+ Map<String, Map<String, String>> operationConfigSet = new HashMap<>();
+
+ // perform operation check
+ for (OperationInterface operationClass : operationAbstractClassList) {
+ String operationClassName = operationClass.getClass().getName();
+ Map<String, String> singleOperationConfig =
+ (operationConfig == null || !operationConfig.containsKey(operationClassName))
+ ? Collections.emptyMap()
+ : getMapFromJsonPayload(operationConfig.get(operationClassName));
+ operationConfigSet.put(operationClassName, singleOperationConfig);
+ boolean continueOnFailures =
+ singleOperationConfig.containsKey(continueOnFailuresName) && getBooleanFromJsonPayload(
+ singleOperationConfig.get(continueOnFailuresName));
+
+ MaintenanceManagementInstanceInfo checkResult = isTakeInstance ? operationClass
+ .operationCheckForTakeSingleInstance(instanceName, singleOperationConfig, sp)
+ : operationClass
+ .operationCheckForFreeSingleInstance(instanceName, singleOperationConfig, sp);
+ instanceInfo.mergeResult(checkResult, continueOnFailures);
+ }
+
+ // operation execution
+ if (performOperation && instanceInfo.isSuccessful()) {
+ for (OperationInterface operationClass : operationAbstractClassList) {
+ Map<String, String> singleOperationConfig =
+ operationConfigSet.get(operationClass.getClass().getName());
+ boolean continueOnFailures =
+ singleOperationConfig.containsKey(continueOnFailuresName) && Boolean
+ .parseBoolean(singleOperationConfig.get(continueOnFailuresName));
+ MaintenanceManagementInstanceInfo newResult = isTakeInstance ? operationClass
+ .operationExecForTakeSingleInstance(instanceName, singleOperationConfig, sp)
+ : operationClass
+ .operationExecForFreeSingleInstance(instanceName, singleOperationConfig, sp);
+ instanceInfo.mergeResult(newResult, continueOnFailures);
+ if (!instanceInfo.isSuccessful()) {
+ LOG.warn("Operation failed for {}, skip all following operations.",
+ operationClass.getClass().getName());
+ break;
+ }
+ }
+ }
+ return instanceInfo;
+ } catch (Exception ex) {
+ return new MaintenanceManagementInstanceInfo(
+ MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE,
+ Collections.singletonList(ex.getMessage()));
+ }
+ }
+
private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
.toMap(Function.identity(),
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
- // finalStoppableChecks only contains instances that does not pass this health check
+ // finalStoppableChecks contains instances that do not pass this health check
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
}
- private void batchCustomInstanceStoppableCheck(String clusterId, List<String> instances,
+ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<String> instances,
Map<String, StoppableCheck> finalStoppableChecks, Map<String, String> customPayLoads) {
- if (instances.isEmpty() ) {
+ if (instances.isEmpty()) {
// if all instances failed at previous checks, then all following checks are not required.
- return;
+ return instances;
}
RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
if (restConfig == null) {
@@ -308,16 +438,65 @@ public class MaintenanceManagementService {
List<String> instancesForCustomPartitionLevelChecks =
filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks);
if (!instancesForCustomPartitionLevelChecks.isEmpty()) {
+ // add to finalStoppableChecks regardless of stoppable or not.
Map<String, StoppableCheck> instancePartitionLevelChecks =
performPartitionsCheck(instancesForCustomPartitionLevelChecks, restConfig,
customPayLoads);
+ List<String> instancesForFollowingChecks = new ArrayList<>();
for (Map.Entry<String, StoppableCheck> instancePartitionStoppableCheckEntry : instancePartitionLevelChecks
.entrySet()) {
String instance = instancePartitionStoppableCheckEntry.getKey();
StoppableCheck stoppableCheck = instancePartitionStoppableCheckEntry.getValue();
addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
+ if (stoppableCheck.isStoppable() || isNonBlockingCheck(stoppableCheck)) {
+ // instance passed this round of checks, or every failed check is non-blocking;
+ // it will be checked in the next round
+ instancesForFollowingChecks.add(instance);
+ }
}
+ return instancesForFollowingChecks;
}
+ return instancesForCustomPartitionLevelChecks;
+ }
+
+ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(String clusterId,
+ List<String> instances, List<String> healthChecks, Map<String, String> healthCheckConfig) {
+ List<String> instancesForNext = new ArrayList<>(instances);
+ Map<String, MaintenanceManagementInstanceInfo> instanceInfos = new HashMap<>();
+ Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
+ // TODO: Right now user can only choose from HelixInstanceStoppableCheck and
+ // CustomInstanceStoppableCheck. We should add finer-grained check groups to choose from
+ // i.e. HELIX:INSTANCE_NOT_ENABLED, CUSTOM_PARTITION_HEALTH_FAILURE:PARTITION_INITIAL_STATE_FAIL etc.
+ for (String healthCheck : healthChecks) {
+ if (healthCheck.equals("HelixInstanceStoppableCheck")) {
+ // this is helix own check
+ instancesForNext =
+ batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks);
+ } else if (healthCheck.equals("CustomInstanceStoppableCheck")) {
+ // custom check, includes custom Instance check and partition check.
+ instancesForNext = batchCustomInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
+ healthCheckConfig);
+ } else {
+ throw new UnsupportedOperationException(healthCheck + " is not supported yet!");
+ }
+ }
+ // assemble result. Returned map contains all instances with pass or fail status.
+ Set<String> clearedInstance = new HashSet<>(instancesForNext);
+ for (String instance : instances) {
+ MaintenanceManagementInstanceInfo result = new MaintenanceManagementInstanceInfo(
+ clearedInstance.contains(instance)
+ ? MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS
+ : MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE);
+ if (finalStoppableChecks.containsKey(instance) && !finalStoppableChecks.get(instance)
+ .isStoppable()) {
+ // If a non-blocking check failed, then we will have a StoppableCheck object with
+ // stoppable = false while the instance is still in clearedInstance. We will assign the
+ // SUCCESS status together with an error message.
+ result.addMessages(finalStoppableChecks.get(instance).getFailedChecks());
+ }
+ instanceInfos.put(instance, result);
+ }
+ return instanceInfos;
}
private void addStoppableCheck(Map<String, StoppableCheck> stoppableChecks, String instance,
@@ -343,7 +522,7 @@ public class MaintenanceManagementService {
// put the check result of the failed-to-stop instances
addStoppableCheck(finalStoppableCheckByInstance, instance, stoppableCheck);
}
- if (stoppableCheck.isStoppable() || _continueOnFailures){
+ if (stoppableCheck.isStoppable() || isNonBlockingCheck(stoppableCheck)) {
// instance passed this around of check or mandatory all checks
// will be checked in the next round
instancesForNextCheck.add(instance);
@@ -356,10 +535,34 @@ public class MaintenanceManagementService {
return instancesForNextCheck;
}
+ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) {
+ if (_nonBlockingHealthChecks.isEmpty()) {
+ return false;
+ }
+ if (_nonBlockingHealthChecks.contains(ALL_HEALTH_CHECK_NONBLOCK)) {
+ return true;
+ }
+ for (String failedCheck : stoppableCheck.getFailedChecks()) {
+ if (failedCheck.startsWith("CUSTOM_")) {
+ // failed custom check will have the pattern
+ // "CUSTOM_PARTITION_HEALTH_FAILURE:PARTITION_INITIAL_STATE_FAIL:partition_name"
+ // we want to keep the first 2 parts as failed test name.
+ String[] checks = failedCheck.split(":", 3);
+ failedCheck = checks[0] + checks[1];
+ }
+ // Helix's own health check name will be in this pattern "HELIX:INSTANCE_NOT_ALIVE",
+ // no need to preprocess.
+ if (!_nonBlockingHealthChecks.contains(failedCheck)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
- Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName,
- HealthCheck.STOPPABLE_CHECK_LIST);
+ Map<String, Boolean> helixStoppableCheck =
+ getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST);
return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
}
@@ -418,7 +621,18 @@ public class MaintenanceManagementService {
}
JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonContent);
// parsing the inputs as string key value pairs
- jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().asText()));
+ jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(),
+ kv.getValue().isValueNode() ? kv.getValue().asText() : kv.getValue().toString()));
+ return result;
+ }
+
+ public static Map<String, String> getMapFromJsonPayload(JsonNode jsonNode)
+ throws IllegalArgumentException {
+ Map<String, String> result = new HashMap<>();
+ if (jsonNode != null) {
+ jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(),
+ kv.getValue().isValueNode() ? kv.getValue().asText() : kv.getValue().toString()));
+ }
return result;
}
@@ -428,11 +642,15 @@ public class MaintenanceManagementService {
: OBJECT_MAPPER.convertValue(jsonContent, List.class);
}
- public static Map<String, String> getMapFromJsonPayload(JsonNode jsonContent)
- throws IllegalArgumentException {
- return jsonContent == null ? new HashMap<>()
- : OBJECT_MAPPER.convertValue(jsonContent, new TypeReference<Map<String, String>>() {
- });
+ public static List<String> getListFromJsonPayload(String jsonString)
+ throws IllegalArgumentException, JsonProcessingException {
+ return (jsonString == null) ? Collections.emptyList()
+ : OBJECT_MAPPER.readValue(jsonString, List.class);
+ }
+
+ public static boolean getBooleanFromJsonPayload(String jsonString)
+ throws IllegalArgumentException, JsonProcessingException {
+ return OBJECT_MAPPER.readTree(jsonString).asBoolean();
}
@VisibleForTesting
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
index e47fd03..a401fa1 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
@@ -39,7 +39,7 @@ import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.rest.client.CustomRestClient;
import org.apache.helix.rest.client.CustomRestClientFactory;
@@ -241,7 +241,7 @@ public class HelixDataAccessorWrapper extends ZKHelixDataAccessor {
for (String resourceName : resources) {
getProperty(propertyKeyBuilder.idealStates(resourceName));
- IdealState externalView = getProperty(propertyKeyBuilder.externalView(resourceName));
+ ExternalView externalView = getProperty(propertyKeyBuilder.externalView(resourceName));
if (externalView != null) {
String stateModeDef = externalView.getStateModelDefRef();
getProperty(propertyKeyBuilder.stateModelDef(stateModeDef));
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 92cd045..3fd284e 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -21,8 +21,10 @@ package org.apache.helix.rest.server.resources.helix;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -88,7 +90,8 @@ public class PerInstanceAccessor extends AbstractHelixResource {
operation_list,
operation_config,
continueOnFailures,
- skipZKRead
+ skipZKRead,
+ performOperation
}
private static class MaintenanceOpInputFields {
@@ -96,8 +99,9 @@ public class PerInstanceAccessor extends AbstractHelixResource {
Map<String, String> healthCheckConfig = null;
List<String> operations = null;
Map<String, String> operationConfig = null;
- boolean continueOnFailures = false;
+ Set<String> nonBlockingHelixCheck = new HashSet<>();
boolean skipZKRead = false;
+ boolean performOperation = true;
}
@ResponseMetered(name = HttpConstants.READ_REQUEST)
@@ -227,14 +231,14 @@ public class PerInstanceAccessor extends AbstractHelixResource {
MaintenanceManagementService maintenanceManagementService =
new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
- getConfigAccessor(), inputFields.skipZKRead, inputFields.continueOnFailures,
+ getConfigAccessor(), inputFields.skipZKRead, inputFields.nonBlockingHelixCheck,
getNamespace());
return JSONRepresentation(maintenanceManagementService
.takeInstance(clusterId, instanceName, inputFields.healthChecks,
inputFields.healthCheckConfig,
inputFields.operations,
- inputFields.operationConfig, true));
+ inputFields.operationConfig, inputFields.performOperation));
} catch (Exception e) {
LOG.error("Failed to takeInstances:", e);
return badRequest("Failed to takeInstances: " + e.getMessage());
@@ -271,14 +275,14 @@ public class PerInstanceAccessor extends AbstractHelixResource {
MaintenanceManagementService maintenanceManagementService =
new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
- getConfigAccessor(), inputFields.skipZKRead, inputFields.continueOnFailures,
+ getConfigAccessor(), inputFields.skipZKRead, inputFields.nonBlockingHelixCheck,
getNamespace());
return JSONRepresentation(maintenanceManagementService
.freeInstance(clusterId, instanceName, inputFields.healthChecks,
inputFields.healthCheckConfig,
inputFields.operations,
- inputFields.operationConfig, true));
+ inputFields.operationConfig, inputFields.performOperation));
} catch (Exception e) {
LOG.error("Failed to takeInstances:", e);
return badRequest("Failed to takeInstances: " + e.getMessage());
@@ -296,28 +300,37 @@ public class PerInstanceAccessor extends AbstractHelixResource {
MaintenanceOpInputFields inputFields = new MaintenanceOpInputFields();
String continueOnFailuresName = PerInstanceProperties.continueOnFailures.name();
String skipZKReadName = PerInstanceProperties.skipZKRead.name();
-
- inputFields.continueOnFailures =
- inputFields.healthCheckConfig != null && inputFields.healthCheckConfig
- .containsKey(continueOnFailuresName) && Boolean
- .parseBoolean(inputFields.healthCheckConfig.get(continueOnFailuresName));
- inputFields.skipZKRead = inputFields.healthCheckConfig != null && inputFields.healthCheckConfig
- .containsKey(skipZKReadName) && Boolean
- .parseBoolean(inputFields.healthCheckConfig.get(skipZKReadName));
+ String performOperation = PerInstanceProperties.performOperation.name();
inputFields.healthChecks = MaintenanceManagementService
.getListFromJsonPayload(node.get(PerInstanceProperties.health_check_list.name()));
inputFields.healthCheckConfig = MaintenanceManagementService
.getMapFromJsonPayload(node.get(PerInstanceProperties.health_check_config.name()));
- if (inputFields.healthCheckConfig != null || !inputFields.healthChecks.isEmpty()) {
- inputFields.healthCheckConfig.remove(continueOnFailuresName);
- inputFields.healthCheckConfig.remove(skipZKReadName);
+
+ if (inputFields.healthCheckConfig != null) {
+ if (inputFields.healthCheckConfig.containsKey(continueOnFailuresName)) {
+ inputFields.nonBlockingHelixCheck = new HashSet<String>(MaintenanceManagementService
+ .getListFromJsonPayload(inputFields.healthCheckConfig.get(continueOnFailuresName)));
+ // healthCheckConfig will be passed to customer's health check directly, so we need to
+ // remove unrelated key-value pairs.
+ inputFields.healthCheckConfig.remove(continueOnFailuresName);
+ }
+ if (inputFields.healthCheckConfig.containsKey(skipZKReadName)) {
+ inputFields.skipZKRead =
+ Boolean.parseBoolean(inputFields.healthCheckConfig.get(skipZKReadName));
+ inputFields.healthCheckConfig.remove(skipZKReadName);
+ }
}
inputFields.operations = MaintenanceManagementService
.getListFromJsonPayload(node.get(PerInstanceProperties.operation_list.name()));
inputFields.operationConfig = MaintenanceManagementService
.getMapFromJsonPayload(node.get(PerInstanceProperties.operation_config.name()));
+ if (inputFields.operationConfig != null && inputFields.operationConfig
+ .containsKey(performOperation)) {
+ inputFields.performOperation =
+ Boolean.parseBoolean(inputFields.operationConfig.get(performOperation));
+ }
LOG.debug("Input fields for take/free Instance" + inputFields.toString());
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
index ae11aec..1d4376b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
@@ -28,8 +28,6 @@ import org.apache.helix.rest.server.json.instance.InstanceInfo;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
public interface InstanceService {
-
-
/**
* Get the overall status of the instance
*
@@ -38,7 +36,7 @@ public interface InstanceService {
* @return An instance of {@link InstanceInfo} easily convertible to JSON
*/
InstanceInfo getInstanceInfo(String clusterId, String instanceName,
- List<HealthCheck> healthChecks);
+ List<HealthCheck> healthChecks);
/**
* Get the current instance stoppable checks
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
index 770aeb5..b24ffa1 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
@@ -18,12 +18,15 @@ package org.apache.helix.rest.clusterMaintenanceService;
* specific language governing permissions and limitations
* under the License.
*/
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.AccessOption;
@@ -46,6 +49,7 @@ import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
@@ -77,12 +81,22 @@ public class TestMaintenanceManagementService {
}
class MockMaintenanceManagementService extends MaintenanceManagementService {
+
public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead,
- boolean continueOnFailures, String namespace) {
- super(dataAccessor, configAccessor, customRestClient, skipZKRead, continueOnFailures,
+ boolean continueOnFailure, String namespace) {
+ super(dataAccessor, configAccessor, customRestClient, skipZKRead,
+ continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
+ : Collections.emptySet(), namespace);
+ }
+
+ public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+ ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead,
+ Set<String> nonBlockingHealthChecks, String namespace) {
+ super(dataAccessor, configAccessor, customRestClient, skipZKRead, nonBlockingHealthChecks,
namespace);
}
+
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestOperationImpl.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestOperationImpl.java
new file mode 100644
index 0000000..997b1aa
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestOperationImpl.java
@@ -0,0 +1,148 @@
+package org.apache.helix.rest.server;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementInstanceInfo;
+import org.apache.helix.rest.clusterMaintenanceService.api.OperationInterface;
+import org.apache.helix.rest.common.RestSnapShotSimpleImpl;
+import org.apache.helix.rest.common.datamodel.RestSnapShot;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.util.InstanceValidationUtil;
+
+
+/**
+ * Test-only {@link OperationInterface} implementation used by the take/free instance REST tests.
+ *
+ * The single-instance "take" check verifies that every partition hosted on the target instance
+ * keeps at least the resource's min-active-replica count on sibling instances. The single-instance
+ * exec methods return fixed dummy results, and the batch (multi-instance) methods are not
+ * implemented and return {@code null}.
+ */
+public class TestOperationImpl implements OperationInterface {
+
+  /**
+   * Checks whether {@code instanceName} can be taken.
+   *
+   * @param instanceName instance to be taken
+   * @param operationConfig read as "instance name -> on hold" flags; each value is parsed with
+   *          {@link Boolean#parseBoolean(String)}, so anything other than "true" means not on hold.
+   *          A {@code null} map is treated as empty.
+   * @param sn snapshot to read ideal states, external views and state model definitions from
+   * @return SUCCESS when no partition would be left under-replicated; FAILURE carrying either the
+   *         offending partition name or the description of the exception hit during the check
+   */
+  @Override
+  public MaintenanceManagementInstanceInfo operationCheckForTakeSingleInstance(String instanceName,
+      Map<String, String> operationConfig, RestSnapShot sn) {
+    Map<String, Boolean> isInstanceOnHoldCache = new HashMap<>();
+    if (operationConfig != null) {
+      for (Map.Entry<String, String> entry : operationConfig.entrySet()) {
+        isInstanceOnHoldCache.put(entry.getKey(), Boolean.parseBoolean(entry.getValue()));
+      }
+    }
+    try {
+      String unHealthyPartition =
+          siblingNodesActiveReplicaCheck(sn, instanceName, isInstanceOnHoldCache);
+      if (unHealthyPartition == null) {
+        return new MaintenanceManagementInstanceInfo(
+            MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS);
+      }
+      return new MaintenanceManagementInstanceInfo(
+          MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE,
+          Collections.singletonList(unHealthyPartition));
+    } catch (Exception ex) {
+      // Some exceptions carry no message; fall back to toString() so the reported message list
+      // never contains a null element.
+      String reason = ex.getMessage() != null ? ex.getMessage() : ex.toString();
+      return new MaintenanceManagementInstanceInfo(
+          MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE,
+          Collections.singletonList(reason));
+    }
+  }
+
+  /** Always reports SUCCESS; freeing an instance has no precondition in this test stub. */
+  @Override
+  public MaintenanceManagementInstanceInfo operationCheckForFreeSingleInstance(String instanceName,
+      Map<String, String> operationConfig, RestSnapShot sn) {
+    return new MaintenanceManagementInstanceInfo(
+        MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS);
+  }
+
+  /** Batch take check is not implemented by this test stub; returns {@code null}. */
+  @Override
+  public Map<String, MaintenanceManagementInstanceInfo> operationCheckForTakeInstances(
+      Collection<String> instances, Map<String, String> operationConfig, RestSnapShot sn) {
+    return null;
+  }
+
+  /** Batch free check is not implemented by this test stub; returns {@code null}. */
+  @Override
+  public Map<String, MaintenanceManagementInstanceInfo> operationCheckForFreeInstances(
+      Collection<String> instances, Map<String, String> operationConfig, RestSnapShot sn) {
+    return null;
+  }
+
+  /** Returns SUCCESS with the fixed result string "DummyTakeOperationResult". */
+  @Override
+  public MaintenanceManagementInstanceInfo operationExecForTakeSingleInstance(String instanceName,
+      Map<String, String> operationConfig, RestSnapShot sn) {
+    return new MaintenanceManagementInstanceInfo(
+        MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS,
+        "DummyTakeOperationResult");
+  }
+
+  /** Returns SUCCESS with the fixed result string "DummyFreeOperationResult". */
+  @Override
+  public MaintenanceManagementInstanceInfo operationExecForFreeSingleInstance(String instanceName,
+      Map<String, String> operationConfig, RestSnapShot sn) {
+    return new MaintenanceManagementInstanceInfo(
+        MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS,
+        "DummyFreeOperationResult");
+  }
+
+  /** Batch take execution is not implemented by this test stub; returns {@code null}. */
+  @Override
+  public Map<String, MaintenanceManagementInstanceInfo> operationExecForTakeInstances(
+      Collection<String> instances, Map<String, String> operationConfig, RestSnapShot sn) {
+    return null;
+  }
+
+  /** Batch free execution is not implemented by this test stub; returns {@code null}. */
+  @Override
+  public Map<String, MaintenanceManagementInstanceInfo> operationExecForFreeInstances(
+      Collection<String> instances, Map<String, String> operationConfig, RestSnapShot sn) {
+    return null;
+  }
+
+  /**
+   * Finds a partition hosted on {@code instanceName} whose remaining healthy siblings would fall
+   * below the resource's min-active-replica count.
+   *
+   * A sibling counts as healthy when it is not the target instance, its state is neither in
+   * {@link InstanceValidationUtil#UNHEALTHY_STATES} nor the state model's initial state, and it
+   * is not flagged as on hold. Instances absent from {@code isInstanceOnHoldCache} are treated as
+   * not on hold.
+   *
+   * @param snapShot must be a {@link RestSnapShotSimpleImpl}
+   * @param instanceName instance about to be taken
+   * @param isInstanceOnHoldCache "instance name -> on hold" flags; {@code null} is treated as empty
+   * @return the first under-replicated partition found, or {@code null} when there is none
+   * @throws HelixException if the snapshot has the wrong type or a checked resource has no
+   *           external view
+   */
+  public String siblingNodesActiveReplicaCheck(RestSnapShot snapShot, String instanceName,
+      Map<String, Boolean> isInstanceOnHoldCache) throws HelixException {
+    // Validate the snapshot before dereferencing it so that a null or foreign snapshot produces
+    // the descriptive HelixException instead of a NullPointerException.
+    if (!(snapShot instanceof RestSnapShotSimpleImpl)) {
+      throw new HelixException("Passed in Snapshot is not an instance of RestSnapShotSimpleImpl");
+    }
+    RestSnapShotSimpleImpl restSnapShotSimple = (RestSnapShotSimpleImpl) snapShot;
+    String clusterName = snapShot.getClusterName();
+    Map<String, Boolean> onHoldByInstance =
+        isInstanceOnHoldCache == null ? Collections.<String, Boolean>emptyMap()
+            : isInstanceOnHoldCache;
+
+    PropertyKey.Builder propertyKeyBuilder = new PropertyKey.Builder(clusterName);
+    List<String> resources = restSnapShotSimple.getChildNames(propertyKeyBuilder.idealStates());
+
+    for (String resourceName : resources) {
+      IdealState idealState =
+          restSnapShotSimple.getProperty(propertyKeyBuilder.idealStates(resourceName));
+      // Skip resources that are missing, disabled, invalid, or use the task state model.
+      if (idealState == null || !idealState.isEnabled() || !idealState.isValid()
+          || TaskConstants.STATE_MODEL_NAME.equals(idealState.getStateModelDefRef())) {
+        continue;
+      }
+      ExternalView externalView =
+          restSnapShotSimple.getProperty(propertyKeyBuilder.externalView(resourceName));
+      if (externalView == null) {
+        throw new HelixException(
+            String.format("Resource %s does not have external view!", resourceName));
+      }
+      // Get the minActiveReplicas constraint for the resource; -1 means nothing to enforce.
+      int minActiveReplicas = externalView.getMinActiveReplicas();
+      if (minActiveReplicas == -1) {
+        continue;
+      }
+      String stateModeDef = externalView.getStateModelDefRef();
+      StateModelDefinition stateModelDefinition =
+          restSnapShotSimple.getProperty(propertyKeyBuilder.stateModelDef(stateModeDef));
+      Set<String> unhealthyStates = new HashSet<>(InstanceValidationUtil.UNHEALTHY_STATES);
+      if (stateModelDefinition != null) {
+        unhealthyStates.add(stateModelDefinition.getInitialState());
+      }
+      for (String partition : externalView.getPartitionSet()) {
+        Map<String, String> stateByInstanceMap = externalView.getStateMap(partition);
+        // Only partitions hosted on the target instance are relevant.
+        if (!stateByInstanceMap.containsKey(instanceName)) {
+          continue;
+        }
+        int numHealthySiblings = 0;
+        for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
+          // Boolean.TRUE.equals(...) avoids unboxing null for instances that have no entry in
+          // the on-hold map.
+          boolean onHold = Boolean.TRUE.equals(onHoldByInstance.get(entry.getKey()));
+          if (!entry.getKey().equals(instanceName) && !unhealthyStates.contains(entry.getValue())
+              && !onHold) {
+            numHealthySiblings++;
+          }
+        }
+        if (numHealthySiblings < minActiveReplicas) {
+          return partition;
+        }
+      }
+    }
+
+    return null;
+  }
+}
+
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 58bccbf..6ca7bf6 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -30,7 +30,6 @@ import java.util.Set;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -64,7 +63,6 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
.format(STOPPABLE_CLUSTER, "instance1").post(this, entity);
String stoppableCheckResult = response.readEntity(String.class);
-
Map<String, Object> actualMap = OBJECT_MAPPER.readValue(stoppableCheckResult, Map.class);
List<String> failedChecks = Arrays
.asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT", "HELIX:INSTANCE_NOT_ENABLED",
@@ -78,7 +76,6 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
@Test(dependsOnMethods = "testIsInstanceStoppable")
public void testTakeInstanceNegInput() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
-
post("clusters/TestCluster_0/instances/instance1/takeInstance", null,
Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
Response.Status.BAD_REQUEST.getStatusCode(), true);
@@ -86,6 +83,153 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
}
@Test(dependsOnMethods = "testTakeInstanceNegInput")
+  // An empty JSON object names neither a health check nor an operation, so takeInstance must
+  // answer with an unsuccessful result carrying the "invalid input" message.
+  public void testTakeInstanceNegInput2() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    Entity<String> emptyRequest = Entity.entity("{}", MediaType.APPLICATION_JSON_TYPE);
+    Response takeResponse = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1")
+        .post(this, emptyRequest);
+    String responseBody = takeResponse.readEntity(String.class);
+
+    List<String> expectedMessages =
+        Arrays.asList("Invalid input. Please provide at least one health check or operation.");
+    Map<String, Object> expected = ImmutableMap
+        .of("successful", false, "messages", expectedMessages, "operationResult", "");
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  // Requests both stoppable checks for instance1 and expects the failed Helix checks to be
+  // reported as messages of an unsuccessful result.
+  @Test(dependsOnMethods = "testTakeInstanceNegInput2")
+  public void testTakeInstanceHealthCheck() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String payload = "{ \"health_check_list\" : [\"HelixInstanceStoppableCheck\", \"CustomInstanceStoppableCheck\"],"
+        + "\"health_check_config\" : { \"client\" : \"espresso\" }} ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    Response takeResponse = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1")
+        .post(this, request);
+    String responseBody = takeResponse.readEntity(String.class);
+
+    List<String> failedChecks = Arrays.asList(
+        "HELIX:EMPTY_RESOURCE_ASSIGNMENT",
+        "HELIX:INSTANCE_NOT_ENABLED",
+        "HELIX:INSTANCE_NOT_STABLE");
+    Map<String, Object> expected =
+        ImmutableMap.of("successful", false, "messages", failedChecks, "operationResult", "");
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  // Lists every failing Helix check under continueOnFailures: the checks are still reported as
+  // messages, but the overall result is expected to be successful.
+  @Test(dependsOnMethods = "testTakeInstanceNegInput2")
+  public void testTakeInstanceNonBlockingCheck() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String payload = "{ \"health_check_list\" : [\"HelixInstanceStoppableCheck\"],"
+        + "\"health_check_config\" : { \"client\" : \"espresso\" , "
+        + "\"continueOnFailures\" : [\"HELIX:EMPTY_RESOURCE_ASSIGNMENT\", \"HELIX:INSTANCE_NOT_ENABLED\","
+        + " \"HELIX:INSTANCE_NOT_STABLE\"]} } ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    Response takeResponse = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1")
+        .post(this, request);
+    String responseBody = takeResponse.readEntity(String.class);
+
+    List<String> reportedChecks = Arrays.asList(
+        "HELIX:EMPTY_RESOURCE_ASSIGNMENT",
+        "HELIX:INSTANCE_NOT_ENABLED",
+        "HELIX:INSTANCE_NOT_STABLE");
+    Map<String, Object> expected =
+        ImmutableMap.of("successful", true, "messages", reportedChecks, "operationResult", "");
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  // Runs TestOperationImpl as the only take operation and expects its dummy take result with no
+  // messages.
+  @Test(dependsOnMethods = "testTakeInstanceHealthCheck")
+  public void testTakeInstanceOperationSuccess() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String payload =
+        "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"]} ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    Response takeResponse = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1")
+        .post(this, request);
+    String responseBody = takeResponse.readEntity(String.class);
+
+    Map<String, Object> expected = ImmutableMap
+        .of("successful", true, "messages", new ArrayList<>(), "operationResult",
+            "DummyTakeOperationResult");
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  // Runs TestOperationImpl as the only free operation and expects its dummy free result with no
+  // messages.
+  @Test(dependsOnMethods = "testTakeInstanceOperationSuccess")
+  public void testFreeInstanceOperationSuccess() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String payload =
+        "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"]} ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    Response freeResponse = new JerseyUriRequestBuilder("clusters/{}/instances/{}/freeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1")
+        .post(this, request);
+    String freeInstanceResult = freeResponse.readEntity(String.class);
+
+    Map<String, Object> expected = ImmutableMap
+        .of("successful", true, "messages", new ArrayList<>(), "operationResult",
+            "DummyFreeOperationResult");
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(freeInstanceResult, Map.class);
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  // Passes TestOperationImpl a config that flags instance0 and instance2..instance5 as true and
+  // expects the take operation on instance0 to be reported as unsuccessful.
+  @Test(dependsOnMethods = "testFreeInstanceOperationSuccess")
+  public void testTakeInstanceOperationCheckFailure() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String payload = "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"],"
+        + "\"operation_config\": { \"org.apache.helix.rest.server.TestOperationImpl\" :"
+        + " {\"instance0\": true, \"instance2\": true, "
+        + "\"instance3\": true, \"instance4\": true, \"instance5\": true, "
+        + " \"value\" : \"i001\", \"list_value\" : [\"list1\"]}} } ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    Response takeResponse = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance0")
+        .post(this, request);
+    String responseBody = takeResponse.readEntity(String.class);
+
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+    boolean successful = (boolean) actual.get("successful");
+    Assert.assertFalse(successful);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  // Same instance flags as testTakeInstanceOperationCheckFailure, plus "continueOnFailures" in the
+  // operation's config. The take operation is then expected to succeed and return the dummy take
+  // result while still reporting messages.
+  // NOTE(review): presumably continueOnFailures makes the failed operation check non-blocking;
+  // confirm against MaintenanceManagementService.
+  @Test(dependsOnMethods = "testTakeInstanceOperationCheckFailure")
+  public void testTakeInstanceOperationCheckFailureNonBlocking() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String payload = "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"],"
+        + "\"operation_config\": { \"org.apache.helix.rest.server.TestOperationImpl\" : "
+        + "{\"instance0\": true, \"instance2\": true, "
+        + "\"instance3\": true, \"instance4\": true, \"instance5\": true, "
+        + "\"continueOnFailures\" : true} } } ";
+
+    Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance0")
+        .post(this, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE));
+    String takeInstanceResult = response.readEntity(String.class);
+    System.out.println("testTakeInstanceOperationCheckFailureNonBlocking" + takeInstanceResult);
+
+    Map<String, Object> actualMap = OBJECT_MAPPER.readValue(takeInstanceResult, Map.class);
+    Assert.assertTrue((boolean) actualMap.get("successful"));
+    Assert.assertEquals(actualMap.get("operationResult"), "DummyTakeOperationResult");
+    // The non blocking test should generate msg but won't return failure status.
+    // "messages" is deserialized into a List, so comparing it with the String "[]" could never
+    // be equal and the old assertFalse always passed. Check the list itself instead.
+    Object messages = actualMap.get("messages");
+    Assert.assertTrue(messages instanceof List, "messages should be a JSON array: " + messages);
+    Assert.assertFalse(((List<?>) messages).isEmpty());
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  // With "performOperation": false in operation_config the request is expected to succeed while
+  // leaving operationResult empty, i.e. without the dummy take result.
+  @Test(dependsOnMethods = "testTakeInstanceOperationCheckFailureNonBlocking")
+  public void testTakeInstanceCheckOnly() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String payload = "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"],"
+        + "\"operation_config\": {\"performOperation\": false} } ";
+    Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1")
+        .post(this, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE));
+    String takeInstanceResult = response.readEntity(String.class);
+
+    Map<String, Object> actualMap = OBJECT_MAPPER.readValue(takeInstanceResult, Map.class);
+    Assert.assertTrue((boolean) actualMap.get("successful"));
+    // assertEquals reports the actual value on failure and does not throw a NullPointerException
+    // when the key is missing, unlike assertTrue(x.equals("")).
+    Assert.assertEquals(actualMap.get("operationResult"), "");
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+ @Test(dependsOnMethods = "testTakeInstanceCheckOnly")
public void testGetAllMessages() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
String testInstance = CLUSTER_NAME + "localhost_12926"; //Non-live instance