You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/12/08 02:11:01 UTC

(helix) 02/09: Implement the cross-zone-based stoppable check (#2680)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0b01378927c9ec0cb763ab59dd1cd9287d5a80f5
Author: Xiaxuan Gao <32...@users.noreply.github.com>
AuthorDate: Tue Oct 31 15:41:05 2023 -0700

    Implement the cross-zone-based stoppable check (#2680)
    
    Implement the cross-zone-based stoppable check and add to_be_stopped_instances query parameter to the stoppable check API
---
 .../apache/helix/util/InstanceValidationUtil.java  |  58 +++++++-
 .../helix/util/TestInstanceValidationUtil.java     |  77 ++++++++++
 .../MaintenanceManagementService.java              |  40 ++++--
 .../StoppableInstancesSelector.java                | 138 ++++++++++++------
 .../server/resources/helix/InstancesAccessor.java  |  35 +++--
 .../TestMaintenanceManagementService.java          |  14 +-
 .../helix/rest/server/AbstractTestClass.java       |  77 +++++++++-
 .../helix/rest/server/TestInstancesAccessor.java   | 155 +++++++++++++++++++++
 .../util/TestInstanceValidationUtilInRest.java     |  64 +++++++++
 9 files changed, 576 insertions(+), 82 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index fdbf7dd1a..5f179e784 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.util;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -254,6 +255,28 @@ public class InstanceValidationUtil {
   public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
       Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
       HelixDataAccessor dataAccessor) {
+    return perPartitionHealthCheck(externalViews, globalPartitionHealthStatus, instanceToBeStop,
+        dataAccessor, Collections.emptySet());
+  }
+
+  /**
+   * Get the problematic partitions on the to-be-stop instance
+   * Requirement:
+   *  If the instance and the toBeStoppedInstances are stopped and the partitions on them are OFFLINE,
+   *  the cluster still has enough "healthy" replicas on other sibling instances
+   *
+   *  - sibling instances mean those who share the same partition (replicas) of the to-be-stop instance
+   *
+   * @param globalPartitionHealthStatus (instance => (partition name, health status))
+   * @param instanceToBeStop The instance to be stopped
+   * @param dataAccessor The data accessor
+   * @param toBeStoppedInstances A set of instances presumed to be already stopped. It
+   *                            shouldn't contain the `instanceToBeStop`
+   * @return A list of problematic partitions if the instance is stopped
+   */
+  public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
+      Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
+      HelixDataAccessor dataAccessor, Set<String> toBeStoppedInstances) {
     Map<String, List<String>> unhealthyPartitions = new HashMap<>();
 
     for (ExternalView externalView : externalViews) {
@@ -273,7 +296,8 @@ public class InstanceValidationUtil {
             && stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
           for (String siblingInstance : stateMap.keySet()) {
             // Skip this self check
-            if (siblingInstance.equals(instanceToBeStop)) {
+            if (siblingInstance.equals(instanceToBeStop) || (toBeStoppedInstances != null
+                && toBeStoppedInstances.contains(siblingInstance))) {
               continue;
             }
 
@@ -366,11 +390,32 @@ public class InstanceValidationUtil {
    *
    * TODO: Use in memory cache and query instance's currentStates
    *
-   * @param dataAccessor
-   * @param instanceName
+   * @param dataAccessor A helper class to access the Helix data.
+   * @param instanceName An instance to be evaluated against this check.
+   * @return
+   */
+  public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
+      String instanceName) {
+    return siblingNodesActiveReplicaCheck(dataAccessor, instanceName, Collections.emptySet());
+  }
+
+  /**
+   * Check if sibling nodes of the instance meet min active replicas constraint
+   * Two instances are siblings of each other if they host the same partition. Sibling nodes
+   * that are in toBeStoppedInstances are presumed to be stopped.
+   * WARNING: The check uses ExternalView to reduce network traffic but suffers from reduced
+   * accuracy due to external view propagation latency
+   *
+   * TODO: Use in memory cache and query instance's currentStates
+   *
+   * @param dataAccessor A helper class to access the Helix data.
+   * @param instanceName An instance to be evaluated against this check.
+   * @param toBeStoppedInstances A set of instances presumed to be already stopped. It
+   *                             shouldn't contain the `instanceName`
    * @return
    */
-  public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor, String instanceName) {
+  public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
+      String instanceName, Set<String> toBeStoppedInstances) {
     PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
     List<String> resources = dataAccessor.getChildNames(propertyKeyBuilder.idealStates());
 
@@ -406,8 +451,9 @@ public class InstanceValidationUtil {
         if (stateByInstanceMap.containsKey(instanceName)) {
           int numHealthySiblings = 0;
           for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
-            if (!entry.getKey().equals(instanceName)
-                && !unhealthyStates.contains(entry.getValue())) {
+            if (!entry.getKey().equals(instanceName) && (toBeStoppedInstances == null
+                || !toBeStoppedInstances.contains(entry.getKey())) && !unhealthyStates.contains(
+                entry.getValue())) {
               numHealthySiblings++;
             }
           }
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
index 2c51fc92b..aa1ba3229 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
@@ -21,7 +21,9 @@ package org.apache.helix.util;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -401,6 +403,81 @@ public class TestInstanceValidationUtil {
     Assert.assertTrue(result);
   }
 
+  @Test
+  public void TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances() {
+    String resource = "resource";
+    Mock mock = new Mock();
+    doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
+        .getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
+    // set ideal state
+    IdealState idealState = mock(IdealState.class);
+    when(idealState.isEnabled()).thenReturn(true);
+    when(idealState.isValid()).thenReturn(true);
+    when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
+    doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
+
+    // set external view
+    ExternalView externalView = mock(ExternalView.class);
+    when(externalView.getMinActiveReplicas()).thenReturn(2);
+    when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
+    when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
+        "instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
+    doReturn(externalView).when(mock.dataAccessor)
+        .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+    StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
+    when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
+    doReturn(stateModelDefinition).when(mock.dataAccessor)
+        .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));
+
+    Set<String> toBeStoppedInstances = new HashSet<>();
+    toBeStoppedInstances.add("instance3");
+    toBeStoppedInstances.add("invalidInstances"); // include an invalid instance.
+    boolean result =
+        InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);
+    Assert.assertTrue(result);
+
+    result =
+        InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, null);
+    Assert.assertTrue(result);
+  }
+
+  @Test
+  public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
+    String resource = "resource";
+    Mock mock = new Mock();
+    doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
+        .getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
+    // set ideal state
+    IdealState idealState = mock(IdealState.class);
+    when(idealState.isEnabled()).thenReturn(true);
+    when(idealState.isValid()).thenReturn(true);
+    when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
+    doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
+
+    // set external view
+    ExternalView externalView = mock(ExternalView.class);
+    when(externalView.getMinActiveReplicas()).thenReturn(2);
+    when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
+    when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
+        "instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
+    doReturn(externalView).when(mock.dataAccessor)
+        .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+    StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
+    when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
+    doReturn(stateModelDefinition).when(mock.dataAccessor)
+        .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));
+
+    Set<String> toBeStoppedInstances = new HashSet<>();
+    toBeStoppedInstances.add("instance1");
+    toBeStoppedInstances.add("instance2");
+    boolean result =
+        InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);
+
+    Assert.assertFalse(result);
+  }
+
   @Test
   public void TestSiblingNodesActiveReplicaCheck_fail() {
     String resource = "resource";
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
index 529fc469d..c3fa04966 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java
@@ -339,17 +339,23 @@ public class MaintenanceManagementService {
    */
   public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
       String jsonContent) throws IOException {
-    return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent)
-        .get(instanceName);
+    return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName),
+        jsonContent).get(instanceName);
   }
 
-
   public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
       List<String> instances, String jsonContent) throws IOException {
+    return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent,
+        Collections.emptySet());
+  }
+
+  public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
+      List<String> instances, String jsonContent, Set<String> toBeStoppedInstances) throws IOException {
     Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
     // helix instance check.
     List<String> instancesForCustomInstanceLevelChecks =
-        batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
+        batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks,
+            toBeStoppedInstances);
     // custom check, includes partition check.
     batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
         finalStoppableChecks, getMapFromJsonPayload(jsonContent));
@@ -441,10 +447,11 @@ public class MaintenanceManagementService {
   }
 
   private List<String> batchHelixInstanceStoppableCheck(String clusterId,
-      Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks) {
-    Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
-        .toMap(Function.identity(),
-            instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
+      Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
+      Set<String> toBeStoppedInstances) {
+    Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
+        Collectors.toMap(Function.identity(), instance -> POOL.submit(
+            () -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
     // finalStoppableChecks contains instances that does not pass this health check
     return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
   }
@@ -512,7 +519,8 @@ public class MaintenanceManagementService {
       if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) {
         // this is helix own check
         instancesForNext =
-            batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks);
+            batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
+                Collections.emptySet());
       } else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
         // custom check, includes custom Instance check and partition check.
         instancesForNext =
@@ -601,10 +609,12 @@ public class MaintenanceManagementService {
     return true;
   }
 
-  private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
+  private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName,
+      Set<String> toBeStoppedInstances) {
     LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
     Map<String, Boolean> helixStoppableCheck =
-        getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST);
+        getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST,
+            toBeStoppedInstances);
 
     return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
   }
@@ -698,6 +708,12 @@ public class MaintenanceManagementService {
   @VisibleForTesting
   protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
       List<HealthCheck> healthChecks) {
+    return getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet());
+  }
+
+  @VisibleForTesting
+  protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
+      List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
     Map<String, Boolean> healthStatus = new HashMap<>();
     for (HealthCheck healthCheck : healthChecks) {
       switch (healthCheck) {
@@ -745,7 +761,7 @@ public class MaintenanceManagementService {
           break;
         case MIN_ACTIVE_REPLICA_CHECK_FAILED:
           healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
-              InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
+              InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName, toBeStoppedInstances));
           break;
         default:
           LOG.error("Unsupported health check: {}", healthCheck);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index dafe1ab2d..8cf8bc83c 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -22,6 +22,7 @@ package org.apache.helix.rest.clusterMaintenanceService;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -33,31 +34,27 @@ import java.util.stream.Collectors;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.rest.server.json.cluster.ClusterTopology;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
 
 public class StoppableInstancesSelector {
   // This type does not belong to real HealthCheck failed reason. Also, if we add this type
   // to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl
   // loops all the types to do corresponding checks.
   private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST";
-  private String _clusterId;
+  private final String _clusterId;
   private List<String> _orderOfZone;
-  private String _customizedInput;
-  private ArrayNode _stoppableInstances;
-  private ObjectNode _failedStoppableInstances;
-  private MaintenanceManagementService _maintenanceService;
-  private ClusterTopology _clusterTopology;
+  private final String _customizedInput;
+  private final MaintenanceManagementService _maintenanceService;
+  private final ClusterTopology _clusterTopology;
 
   public StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
-      String customizedInput, ArrayNode stoppableInstances, ObjectNode failedStoppableInstances,
-      MaintenanceManagementService maintenanceService, ClusterTopology clusterTopology) {
+      String customizedInput, MaintenanceManagementService maintenanceService,
+      ClusterTopology clusterTopology) {
     _clusterId = clusterId;
     _orderOfZone = orderOfZone;
     _customizedInput = customizedInput;
-    _stoppableInstances = stoppableInstances;
-    _failedStoppableInstances = failedStoppableInstances;
     _maintenanceService = maintenanceService;
     _clusterTopology = clusterTopology;
   }
@@ -69,26 +66,92 @@ public class StoppableInstancesSelector {
    * reasons for non-stoppability.
    *
    * @param instances A list of instance to be evaluated.
+   * @param toBeStoppedInstances A list of instances presumed to be already stopped
+   * @return An ObjectNode containing:
+   *         - 'instance_stoppable_parallel': List of instances that can be stopped.
+   *         - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
+   *         a list of reasons for non-stoppability as the value.
    * @throws IOException
    */
-  public void getStoppableInstancesInSingleZone(List<String> instances) throws IOException {
+  public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
+      List<String> toBeStoppedInstances) throws IOException {
+    ObjectNode result = JsonNodeFactory.instance.objectNode();
+    ArrayNode stoppableInstances =
+        result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    ObjectNode failedStoppableInstances = result.putObject(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
+
     List<String> zoneBasedInstance =
         getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
+    populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances,
+        failedStoppableInstances);
+    processNonexistentInstances(instances, failedStoppableInstances);
+
+    return result;
+  }
+
+  /**
+   * Evaluates and collects stoppable instances across a set of zones based on the order of zones.
+   * The method iterates through instances, performing stoppable checks, and records reasons for
+   * non-stoppability.
+   *
+   * @param instances A list of instances to be evaluated.
+   * @param toBeStoppedInstances A list of instances presumed to be already stopped
+   * @return An ObjectNode containing:
+   *         - 'instance_stoppable_parallel': List of instances that can be stopped.
+   *         - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
+   *         a list of reasons for non-stoppability as the value.
+   * @throws IOException
+   */
+  public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
+      List<String> toBeStoppedInstances) throws IOException {
+    ObjectNode result = JsonNodeFactory.instance.objectNode();
+    ArrayNode stoppableInstances =
+        result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    ObjectNode failedStoppableInstances = result.putObject(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
+
+    Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
+    for (String zone : _orderOfZone) {
+      Set<String> instanceSet = new HashSet<>(instances);
+      Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
+      instanceSet.retainAll(currentZoneInstanceSet);
+      if (instanceSet.isEmpty()) {
+        continue;
+      }
+      populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances,
+          failedStoppableInstances);
+    }
+    processNonexistentInstances(instances, failedStoppableInstances);
+    return result;
+  }
+
+  private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
+      ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
     Map<String, StoppableCheck> instancesStoppableChecks =
-        _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, zoneBasedInstance,
-            _customizedInput);
+        _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances,
+            _customizedInput, toBeStoppedInstances);
+
     for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
       String instance = instanceStoppableCheck.getKey();
       StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
       if (!stoppableCheck.isStoppable()) {
-        ArrayNode failedReasonsNode = _failedStoppableInstances.putArray(instance);
+        ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
         for (String failedReason : stoppableCheck.getFailedChecks()) {
           failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
         }
       } else {
-        _stoppableInstances.add(instance);
+        stoppableInstances.add(instance);
+        // Update the toBeStoppedInstances set with the currently identified stoppable instance.
+        // This ensures that subsequent checks in other zones are aware of this instance's stoppable status.
+        toBeStoppedInstances.add(instance);
       }
     }
+  }
+
+  private void processNonexistentInstances(List<String> instances, ObjectNode failedStoppableInstances) {
     // Adding following logic to check whether instances exist or not. An instance exist could be
     // checking following scenario:
     // 1. Instance got dropped. (InstanceConfig is gone.)
@@ -100,28 +163,36 @@ public class StoppableInstancesSelector {
     Set<String> nonSelectedInstances = new HashSet<>(instances);
     nonSelectedInstances.removeAll(_clusterTopology.getAllInstances());
     for (String nonSelectedInstance : nonSelectedInstances) {
-      ArrayNode failedReasonsNode = _failedStoppableInstances.putArray(nonSelectedInstance);
+      ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance);
       failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
     }
   }
 
-  public void getStoppableInstancesCrossZones() {
-    // TODO: Add implementation to enable cross zone stoppable check.
-    throw new NotImplementedException("Not Implemented");
-  }
-
   /**
    * Determines the order of zones. If an order is provided by the user, it will be used directly.
    * Otherwise, zones will be ordered by their associated instance count in descending order.
    *
    * If `random` is true, the order of zones will be randomized regardless of any previous order.
    *
+   * @param instances A list of instances to be used to calculate the order of zones.
    * @param random Indicates whether to randomize the order of zones.
    */
-  public void calculateOrderOfZone(boolean random) {
+  public void calculateOrderOfZone(List<String> instances, boolean random) {
     if (_orderOfZone == null) {
-      _orderOfZone =
-          new ArrayList<>(getOrderedZoneToInstancesMap(_clusterTopology.toZoneMapping()).keySet());
+      Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
+      Map<String, Set<String>> zoneToInstancesMap = new HashMap<>();
+      for (ClusterTopology.Zone zone : _clusterTopology.getZones()) {
+        Set<String> instanceSet = new HashSet<>(instances);
+        // TODO: Use instance config from Helix-rest Cache to get the zone instead of reading the topology info
+        Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone.getId()));
+        instanceSet.retainAll(currentZoneInstanceSet);
+        if (instanceSet.isEmpty()) {
+          continue;
+        }
+        zoneToInstancesMap.put(zone.getId(), instanceSet);
+      }
+
+      _orderOfZone = new ArrayList<>(getOrderedZoneToInstancesMap(zoneToInstancesMap).keySet());
     }
 
     if (_orderOfZone.isEmpty()) {
@@ -182,8 +253,6 @@ public class StoppableInstancesSelector {
     private String _clusterId;
     private List<String> _orderOfZone;
     private String _customizedInput;
-    private ArrayNode _stoppableInstances;
-    private ObjectNode _failedStoppableInstances;
     private MaintenanceManagementService _maintenanceService;
     private ClusterTopology _clusterTopology;
 
@@ -202,16 +271,6 @@ public class StoppableInstancesSelector {
       return this;
     }
 
-    public StoppableInstancesSelectorBuilder setStoppableInstances(ArrayNode stoppableInstances) {
-      _stoppableInstances = stoppableInstances;
-      return this;
-    }
-
-    public StoppableInstancesSelectorBuilder setFailedStoppableInstances(ObjectNode failedStoppableInstances) {
-      _failedStoppableInstances = failedStoppableInstances;
-      return this;
-    }
-
     public StoppableInstancesSelectorBuilder setMaintenanceService(
         MaintenanceManagementService maintenanceService) {
       _maintenanceService = maintenanceService;
@@ -224,9 +283,8 @@ public class StoppableInstancesSelector {
     }
 
     public StoppableInstancesSelector build() {
-      return new StoppableInstancesSelector(_clusterId, _orderOfZone,
-          _customizedInput, _stoppableInstances, _failedStoppableInstances, _maintenanceService,
-          _clusterTopology);
+      return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput,
+          _maintenanceService, _clusterTopology);
     }
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index 8a2120270..785195ebe 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -20,7 +20,9 @@ package org.apache.helix.rest.server.resources.helix;
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -67,6 +69,7 @@ public class InstancesAccessor extends AbstractHelixResource {
     disabled,
     selection_base,
     zone_order,
+    to_be_stopped_instances,
     customized_values,
     instance_stoppable_parallel,
     instance_not_stoppable_with_reasons
@@ -224,6 +227,7 @@ public class InstancesAccessor extends AbstractHelixResource {
 
       List<String> orderOfZone = null;
       String customizedInput = null;
+      List<String> toBeStoppedInstances = Collections.emptyList();
       if (node.get(InstancesAccessor.InstancesProperties.customized_values.name()) != null) {
         customizedInput =
             node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString();
@@ -235,18 +239,26 @@ public class InstancesAccessor extends AbstractHelixResource {
             OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
         if (!orderOfZone.isEmpty() && random) {
           String message =
-              "Both 'orderOfZone' and 'random' parameters are set. Please specify only one option.";
+              "Both 'zone_order' and 'random' parameters are set. Please specify only one option.";
           _logger.error(message);
           return badRequest(message);
         }
       }
 
-      // Prepare output result
-      ObjectNode result = JsonNodeFactory.instance.objectNode();
-      ArrayNode stoppableInstances =
-          result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
-      ObjectNode failedStoppableInstances = result.putObject(
-          InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+      if (node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) != null) {
+        toBeStoppedInstances = OBJECT_MAPPER.readValue(
+            node.get(InstancesProperties.to_be_stopped_instances.name()).toString(),
+            OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
+        Set<String> instanceSet = new HashSet<>(instances);
+        instanceSet.retainAll(toBeStoppedInstances);
+        if (!instanceSet.isEmpty()) {
+          String message =
+              "'to_be_stopped_instances' and 'instances' have intersection: " + instanceSet
+                  + ". Please make them mutually exclusive.";
+          _logger.error(message);
+          return badRequest(message);
+        }
+      }
 
       MaintenanceManagementService maintenanceService =
           new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
@@ -260,18 +272,17 @@ public class InstancesAccessor extends AbstractHelixResource {
               .setClusterId(clusterId)
               .setOrderOfZone(orderOfZone)
               .setCustomizedInput(customizedInput)
-              .setStoppableInstances(stoppableInstances)
-              .setFailedStoppableInstances(failedStoppableInstances)
               .setMaintenanceService(maintenanceService)
               .setClusterTopology(clusterTopology)
               .build();
-      stoppableInstancesSelector.calculateOrderOfZone(random);
+      stoppableInstancesSelector.calculateOrderOfZone(instances, random);
+      ObjectNode result;
       switch (selectionBase) {
         case zone_based:
-          stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances);
+          result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances);
           break;
         case cross_zone_based:
-          stoppableInstancesSelector.getStoppableInstancesCrossZones();
+          result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances);
           break;
         case instance_based:
         default:
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
index f8408b070..a49a95066 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java
@@ -114,7 +114,7 @@ public class TestMaintenanceManagementService {
 
     @Override
     protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
-        List<HealthCheck> healthChecks) {
+        List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
       return Collections.emptyMap();
     }
   }
@@ -127,7 +127,7 @@ public class TestMaintenanceManagementService {
             _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
           @Override
           protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
-              String instanceName, List<HealthCheck> healthChecks) {
+              String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
             return failedCheck;
           }
         };
@@ -147,7 +147,7 @@ public class TestMaintenanceManagementService {
             _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
           @Override
           protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
-              String instanceName, List<HealthCheck> healthChecks) {
+              String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
             return Collections.emptyMap();
           }
         };
@@ -227,7 +227,7 @@ public class TestMaintenanceManagementService {
             _customRestClient, false, false,
             new HashSet<>(Arrays.asList(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)),
             HelixRestNamespace.DEFAULT_NAMESPACE_NAME);
-    
+
     StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, "");
     List<String> expectedFailedChecks = Arrays.asList(
         StoppableCheck.Category.CUSTOM_PARTITION_CHECK.getPrefix()
@@ -246,7 +246,7 @@ public class TestMaintenanceManagementService {
             _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
           @Override
           protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
-              String instanceName, List<HealthCheck> healthChecks) {
+              String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
             return Collections.emptyMap();
           }
         };
@@ -365,7 +365,7 @@ public class TestMaintenanceManagementService {
             HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
           @Override
           protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
-              String instanceName, List<HealthCheck> healthChecks) {
+              String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
             return instanceHealthFailedCheck;
           }
         };
@@ -393,7 +393,7 @@ public class TestMaintenanceManagementService {
             _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
           @Override
           protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
-              String instanceName, List<HealthCheck> healthChecks) {
+              String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
             return Collections.emptyMap();
           }
         };
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index a7cf91c7b..68561ce83 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -54,6 +54,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.RESTConfig;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.rest.common.ContextPropertyKeys;
 import org.apache.helix.rest.common.HelixRestNamespace;
@@ -129,9 +130,14 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   protected static HelixZkClient _gZkClientTestNS;
   protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
   protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
+  protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2";
   protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
   protected static final List<String> STOPPABLE_INSTANCES =
       Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5");
+  protected static final List<String> STOPPABLE_INSTANCES2 =
+      Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5",
+          "instance6", "instance7", "instance8", "instance9", "instance10", "instance11",
+          "instance12", "instance13", "instance14");
 
   protected static Set<String> _clusters;
   protected static String _superCluster = "superCluster";
@@ -329,13 +335,14 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       _configAccessor.setClusterConfig(cluster, clusterConfig);
       createResourceConfigs(cluster, 8);
       _workflowMap.put(cluster, createWorkflows(cluster, 3));
-      Set<String> resources = createResources(cluster, 8);
+      Set<String> resources = createResources(cluster, 8, MIN_ACTIVE_REPLICA, NUM_REPLICA);
       _instancesMap.put(cluster, instances);
       _liveInstancesMap.put(cluster, liveInstances);
       _resourcesMap.put(cluster, resources);
       _clusterControllerManagers.add(startController(cluster));
     }
     preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES);
+    preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, STOPPABLE_INSTANCES2);
   }
 
   protected Set<String> createInstances(String cluster, int numInstances) {
@@ -348,16 +355,17 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     return instances;
   }
 
-  protected Set<String> createResources(String cluster, int numResources) {
+  protected Set<String> createResources(String cluster, int numResources, int minActiveReplica,
+      int replicationFactor) {
     Set<String> resources = new HashSet<>();
     for (int i = 0; i < numResources; i++) {
       String resource = cluster + "_db_" + i;
       _gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave");
       IdealState idealState =
           _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
-      idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA);
+      idealState.setMinActiveReplicas(minActiveReplica);
       _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
-      _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA);
+      _gSetupTool.rebalanceStorageCluster(cluster, resource, replicationFactor);
       resources.add(resource);
     }
     return resources;
@@ -575,7 +583,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
 
     // Start participant
     startInstances(clusterName, new TreeSet<>(instances), 3);
-    createResources(clusterName, 1);
+    createResources(clusterName, 1, MIN_ACTIVE_REPLICA, NUM_REPLICA);
     _clusterControllerManagers.add(startController(clusterName));
 
     // Make sure that cluster config exists
@@ -606,6 +614,65 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     _workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3));
   }
 
+  private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterName,
+      List<String> instances) throws Exception {
+    _gSetupTool.addCluster(clusterName, true);
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setFaultZoneType("helixZoneId");
+    clusterConfig.setPersistIntermediateAssignment(true);
+    _configAccessor.setClusterConfig(clusterName, clusterConfig);
+    RESTConfig emptyRestConfig = new RESTConfig(clusterName);
+    _configAccessor.setRESTConfig(clusterName, emptyRestConfig);
+    // Create instance configs
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+    int perZoneInstancesCount = 3;
+    int curZoneCount = 0, zoneId = 1;
+    for (int i = 0; i < instances.size(); i++) {
+      InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
+      instanceConfig.setDomain("helixZoneId=zone" + zoneId + ",host=instance" + i);
+      if (++curZoneCount >= perZoneInstancesCount) {
+        curZoneCount = 0;
+        zoneId++;
+      }
+      instanceConfigs.add(instanceConfig);
+    }
+
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig);
+    }
+
+    // Start participant
+    startInstances(clusterName, new TreeSet<>(instances), instances.size());
+    createResources(clusterName, 1, 2, 3);
+    _clusterControllerManagers.add(startController(clusterName));
+
+    // Make sure that cluster config exists
+    boolean isClusterConfigExist = TestHelper.verify(() -> {
+      ClusterConfig stoppableClusterConfig;
+      try {
+        stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName);
+      } catch (Exception e) {
+        return false;
+      }
+      return (stoppableClusterConfig != null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isClusterConfigExist);
+    // Make sure that an instance config exists for every instance in the given list
+    for (String instance: instances) {
+      boolean isinstanceConfigExist = TestHelper.verify(() -> {
+        InstanceConfig instanceConfig;
+        try {
+          instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance);
+        } catch (Exception e) {
+          return false;
+        }
+        return (instanceConfig != null);
+      }, TestHelper.WAIT_DURATION);
+      Assert.assertTrue(isinstanceConfigExist);
+    }
+    _clusters.add(clusterName);
+    _workflowMap.put(clusterName, createWorkflows(clusterName, 3));
+  }
   /**
    * Starts a HelixRestServer for the test suite.
    * @return
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 01701a486..2bc539a4d 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -40,12 +40,167 @@ import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
 import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestInstancesAccessor extends AbstractTestClass {
   private final static String CLUSTER_NAME = "TestCluster_0";
 
+  @DataProvider
+  public Object[][] generatePayloadCrossZoneStoppableCheckWithZoneOrder() {
+    return new Object[][]{
+        {String.format(
+            "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\","
+                + " \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\"],"
+                + "\"%s\":[\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\"]}",
+            InstancesAccessor.InstancesProperties.selection_base.name(),
+            InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+            InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance2",
+            "instance3", "instance4", "instance5", "instance6", "instance7", "instance8",
+            "instance9", "instance10", "instance11", "instance12", "instance13", "instance14",
+            "invalidInstance",
+            InstancesAccessor.InstancesProperties.zone_order.name(),"zone5", "zone4", "zone3", "zone2",
+            "zone1",
+            InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+            "instance0"),
+        },
+        {String.format(
+            "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"],"
+                + "\"%s\":[\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+            InstancesAccessor.InstancesProperties.selection_base.name(),
+            InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+            InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3",
+            "instance6", "instance9", "instance10", "instance11", "instance12", "instance13",
+            "instance14", "invalidInstance",
+            InstancesAccessor.InstancesProperties.zone_order.name(), "zone5", "zone4", "zone1",
+            "zone3", "zone2", InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+            "instance0", "invalidInstance1", "invalidInstance1"),
+        }
+    };
+  }
+
+  @Test
+  public void testInstanceStoppableZoneBasedWithToBeStoppedInstances() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1",
+        "instance2", "instance3", "instance4", "instance5", "invalidInstance",
+        InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1",
+        InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0", "instance6", "invalidInstance1");
+
+    Response response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+    JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    Set<String> stoppableSet = getStringSet(jsonNode,
+        InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertTrue(stoppableSet.contains("instance4") && stoppableSet.contains("instance3"));
+
+    JsonNode nonStoppableInstances = jsonNode.get(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    //  "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}.
+    //  Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable.
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"),
+        ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test
+  public void testInstanceStoppableZoneBasedWithoutZoneOrder() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance0", "instance1",
+        "instance2", "instance3", "instance4", "invalidInstance",
+        InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(),
+        "instance7", "instance9", "instance10");
+
+    Response response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+    JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    // Without a zone order, Helix should pick zone1 because it holds more of the requested instances than zone2.
+    Set<String> stoppableSet = getStringSet(jsonNode,
+        InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertTrue(stoppableSet.contains("instance0") && stoppableSet.contains("instance1"));
+
+    JsonNode nonStoppableInstances = jsonNode.get(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"),
+        ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dataProvider = "generatePayloadCrossZoneStoppableCheckWithZoneOrder")
+  public void testCrossZoneStoppableWithZoneOrder(String content) throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    Response response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+    JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    Set<String> stoppableSet = getStringSet(jsonNode,
+        InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertTrue(stoppableSet.contains("instance14") && stoppableSet.contains("instance12")
+        && stoppableSet.contains("instance11") && stoppableSet.contains("instance10"));
+
+    JsonNode nonStoppableInstances = jsonNode.get(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
+        ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
   @Test
+  public void testCrossZoneStoppableWithoutZoneOrder() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    String content = String.format(
+        "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"],"
+            + "\"%s\":[\"%s\", \"%s\", \"%s\"]}",
+        InstancesAccessor.InstancesProperties.selection_base.name(),
+        InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(),
+        InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3",
+        "instance6", "instance9", "instance10", "instance11", "instance12", "instance13",
+        "instance14", "invalidInstance",
+        InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0",
+        "invalidInstance1", "invalidInstance1");
+
+    Response response = new JerseyUriRequestBuilder(
+        "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
+        STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
+    JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));
+
+    Set<String> stoppableSet = getStringSet(jsonNode,
+        InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
+    Assert.assertTrue(stoppableSet.contains("instance14") && stoppableSet.contains("instance12")
+        && stoppableSet.contains("instance11") && stoppableSet.contains("instance10"));
+
+    JsonNode nonStoppableInstances = jsonNode.get(
+        InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"),
+        ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
+    Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
+        ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+
+  @Test(dependsOnMethods = "testInstanceStoppableZoneBasedWithToBeStoppedInstances")
   public void testInstanceStoppable_zoneBased_zoneOrder() throws IOException {
     System.out.println("Start test :" + TestHelper.getTestMethodName());
     // Select instances with zone based
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java b/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java
index 35e6399e0..e37da34ff 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java
@@ -22,8 +22,10 @@ package org.apache.helix.rest.server.util;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
@@ -103,6 +105,52 @@ public class TestInstanceValidationUtilInRest{
     Assert.assertEquals(failedPartitions.keySet().size(), 2);
   }
 
+  @Test
+  public void testPartitionLevelCheckWithToBeStoppedNode() {
+    List<ExternalView> externalViews = new ArrayList<>(Arrays.asList(prepareExternalViewOnline()));
+    Mock mock = new Mock();
+    HelixDataAccessor accessor = mock.dataAccessor;
+
+    when(mock.dataAccessor.keyBuilder())
+        .thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
+    when(mock.dataAccessor
+        .getProperty(new PropertyKey.Builder(TEST_CLUSTER).stateModelDef(MasterSlaveSMD.name)))
+        .thenReturn(mock.stateModel);
+    when(mock.stateModel.getTopState()).thenReturn("MASTER");
+    when(mock.stateModel.getInitialState()).thenReturn("OFFLINE");
+
+    Map<String, Map<String, Boolean>> partitionStateMap = new HashMap<>();
+    partitionStateMap.put("h1", new HashMap<>());
+    partitionStateMap.put("h2", new HashMap<>());
+    partitionStateMap.put("h3", new HashMap<>());
+    partitionStateMap.put("h4", new HashMap<>());
+
+    partitionStateMap.get("h1").put("p1", true);
+    partitionStateMap.get("h2").put("p1", true);
+    partitionStateMap.get("h3").put("p1", true);
+    partitionStateMap.get("h4").put("p1", true);
+
+    partitionStateMap.get("h1").put("p2", true);
+    partitionStateMap.get("h2").put("p2", false);
+    partitionStateMap.get("h3").put("p2", true);
+
+    Set<String> toBeStoppedInstances = new HashSet<>();
+    toBeStoppedInstances.add("h3");
+    Map<String, List<String>> failedPartitions = InstanceValidationUtil.perPartitionHealthCheck(
+        externalViews, partitionStateMap, "h1", accessor, toBeStoppedInstances);
+    Assert.assertEquals(failedPartitions.keySet().size(), 1);
+    Assert.assertEquals(failedPartitions.get("p2").size(), 1);
+    Assert.assertTrue(failedPartitions.get("p2").contains("UNHEALTHY_PARTITION"));
+
+    toBeStoppedInstances.remove("h3");
+    toBeStoppedInstances.add("h2");
+    failedPartitions =
+        InstanceValidationUtil.perPartitionHealthCheck(externalViews, partitionStateMap, "h1",
+            accessor, toBeStoppedInstances);
+    // Since we presume h2 as being already stopped, the health status of p2 on h2 will be skipped.
+    Assert.assertEquals(failedPartitions.keySet().size(), 0);
+  }
+
   private ExternalView prepareExternalView() {
     ExternalView externalView = new ExternalView(RESOURCE_NAME);
     externalView.getRecord()
@@ -163,6 +211,22 @@ public class TestInstanceValidationUtilInRest{
     return externalView;
   }
 
+  private ExternalView prepareExternalViewOnline() {
+    ExternalView externalView = new ExternalView(RESOURCE_NAME);
+    externalView.getRecord()
+        .setSimpleField(ExternalView.ExternalViewProperty.STATE_MODEL_DEF_REF.toString(),
+            MasterSlaveSMD.name);
+    externalView.setState("p1", "h1", "MASTER");
+    externalView.setState("p1", "h2", "SLAVE");
+    externalView.setState("p1", "h3", "SLAVE");
+
+    externalView.setState("p2", "h1", "MASTER");
+    externalView.setState("p2", "h2", "SLAVE");
+    externalView.setState("p2", "h3", "SLAVE");
+
+    return externalView;
+  }
+
   private final class Mock {
     private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
     private StateModelDefinition stateModel = mock(StateModelDefinition.class);