You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/12/15 21:41:45 UTC

[helix] branch master updated: Add take/free instance implementation and test (#1918)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new e481dee  Add take/free instance implementation and test (#1918)
e481dee is described below

commit e481dee697d90fc60c3be27d63b57f99f591599d
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Dec 15 13:41:36 2021 -0800

    Add take/free instance implementation and test (#1918)
    
    * take & free single instance impl
---
 .../MaintenanceManagementInstanceInfo.java         |  16 +-
 .../MaintenanceManagementService.java              | 274 ++++++++++++++++++---
 .../rest/common/HelixDataAccessorWrapper.java      |   4 +-
 .../resources/helix/PerInstanceAccessor.java       |  47 ++--
 .../helix/rest/server/service/InstanceService.java |   4 +-
 .../TestMaintenanceManagementService.java          |  18 +-
 .../helix/rest/server/TestOperationImpl.java       | 148 +++++++++++
 .../helix/rest/server/TestPerInstanceAccessor.java | 150 ++++++++++-
 8 files changed, 602 insertions(+), 59 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
index 8914173..83b7fbc 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementInstanceInfo.java
@@ -68,18 +68,26 @@ public class MaintenanceManagementInstanceInfo {
     operationResult = result;
   }
 
-  public void addFailureMessage(List<String> msg) {
+  public void addMessages(List<String> msg) {
     messages.addAll(msg);
   }
+  public void addMessage(String meg) {
+    messages.add(meg);
+  }
 
   public boolean isSuccessful() {
     return status.equals(OperationalStatus.SUCCESS);
   }
 
-  public void mergeResultStatus(MaintenanceManagementInstanceInfo info) {
+  public void mergeResult(MaintenanceManagementInstanceInfo info) {
+    mergeResult(info, false);
+  }
+
+  public void mergeResult(MaintenanceManagementInstanceInfo info, boolean nonBlockingFailure) {
     messages.addAll(info.getMessages());
-    status = info.isSuccessful() && isSuccessful() ? OperationalStatus.SUCCESS
-        : OperationalStatus.FAILURE;
+    status =
+        (info.isSuccessful() || nonBlockingFailure) && isSuccessful() ? OperationalStatus.SUCCESS
+            : OperationalStatus.FAILURE;
     if (info.hasOperationResult()) {
       operationResult =
           this.hasOperationResult() ? operationResult + "," + info.getOperationResult()
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 7785f9d..082c54b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -24,8 +24,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,7 +38,7 @@ import java.util.stream.Collectors;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.SharedMetricRegistries;
 import com.codahale.metrics.Timer;
-import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -51,10 +53,14 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.RESTConfig;
 import org.apache.helix.rest.client.CustomRestClient;
 import org.apache.helix.rest.client.CustomRestClientFactory;
+import org.apache.helix.rest.clusterMaintenanceService.api.OperationInterface;
 import org.apache.helix.rest.common.HelixDataAccessorWrapper;
+import org.apache.helix.rest.common.datamodel.RestSnapShot;
 import org.apache.helix.rest.server.json.instance.InstanceInfo;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.apache.helix.rest.server.resources.helix.PerInstanceAccessor;
 import org.apache.helix.rest.server.service.InstanceService;
+import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.InstanceValidationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,35 +77,45 @@ public class MaintenanceManagementService {
       MetricRegistry.name(InstanceService.class, "custom_instance_check_http_requests_error_total");
   private static final String CUSTOM_INSTANCE_CHECK_HTTP_REQUESTS_DURATION =
       MetricRegistry.name(InstanceService.class, "custom_instance_check_http_requests_duration");
+  public static final String ALL_HEALTH_CHECK_NONBLOCK = "allHealthCheckNonBlock";
 
   private final ConfigAccessor _configAccessor;
   private final CustomRestClient _customRestClient;
   private final String _namespace;
   private final boolean _skipZKRead;
-  private final boolean _continueOnFailures;
   private final HelixDataAccessorWrapper _dataAccessor;
+  private final Set<String> _nonBlockingHealthChecks;
 
+  public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+      ConfigAccessor configAccessor, boolean skipZKRead, String namespace) {
+    this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
+        Collections.emptySet(), namespace);
+  }
 
-  public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
-      boolean skipZKRead, String namespace) {
-    this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead, false, namespace);
+  public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+      ConfigAccessor configAccessor, boolean skipZKRead, Set<String> nonBlockingHealthChecks,
+      String namespace) {
+    this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
+        nonBlockingHealthChecks, namespace);
   }
 
-  public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
-      boolean skipZKRead, boolean continueOnFailures, String namespace) {
+  public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+      ConfigAccessor configAccessor, boolean skipZKRead, boolean continueOnFailure,
+      String namespace) {
     this(dataAccessor, configAccessor, CustomRestClientFactory.get(), skipZKRead,
-        continueOnFailures, namespace);
+        continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
+            : Collections.emptySet(), namespace);
   }
 
   @VisibleForTesting
   MaintenanceManagementService(ZKHelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
-      CustomRestClient customRestClient, boolean skipZKRead, boolean continueOnFailures,
+      CustomRestClient customRestClient, boolean skipZKRead, Set<String> nonBlockingHealthChecks,
       String namespace) {
     _dataAccessor = new HelixDataAccessorWrapper(dataAccessor, customRestClient, namespace);
     _configAccessor = configAccessor;
     _customRestClient = customRestClient;
     _skipZKRead = skipZKRead;
-    _continueOnFailures = continueOnFailures;
+    _nonBlockingHealthChecks = nonBlockingHealthChecks;
     _namespace = namespace;
   }
 
@@ -116,6 +132,8 @@ public class MaintenanceManagementService {
    * @param healthChecks       A list of healthChecks to perform
    * @param healthCheckConfig The input for health Checks
    * @param operations         A list of operation checks or operations to execute
+   * @param operationConfig    A map of config. Key is the operation name; value is a JSON
+   *                            representation of a map
    * @param performOperation   If this param is set to false, the function will only do a dry run
    * @return MaintenanceManagementInstanceInfo
    * @throws IOException in case of network failure
@@ -123,7 +141,17 @@ public class MaintenanceManagementService {
   public MaintenanceManagementInstanceInfo takeInstance(String clusterId, String instanceName,
       List<String> healthChecks, Map<String, String> healthCheckConfig, List<String> operations,
       Map<String, String> operationConfig, boolean performOperation) throws IOException {
-    return null;
+
+    if ((healthChecks == null || healthChecks.isEmpty()) && (operations == null || operations
+        .isEmpty())) {
+      MaintenanceManagementInstanceInfo result = new MaintenanceManagementInstanceInfo(
+          MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE);
+      result.addMessage("Invalid input. Please provide at least one health check or operation.");
+      return result;
+    }
+
+    return takeFreeSingleInstanceHelper(clusterId, instanceName, healthChecks, healthCheckConfig,
+        operations, operationConfig, performOperation, true);
   }
 
   /**
@@ -139,6 +167,8 @@ public class MaintenanceManagementService {
    * @param healthChecks       A list of healthChecks to perform
    * @param healthCheckConfig The input for health Checks
    * @param operations         A list of operation checks or operations to execute
+   * @param operationConfig    A map of config. Key is the operation name; value is a JSON
+   *                           representation of a map.
    * @param performOperation   If this param is set to false, the function will only do a dry run
    * @return A list of MaintenanceManagementInstanceInfo
    * @throws IOException in case of network failure
@@ -163,6 +193,8 @@ public class MaintenanceManagementService {
    * @param healthChecks       A list of healthChecks to perform
    * @param healthCheckConfig The input for health Checks
    * @param operations         A list of operation checks or operations to execute
+   * @param operationConfig    A map of config. Key is the operation name; value is a JSON
+   *                           representation of a map
    * @param performOperation   If this param is set to false, the function will only do a dry run
    * @return MaintenanceManagementInstanceInfo
    * @throws IOException in case of network failure
@@ -170,7 +202,9 @@ public class MaintenanceManagementService {
   public MaintenanceManagementInstanceInfo freeInstance(String clusterId, String instanceName,
       List<String> healthChecks, Map<String, String> healthCheckConfig, List<String> operations,
       Map<String, String> operationConfig, boolean performOperation) throws IOException {
-    return null;
+
+    return takeFreeSingleInstanceHelper(clusterId, instanceName, healthChecks, healthCheckConfig,
+        operations, operationConfig, performOperation, false);
   }
 
   /**
@@ -186,6 +220,8 @@ public class MaintenanceManagementService {
    * @param healthChecks       A list of healthChecks to perform
    * @param healthCheckConfig The input for health Checks
    * @param operations         A list of operation checks or operations to execute
+   * @param operationConfig    A map of config. Key is the operation name; value is a JSON
+   *                           representation of a map
    * @param performOperation   If this param is set to false, the function will only do a dry run
    * @return A list of MaintenanceManagementInstanceInfo
    * @throws IOException in case of network failure
@@ -243,10 +279,27 @@ public class MaintenanceManagementService {
           clusterId, instanceName, ex);
       instanceInfoBuilder.healthStatus(false);
     }
-
     return instanceInfoBuilder.build();
   }
 
+  private List<OperationInterface> getAllOperationClasses(List<String> operations) {
+    List<OperationInterface> operationAbstractClassList = new ArrayList<>();
+    for (String operationClassName : operations) {
+      try {
+        LOG.info("Loading class: " + operationClassName);
+        OperationInterface userOperation =
+            (OperationInterface) HelixUtil.loadClass(getClass(), operationClassName)
+                .newInstance();
+        operationAbstractClassList.add(userOperation);
+      } catch (Exception e) {
+        LOG.error("No operation class found for: {}. message: ", operationClassName, e);
+        throw new HelixException(
+            String.format("No operation class found for: %s. message: %s", operationClassName, e));
+      }
+    }
+    return operationAbstractClassList;
+  }
+
   /**
    * {@inheritDoc}
    * Single instance stoppable check implementation is a special case of
@@ -269,29 +322,106 @@ public class MaintenanceManagementService {
   public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
       List<String> instances, String jsonContent) throws IOException {
     Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
-    // helix instance check
+    // helix instance check.
     List<String> instancesForCustomInstanceLevelChecks =
         batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
-    // custom check
+    // custom check, includes partition check.
     batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
         finalStoppableChecks, getMapFromJsonPayload(jsonContent));
     return finalStoppableChecks;
   }
 
+  private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String clusterId,
+      String instanceName, List<String> healthChecks, Map<String, String> healthCheckConfig,
+      List<String> operations, Map<String, String> operationConfig, boolean performOperation,
+      boolean isTakeInstance) {
+
+    if (operations == null) {
+      operations = new ArrayList<>();
+    }
+    if (healthChecks == null) {
+      healthChecks = new ArrayList<>();
+    }
+
+    try {
+      MaintenanceManagementInstanceInfo instanceInfo;
+      instanceInfo =
+          batchInstanceHealthCheck(clusterId, ImmutableList.of(instanceName), healthChecks,
+              healthCheckConfig).getOrDefault(instanceName, new MaintenanceManagementInstanceInfo(
+              MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS));
+      if (!instanceInfo.isSuccessful()) {
+        return instanceInfo;
+      }
+
+      List<OperationInterface> operationAbstractClassList = getAllOperationClasses(operations);
+
+      _dataAccessor.populateCache(OperationInterface.PROPERTY_TYPE_LIST);
+      RestSnapShot sp = _dataAccessor.getRestSnapShot();
+      String continueOnFailuresName =
+          PerInstanceAccessor.PerInstanceProperties.continueOnFailures.name();
+      Map<String, Map<String, String>> operationConfigSet = new HashMap<>();
+
+      // perform operation check
+      for (OperationInterface operationClass : operationAbstractClassList) {
+        String operationClassName = operationClass.getClass().getName();
+        Map<String, String> singleOperationConfig =
+            (operationConfig == null || !operationConfig.containsKey(operationClassName))
+                ? Collections.emptyMap()
+                : getMapFromJsonPayload(operationConfig.get(operationClassName));
+        operationConfigSet.put(operationClassName, singleOperationConfig);
+        boolean continueOnFailures =
+            singleOperationConfig.containsKey(continueOnFailuresName) && getBooleanFromJsonPayload(
+                singleOperationConfig.get(continueOnFailuresName));
+
+        MaintenanceManagementInstanceInfo checkResult = isTakeInstance ? operationClass
+            .operationCheckForTakeSingleInstance(instanceName, singleOperationConfig, sp)
+            : operationClass
+                .operationCheckForFreeSingleInstance(instanceName, singleOperationConfig, sp);
+        instanceInfo.mergeResult(checkResult, continueOnFailures);
+      }
+
+      // operation execution
+      if (performOperation && instanceInfo.isSuccessful()) {
+        for (OperationInterface operationClass : operationAbstractClassList) {
+          Map<String, String> singleOperationConfig =
+              operationConfigSet.get(operationClass.getClass().getName());
+          boolean continueOnFailures =
+              singleOperationConfig.containsKey(continueOnFailuresName) && Boolean
+                  .parseBoolean(singleOperationConfig.get(continueOnFailuresName));
+          MaintenanceManagementInstanceInfo newResult = isTakeInstance ? operationClass
+              .operationExecForTakeSingleInstance(instanceName, singleOperationConfig, sp)
+              : operationClass
+                  .operationExecForFreeSingleInstance(instanceName, singleOperationConfig, sp);
+          instanceInfo.mergeResult(newResult, continueOnFailures);
+          if (!instanceInfo.isSuccessful()) {
+            LOG.warn("Operation failed for {}, skip all following operations.",
+                operationClass.getClass().getName());
+            break;
+          }
+        }
+      }
+      return instanceInfo;
+    } catch (Exception ex) {
+      return new MaintenanceManagementInstanceInfo(
+          MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE,
+          Collections.singletonList(ex.getMessage()));
+    }
+  }
+
   private List<String> batchHelixInstanceStoppableCheck(String clusterId,
       Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks) {
     Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
         .toMap(Function.identity(),
             instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
-    // finalStoppableChecks only contains instances that does not pass this health check
+    // finalStoppableChecks contains instances that do not pass this health check
     return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
   }
 
-  private void batchCustomInstanceStoppableCheck(String clusterId, List<String> instances,
+  private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<String> instances,
       Map<String, StoppableCheck> finalStoppableChecks, Map<String, String> customPayLoads) {
-    if (instances.isEmpty() ) {
+    if (instances.isEmpty()) {
       // if all instances failed at previous checks, then all following checks are not required.
-      return;
+      return instances;
     }
     RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
     if (restConfig == null) {
@@ -308,16 +438,65 @@ public class MaintenanceManagementService {
     List<String> instancesForCustomPartitionLevelChecks =
         filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks);
     if (!instancesForCustomPartitionLevelChecks.isEmpty()) {
+      // add to finalStoppableChecks regardless of stoppable or not.
       Map<String, StoppableCheck> instancePartitionLevelChecks =
           performPartitionsCheck(instancesForCustomPartitionLevelChecks, restConfig,
               customPayLoads);
+      List<String> instancesForFollowingChecks = new ArrayList<>();
       for (Map.Entry<String, StoppableCheck> instancePartitionStoppableCheckEntry : instancePartitionLevelChecks
           .entrySet()) {
         String instance = instancePartitionStoppableCheckEntry.getKey();
         StoppableCheck stoppableCheck = instancePartitionStoppableCheckEntry.getValue();
         addStoppableCheck(finalStoppableChecks, instance, stoppableCheck);
+        if (stoppableCheck.isStoppable() || isNonBlockingCheck(stoppableCheck)) {
+          // instance passed this round of checks, or its failed checks are all non-blocking,
+          // so it will be checked in the next round
+          instancesForFollowingChecks.add(instance);
+        }
       }
+      return instancesForFollowingChecks;
     }
+    return instancesForCustomPartitionLevelChecks;
+  }
+
+  private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(String clusterId,
+      List<String> instances, List<String> healthChecks, Map<String, String> healthCheckConfig) {
+    List<String> instancesForNext = new ArrayList<>(instances);
+    Map<String, MaintenanceManagementInstanceInfo> instanceInfos = new HashMap<>();
+    Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
+    // TODO: Right now user can only choose from HelixInstanceStoppableCheck and
+    // CustomInstanceStoppableCheck. We should add finer-grained check groups to choose from,
+    // e.g. HELIX:INSTANCE_NOT_ENABLED, CUSTOM_PARTITION_HEALTH_FAILURE:PARTITION_INITIAL_STATE_FAIL etc.
+    for (String healthCheck : healthChecks) {
+      if (healthCheck.equals("HelixInstanceStoppableCheck")) {
+        // this is helix own check
+        instancesForNext =
+            batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks);
+      } else if (healthCheck.equals("CustomInstanceStoppableCheck")) {
+        // custom check, includes custom Instance check and partition check.
+        instancesForNext = batchCustomInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
+            healthCheckConfig);
+      } else {
+        throw new UnsupportedOperationException(healthCheck + " is not supported yet!");
+      }
+    }
+    // assemble result. Returned map contains all instances with pass or fail status.
+    Set<String> clearedInstance = new HashSet<>(instancesForNext);
+    for (String instance : instances) {
+      MaintenanceManagementInstanceInfo result = new MaintenanceManagementInstanceInfo(
+          clearedInstance.contains(instance)
+              ? MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS
+              : MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE);
+      if (finalStoppableChecks.containsKey(instance) && !finalStoppableChecks.get(instance)
+          .isStoppable()) {
+        // If a non-blocking check failed, then we will have a StoppableCheck object with
+        // stoppable = false and the instance is in clearedInstance. We will assign the SUCCESS
+        // state and an error message.
+        result.addMessages(finalStoppableChecks.get(instance).getFailedChecks());
+      }
+      instanceInfos.put(instance, result);
+    }
+    return instanceInfos;
   }
 
   private void addStoppableCheck(Map<String, StoppableCheck> stoppableChecks, String instance,
@@ -343,7 +522,7 @@ public class MaintenanceManagementService {
           // put the check result of the failed-to-stop instances
           addStoppableCheck(finalStoppableCheckByInstance, instance, stoppableCheck);
         }
-        if (stoppableCheck.isStoppable() || _continueOnFailures){
+        if (stoppableCheck.isStoppable() || isNonBlockingCheck(stoppableCheck)) {
           // instance passed this around of check or mandatory all checks
           // will be checked in the next round
           instancesForNextCheck.add(instance);
@@ -356,10 +535,34 @@ public class MaintenanceManagementService {
     return instancesForNextCheck;
   }
 
+  private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) {
+    if (_nonBlockingHealthChecks.isEmpty()) {
+      return false;
+    }
+    if (_nonBlockingHealthChecks.contains(ALL_HEALTH_CHECK_NONBLOCK)) {
+      return true;
+    }
+    for (String failedCheck : stoppableCheck.getFailedChecks()) {
+      if (failedCheck.startsWith("CUSTOM_")) {
+        // failed custom check will have the pattern
+        // "CUSTOM_PARTITION_HEALTH_FAILURE:PARTITION_INITIAL_STATE_FAIL:partition_name"
+        // we want to keep the first 2 parts as failed test name.
+        String[] checks = failedCheck.split(":", 3);
+        failedCheck = checks[0] + checks[1];
+      }
+      // Helix's own health check names will be in this pattern "HELIX:INSTANCE_NOT_ALIVE",
+      // no need to preprocess.
+      if (!_nonBlockingHealthChecks.contains(failedCheck)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
     LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
-    Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName,
-        HealthCheck.STOPPABLE_CHECK_LIST);
+    Map<String, Boolean> helixStoppableCheck =
+        getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST);
 
     return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
   }
@@ -418,7 +621,18 @@ public class MaintenanceManagementService {
     }
     JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonContent);
     // parsing the inputs as string key value pairs
-    jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().asText()));
+    jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(),
+        kv.getValue().isValueNode() ? kv.getValue().asText() : kv.getValue().toString()));
+    return result;
+  }
+
+  public static Map<String, String> getMapFromJsonPayload(JsonNode jsonNode)
+      throws IllegalArgumentException {
+    Map<String, String> result = new HashMap<>();
+    if (jsonNode != null) {
+      jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(),
+          kv.getValue().isValueNode() ? kv.getValue().asText() : kv.getValue().toString()));
+    }
     return result;
   }
 
@@ -428,11 +642,15 @@ public class MaintenanceManagementService {
         : OBJECT_MAPPER.convertValue(jsonContent, List.class);
   }
 
-  public static Map<String, String> getMapFromJsonPayload(JsonNode jsonContent)
-      throws IllegalArgumentException {
-    return jsonContent == null ? new HashMap<>()
-        : OBJECT_MAPPER.convertValue(jsonContent, new TypeReference<Map<String, String>>() {
-        });
+  public static List<String> getListFromJsonPayload(String jsonString)
+      throws IllegalArgumentException, JsonProcessingException {
+    return (jsonString == null) ? Collections.emptyList()
+        : OBJECT_MAPPER.readValue(jsonString, List.class);
+  }
+
+  public static boolean getBooleanFromJsonPayload(String jsonString)
+      throws IllegalArgumentException, JsonProcessingException {
+    return  OBJECT_MAPPER.readTree(jsonString).asBoolean();
   }
 
   @VisibleForTesting
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
index e47fd03..a401fa1 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
@@ -39,7 +39,7 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.RESTConfig;
 import org.apache.helix.rest.client.CustomRestClient;
 import org.apache.helix.rest.client.CustomRestClientFactory;
@@ -241,7 +241,7 @@ public class HelixDataAccessorWrapper extends ZKHelixDataAccessor {
 
     for (String resourceName : resources) {
       getProperty(propertyKeyBuilder.idealStates(resourceName));
-      IdealState externalView = getProperty(propertyKeyBuilder.externalView(resourceName));
+      ExternalView externalView = getProperty(propertyKeyBuilder.externalView(resourceName));
       if (externalView != null) {
         String stateModeDef = externalView.getStateModelDefRef();
         getProperty(propertyKeyBuilder.stateModelDef(stateModeDef));
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index 92cd045..3fd284e 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -21,8 +21,10 @@ package org.apache.helix.rest.server.resources.helix;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -88,7 +90,8 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     operation_list,
     operation_config,
     continueOnFailures,
-    skipZKRead
+    skipZKRead,
+    performOperation
     }
 
   private static class MaintenanceOpInputFields {
@@ -96,8 +99,9 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     Map<String, String> healthCheckConfig = null;
     List<String> operations = null;
     Map<String, String> operationConfig = null;
-    boolean continueOnFailures = false;
+    Set<String> nonBlockingHelixCheck = new HashSet<>();
     boolean skipZKRead = false;
+    boolean performOperation = true;
   }
 
   @ResponseMetered(name = HttpConstants.READ_REQUEST)
@@ -227,14 +231,14 @@ public class PerInstanceAccessor extends AbstractHelixResource {
 
       MaintenanceManagementService maintenanceManagementService =
           new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
-              getConfigAccessor(), inputFields.skipZKRead, inputFields.continueOnFailures,
+              getConfigAccessor(), inputFields.skipZKRead, inputFields.nonBlockingHelixCheck,
               getNamespace());
 
       return JSONRepresentation(maintenanceManagementService
           .takeInstance(clusterId, instanceName, inputFields.healthChecks,
               inputFields.healthCheckConfig,
               inputFields.operations,
-              inputFields.operationConfig, true));
+              inputFields.operationConfig, inputFields.performOperation));
     } catch (Exception e) {
       LOG.error("Failed to takeInstances:", e);
       return badRequest("Failed to takeInstances: " + e.getMessage());
@@ -271,14 +275,14 @@ public class PerInstanceAccessor extends AbstractHelixResource {
 
       MaintenanceManagementService maintenanceManagementService =
           new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
-              getConfigAccessor(), inputFields.skipZKRead, inputFields.continueOnFailures,
+              getConfigAccessor(), inputFields.skipZKRead, inputFields.nonBlockingHelixCheck,
               getNamespace());
 
       return JSONRepresentation(maintenanceManagementService
           .freeInstance(clusterId, instanceName, inputFields.healthChecks,
               inputFields.healthCheckConfig,
               inputFields.operations,
-              inputFields.operationConfig, true));
+              inputFields.operationConfig, inputFields.performOperation));
     } catch (Exception e) {
       LOG.error("Failed to takeInstances:", e);
       return badRequest("Failed to takeInstances: " + e.getMessage());
@@ -296,28 +300,37 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     MaintenanceOpInputFields inputFields = new MaintenanceOpInputFields();
     String continueOnFailuresName = PerInstanceProperties.continueOnFailures.name();
     String skipZKReadName = PerInstanceProperties.skipZKRead.name();
-
-    inputFields.continueOnFailures =
-        inputFields.healthCheckConfig != null && inputFields.healthCheckConfig
-            .containsKey(continueOnFailuresName) && Boolean
-            .parseBoolean(inputFields.healthCheckConfig.get(continueOnFailuresName));
-    inputFields.skipZKRead = inputFields.healthCheckConfig != null && inputFields.healthCheckConfig
-        .containsKey(skipZKReadName) && Boolean
-        .parseBoolean(inputFields.healthCheckConfig.get(skipZKReadName));
+    String performOperation = PerInstanceProperties.performOperation.name();
 
     inputFields.healthChecks = MaintenanceManagementService
         .getListFromJsonPayload(node.get(PerInstanceProperties.health_check_list.name()));
     inputFields.healthCheckConfig = MaintenanceManagementService
         .getMapFromJsonPayload(node.get(PerInstanceProperties.health_check_config.name()));
-    if (inputFields.healthCheckConfig != null || !inputFields.healthChecks.isEmpty()) {
-      inputFields.healthCheckConfig.remove(continueOnFailuresName);
-      inputFields.healthCheckConfig.remove(skipZKReadName);
+
+    if (inputFields.healthCheckConfig != null) {
+      if (inputFields.healthCheckConfig.containsKey(continueOnFailuresName)) {
+        inputFields.nonBlockingHelixCheck = new HashSet<String>(MaintenanceManagementService
+            .getListFromJsonPayload(inputFields.healthCheckConfig.get(continueOnFailuresName)));
+        // healthCheckConfig is passed directly to the customer's health check, so we need to
+        // remove unrelated key-value pairs.
+        inputFields.healthCheckConfig.remove(continueOnFailuresName);
+      }
+      if (inputFields.healthCheckConfig.containsKey(skipZKReadName)) {
+        inputFields.skipZKRead =
+            Boolean.parseBoolean(inputFields.healthCheckConfig.get(skipZKReadName));
+        inputFields.healthCheckConfig.remove(skipZKReadName);
+      }
     }
 
     inputFields.operations = MaintenanceManagementService
         .getListFromJsonPayload(node.get(PerInstanceProperties.operation_list.name()));
     inputFields.operationConfig = MaintenanceManagementService
         .getMapFromJsonPayload(node.get(PerInstanceProperties.operation_config.name()));
+    if (inputFields.operationConfig != null && inputFields.operationConfig
+        .containsKey(performOperation)) {
+      inputFields.performOperation =
+          Boolean.parseBoolean(inputFields.operationConfig.get(performOperation));
+    }
 
     LOG.debug("Input fields for take/free Instance" + inputFields.toString());
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
index ae11aec..1d4376b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
@@ -28,8 +28,6 @@ import org.apache.helix.rest.server.json.instance.InstanceInfo;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
 
 public interface InstanceService {
-
-
     /**
      * Get the overall status of the instance
      *
@@ -38,7 +36,7 @@ public interface InstanceService {
      * @return An instance of {@link InstanceInfo} easily convertible to JSON
      */
     InstanceInfo getInstanceInfo(String clusterId, String instanceName,
-                                 List<HealthCheck> healthChecks);
+        List<HealthCheck> healthChecks);
 
     /**
      * Get the current instance stoppable checks
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
index 770aeb5..b24ffa1 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
@@ -18,12 +18,15 @@ package org.apache.helix.rest.clusterMaintenanceService;
  * specific language governing permissions and limitations
  * under the License.
  */
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.AccessOption;
@@ -46,6 +49,7 @@ import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
@@ -77,12 +81,22 @@ public class TestMaintenanceManagementService {
   }
 
   class MockMaintenanceManagementService extends MaintenanceManagementService {
+
     public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
         ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead,
-        boolean continueOnFailures, String namespace) {
-      super(dataAccessor, configAccessor, customRestClient, skipZKRead, continueOnFailures,
+        boolean continueOnFailure, String namespace) {
+      super(dataAccessor, configAccessor, customRestClient, skipZKRead,
+          continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
+              : Collections.emptySet(), namespace);
+    }
+
+    public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
+        ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead,
+        Set<String> nonBlockingHealthChecks, String namespace) {
+      super(dataAccessor, configAccessor, customRestClient, skipZKRead, nonBlockingHealthChecks,
           namespace);
     }
+
     @Override
     protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
         List<HealthCheck> healthChecks) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestOperationImpl.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestOperationImpl.java
new file mode 100644
index 0000000..997b1aa
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestOperationImpl.java
@@ -0,0 +1,148 @@
+package org.apache.helix.rest.server;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementInstanceInfo;
+import org.apache.helix.rest.clusterMaintenanceService.api.OperationInterface;
+import org.apache.helix.rest.common.RestSnapShotSimpleImpl;
+import org.apache.helix.rest.common.datamodel.RestSnapShot;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.util.InstanceValidationUtil;
+
+
+public class TestOperationImpl implements OperationInterface {
+  /**
+   * Passes when every partition hosted on {@code instanceName} keeps enough healthy siblings;
+   * {@code operationConfig} maps instance name to "true" if on hold (null means none on hold).
+   */
+  @Override
+  public MaintenanceManagementInstanceInfo operationCheckForTakeSingleInstance(String instanceName, Map<String, String> operationConfig, RestSnapShot sn) {
+    Map<String, Boolean> onHold = new HashMap<>();
+    if (operationConfig != null) {
+      operationConfig.forEach((name, held) -> onHold.put(name, Boolean.parseBoolean(held)));
+    }
+    try {
+      String unHealthyPartition = siblingNodesActiveReplicaCheck(sn, instanceName, onHold);
+      if (unHealthyPartition == null) {
+        return new MaintenanceManagementInstanceInfo(
+            MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS);
+      }
+      return new MaintenanceManagementInstanceInfo(
+          MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE,
+          Collections.singletonList(unHealthyPartition));
+    } catch (Exception ex) {
+      return new MaintenanceManagementInstanceInfo(
+          MaintenanceManagementInstanceInfo.OperationalStatus.FAILURE,
+          Collections.singletonList(ex.getMessage()));
+    }
+  }
+
+  @Override
+  public MaintenanceManagementInstanceInfo operationCheckForFreeSingleInstance(String instanceName, Map<String, String> operationConfig, RestSnapShot sn) {
+    return new MaintenanceManagementInstanceInfo(
+        MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS); // free is never blocked
+  }
+
+  @Override
+  public Map<String, MaintenanceManagementInstanceInfo> operationCheckForTakeInstances(Collection<String> instances, Map<String, String> operationConfig, RestSnapShot sn) {
+    return null; // batch take check is not implemented in this test class
+  }
+
+  @Override
+  public Map<String, MaintenanceManagementInstanceInfo> operationCheckForFreeInstances(Collection<String> instances, Map<String, String> operationConfig, RestSnapShot sn) {
+    return null; // batch free check is not implemented in this test class
+  }
+
+  /** Performs no real work; always reports success with a fixed take result string. */
+  @Override
+  public MaintenanceManagementInstanceInfo operationExecForTakeSingleInstance(String instanceName,
+      Map<String, String> operationConfig, RestSnapShot sn) {
+    return new MaintenanceManagementInstanceInfo(
+        MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS, "DummyTakeOperationResult");
+  }
+
+  /** Performs no real work; always reports success with a fixed free result string. */
+  @Override
+  public MaintenanceManagementInstanceInfo operationExecForFreeSingleInstance(String instanceName,
+      Map<String, String> operationConfig, RestSnapShot sn) {
+    return new MaintenanceManagementInstanceInfo(
+        MaintenanceManagementInstanceInfo.OperationalStatus.SUCCESS, "DummyFreeOperationResult");
+  }
+
+  @Override
+  public Map<String, MaintenanceManagementInstanceInfo> operationExecForTakeInstances(Collection<String> instances, Map<String, String> operationConfig, RestSnapShot sn) {
+    return null; // batch take execution is not implemented in this test class
+  }
+
+  @Override
+  public Map<String, MaintenanceManagementInstanceInfo> operationExecForFreeInstances(Collection<String> instances, Map<String, String> operationConfig, RestSnapShot sn) {
+    return null; // batch free execution is not implemented in this test class
+  }
+
+  /**
+   * Returns the first partition on {@code instanceName} with fewer healthy, not-on-hold siblings
+   * than minActiveReplicas, or {@code null}; instances missing from the cache are not on hold.
+   */
+  public String siblingNodesActiveReplicaCheck(RestSnapShot snapShot, String instanceName,
+      Map<String, Boolean> isInstanceOnHoldCache) throws HelixException {
+    if (!(snapShot instanceof RestSnapShotSimpleImpl)) {
+      throw new HelixException("Passed in Snapshot is not an instance of RestSnapShotSimpleImpl");
+    }
+    RestSnapShotSimpleImpl restSnapShot = (RestSnapShotSimpleImpl) snapShot;
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(snapShot.getClusterName());
+    List<String> resources = restSnapShot.getChildNames(keyBuilder.idealStates());
+    for (String resourceName : resources) {
+      IdealState idealState = restSnapShot.getProperty(keyBuilder.idealStates(resourceName));
+      if (idealState == null || !idealState.isEnabled() || !idealState.isValid()
+          || TaskConstants.STATE_MODEL_NAME.equals(idealState.getStateModelDefRef())) {
+        continue;
+      }
+      ExternalView externalView = restSnapShot.getProperty(keyBuilder.externalView(resourceName));
+      if (externalView == null) {
+        throw new HelixException(
+            String.format("Resource %s does not have external view!", resourceName));
+      }
+      // Get the minActiveReplicas constraint for the resource
+      int minActiveReplicas = externalView.getMinActiveReplicas();
+      if (minActiveReplicas == -1) {
+        continue;
+      }
+      StateModelDefinition stateModelDefinition =
+          restSnapShot.getProperty(keyBuilder.stateModelDef(externalView.getStateModelDefRef()));
+      Set<String> unhealthyStates = new HashSet<>(InstanceValidationUtil.UNHEALTHY_STATES);
+      if (stateModelDefinition != null) {
+        unhealthyStates.add(stateModelDefinition.getInitialState());
+      }
+      for (String partition : externalView.getPartitionSet()) {
+        Map<String, String> stateByInstanceMap = externalView.getStateMap(partition);
+        // found the resource hosted on the instance
+        if (stateByInstanceMap.containsKey(instanceName)) {
+          int numHealthySiblings = 0;
+          for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
+            // Null-safe: an instance missing from the cache is treated as not on hold.
+            if (!entry.getKey().equals(instanceName) && !unhealthyStates.contains(entry.getValue())
+                && !Boolean.TRUE.equals(isInstanceOnHoldCache.get(entry.getKey()))) {
+              numHealthySiblings++;
+            }
+          }
+          if (numHealthySiblings < minActiveReplicas) {
+            return partition;
+          }
+        }
+      }
+    }
+    return null;
+  }
+}
+
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 58bccbf..6ca7bf6 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -64,7 +63,6 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
     Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}/stoppable")
         .format(STOPPABLE_CLUSTER, "instance1").post(this, entity);
     String stoppableCheckResult = response.readEntity(String.class);
-
     Map<String, Object> actualMap = OBJECT_MAPPER.readValue(stoppableCheckResult, Map.class);
     List<String> failedChecks = Arrays
         .asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT", "HELIX:INSTANCE_NOT_ENABLED",
@@ -78,7 +76,6 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
   @Test(dependsOnMethods = "testIsInstanceStoppable")
   public void testTakeInstanceNegInput() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
-
     post("clusters/TestCluster_0/instances/instance1/takeInstance", null,
         Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
         Response.Status.BAD_REQUEST.getStatusCode(), true);
@@ -86,6 +83,153 @@ public class TestPerInstanceAccessor extends AbstractTestClass {
   }
 
   @Test(dependsOnMethods = "testTakeInstanceNegInput")
+  public void testTakeInstanceNegInput2() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // An empty JSON object names no health check and no operation, so it must be rejected.
+    Entity<String> request = Entity.entity("{}", MediaType.APPLICATION_JSON_TYPE);
+    String responseBody = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1").post(this, request).readEntity(String.class);
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+    String rejection = "Invalid input. Please provide at least one health check or operation.";
+    Map<String, Object> expected = ImmutableMap
+        .of("successful", false, "messages", Arrays.asList(rejection), "operationResult", "");
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testTakeInstanceNegInput2")
+  public void testTakeInstanceHealthCheck() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // No continueOnFailures entry is sent, so the failing Helix checks must fail the request.
+    String payload = "{ \"health_check_list\" : [\"HelixInstanceStoppableCheck\", \"CustomInstanceStoppableCheck\"],"
+        + "\"health_check_config\" : { \"client\" : \"espresso\" }} ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    String responseBody = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1").post(this, request).readEntity(String.class);
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+
+    List<String> failedChecks = Arrays.asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT",
+        "HELIX:INSTANCE_NOT_ENABLED", "HELIX:INSTANCE_NOT_STABLE");
+    Map<String, Object> expected = ImmutableMap
+        .of("successful", false, "messages", failedChecks, "operationResult", "");
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testTakeInstanceNegInput2")
+  public void testTakeInstanceNonBlockingCheck() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // Checks listed in continueOnFailures are still reported but must not fail the request.
+    String payload = "{ \"health_check_list\" : [\"HelixInstanceStoppableCheck\"],"
+        + "\"health_check_config\" : { \"client\" : \"espresso\" , "
+        + "\"continueOnFailures\" : [\"HELIX:EMPTY_RESOURCE_ASSIGNMENT\", \"HELIX:INSTANCE_NOT_ENABLED\","
+        + " \"HELIX:INSTANCE_NOT_STABLE\"]} } ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    String responseBody = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1").post(this, request).readEntity(String.class);
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+
+    List<String> reportedChecks = Arrays.asList("HELIX:EMPTY_RESOURCE_ASSIGNMENT",
+        "HELIX:INSTANCE_NOT_ENABLED", "HELIX:INSTANCE_NOT_STABLE");
+    Map<String, Object> expected = ImmutableMap
+        .of("successful", true, "messages", reportedChecks, "operationResult", "");
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testTakeInstanceHealthCheck")
+  public void testTakeInstanceOperationSuccess() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // Only an operation is requested; TestOperationImpl's fixed take result is expected back.
+    String payload =
+        "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"]} ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    String responseBody = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1").post(this, request).readEntity(String.class);
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+
+    Map<String, Object> expected = ImmutableMap.of("successful", true, "messages",
+        new ArrayList<>(), "operationResult", "DummyTakeOperationResult");
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testTakeInstanceOperationSuccess")
+  public void testFreeInstanceOperationSuccess() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // Same operation as the take test, sent to freeInstance; the fixed free result is expected.
+    String payload =
+        "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"]} ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    String responseBody = new JerseyUriRequestBuilder("clusters/{}/instances/{}/freeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1")
+        .post(this, request).readEntity(String.class);
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+
+    Map<String, Object> expected = ImmutableMap.of("successful", true, "messages",
+        new ArrayList<>(), "operationResult", "DummyFreeOperationResult");
+    Assert.assertEquals(actual, expected);
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testFreeInstanceOperationSuccess")
+  public void testTakeInstanceOperationCheckFailure() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // instance0 and instance2-5 are flagged on hold in the config; the take must be refused.
+    String payload = "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"],"
+        + "\"operation_config\": { \"org.apache.helix.rest.server.TestOperationImpl\" :"
+        + " {\"instance0\": true, \"instance2\": true, "
+        + "\"instance3\": true, \"instance4\": true, \"instance5\": true, "
+        + " \"value\" : \"i001\", \"list_value\" : [\"list1\"]}} } ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    String responseBody = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance0").post(this, request).readEntity(String.class);
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+
+    Assert.assertFalse((boolean) actual.get("successful"));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testTakeInstanceOperationCheckFailure")
+  public void testTakeInstanceOperationCheckFailureNonBlocking() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // continueOnFailures in the operation config should make the failed check non-blocking.
+    String payload = "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"],"
+        + "\"operation_config\": { \"org.apache.helix.rest.server.TestOperationImpl\" : "
+        + "{\"instance0\": true, \"instance2\": true, "
+        + "\"instance3\": true, \"instance4\": true, \"instance5\": true, "
+        + "\"continueOnFailures\" : true} } } ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    String takeInstanceResult = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance0").post(this, request).readEntity(String.class);
+    System.out.println("testTakeInstanceOperationCheckFailureNonBlocking" + takeInstanceResult);
+
+    Map<String, Object> actualMap = OBJECT_MAPPER.readValue(takeInstanceResult, Map.class);
+    Assert.assertTrue((boolean)actualMap.get("successful"));
+    Assert.assertEquals(actualMap.get("operationResult"), "DummyTakeOperationResult");
+    // The non blocking test should generate msg but won't return failure status.
+    // "messages" deserializes to a List, so test emptiness rather than equality with "[]".
+    Assert.assertFalse(((List<?>) actualMap.get("messages")).isEmpty());
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testTakeInstanceOperationCheckFailureNonBlocking")
+  public void testTakeInstanceCheckOnly() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    // performOperation=false requests the checks only, so an empty operationResult is expected.
+    String payload = "{ \"operation_list\" : [\"org.apache.helix.rest.server.TestOperationImpl\"],"
+        + "\"operation_config\": {\"performOperation\": false} } ";
+    Entity<String> request = Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE);
+    String responseBody = new JerseyUriRequestBuilder("clusters/{}/instances/{}/takeInstance")
+        .format(STOPPABLE_CLUSTER, "instance1").post(this, request).readEntity(String.class);
+    Map<String, Object> actual = OBJECT_MAPPER.readValue(responseBody, Map.class);
+
+    Assert.assertTrue((boolean) actual.get("successful"));
+    Assert.assertTrue(actual.get("operationResult").equals(""));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testTakeInstanceCheckOnly")
   public void testGetAllMessages() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     String testInstance = CLUSTER_NAME + "localhost_12926"; //Non-live instance