Posted to commits@helix.apache.org by xy...@apache.org on 2023/12/08 02:11:06 UTC

(helix) 07/09: Fix WAGED to only use logicalId when computing baseline and centralize picking assignable instances in the cache. (#2702)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git

commit a24ed72b249814722e6ad2ebe86b28bf13d3e358
Author: Zachary Pinto <za...@linkedin.com>
AuthorDate: Tue Nov 28 11:06:02 2023 -0800

    Fix WAGED to only use logicalId when computing baseline and centralize picking assignable instances in the cache. (#2702)
    
    Fix WAGED to only use logicalId when computing baseline and centralize picking assignable instances in the cache.
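    
    In practice, rebalancers no longer deduplicate instances that share a
    logicalId (e.g. a SWAP_OUT/SWAP_IN pair) on their own; the filtering happens
    once in BaseControllerDataProvider and is exposed through the new
    "assignable" accessors. A minimal usage sketch, assuming a
    ResourceControllerDataProvider named clusterData (illustrative only, not
    code from this commit):
    
        // At most one instance per logicalId; SWAP_IN nodes that have not yet
        // completed the swap are filtered out by the cache itself.
        Map<String, InstanceConfig> assignableConfigs =
            clusterData.getAssignableInstanceConfigMap();
        Set<String> assignableLiveEnabled =
            clusterData.getAssignableEnabledLiveInstances();
        // Rebalancers compute placement against these views instead of calling
        // DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds themselves.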
---
 .../changedetector/ResourceChangeDetector.java     |   4 +-
 .../changedetector/ResourceChangeSnapshot.java     |  41 ++-
 .../dataproviders/BaseControllerDataProvider.java  | 365 +++++++++++++++------
 .../WorkflowControllerDataProvider.java            |   3 +-
 .../controller/rebalancer/AbstractRebalancer.java  |  11 +-
 .../controller/rebalancer/AutoRebalancer.java      |  34 +-
 .../controller/rebalancer/CustomRebalancer.java    |   4 +-
 .../rebalancer/DelayedAutoRebalancer.java          |  75 ++---
 .../AbstractEvenDistributionRebalanceStrategy.java |   5 +-
 .../strategy/ConstraintRebalanceStrategy.java      |   8 +-
 .../strategy/CrushRebalanceStrategy.java           |   2 +-
 .../strategy/MultiRoundCrushRebalanceStrategy.java |   2 +-
 .../rebalancer/util/DelayedRebalanceUtil.java      |  94 +-----
 .../rebalancer/waged/GlobalRebalanceRunner.java    |  23 +-
 .../rebalancer/waged/WagedRebalancer.java          |  61 +---
 .../AbstractPartitionMovementConstraint.java       |   6 +-
 .../constraints/BaselineInfluenceConstraint.java   |   2 +-
 .../constraints/ConstraintBasedAlgorithm.java      |  10 +-
 .../constraints/PartitionMovementConstraint.java   |   6 +-
 .../waged/model/ClusterModelProvider.java          | 107 ++++--
 .../stages/BestPossibleStateCalcStage.java         |  65 +++-
 .../stages/IntermediateStateCalcStage.java         |  10 +-
 .../stages/MaintenanceRecoveryStage.java           |   2 +-
 .../controller/stages/MessageGenerationPhase.java  |   3 +-
 .../controller/stages/ReadClusterDataStage.java    |   2 -
 .../stages/ResourceComputationStage.java           |   2 -
 .../stages/task/TaskSchedulingStage.java           |   1 -
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  14 +-
 .../org/apache/helix/model/ResourceAssignment.java |   1 -
 .../apache/helix/task/AbstractTaskDispatcher.java  |   3 +-
 .../java/org/apache/helix/task/JobDispatcher.java  |   6 +-
 .../helix/tools/ClusterExternalViewVerifier.java   |   2 +-
 .../StrictMatchExternalViewVerifier.java           |  12 +-
 .../main/java/org/apache/helix/util/HelixUtil.java |   6 +-
 .../trimmer/TestHelixPropoertyTimmer.java          |   4 +-
 .../rebalancer/TestAutoRebalanceStrategy.java      |   2 +-
 .../rebalancer/waged/TestWagedRebalancer.java      |  43 ++-
 .../waged/TestWagedRebalancerMetrics.java          |   9 +-
 .../TestPartitionMovementConstraint.java           |   5 +
 .../waged/model/AbstractTestClusterModel.java      |   5 +-
 .../waged/model/ClusterModelTestHelper.java        |   2 +-
 .../rebalancer/waged/model/TestAssignableNode.java |  22 +-
 .../waged/model/TestClusterModelProvider.java      |  31 +-
 .../TestBestPossibleCalcStageCompatibility.java    |   3 +-
 .../stages/TestBestPossibleStateCalcStage.java     |   1 +
 .../stages/TestCancellationMessageGeneration.java  |   1 -
 .../stages/TestIntermediateStateCalcStage.java     |   1 +
 .../controller/stages/TestRebalancePipeline.java   |  26 +-
 .../stages/TestReplicaLevelThrottling.java         |   2 +-
 ...estOfflineNodeTimeoutDuringMaintenanceMode.java |  12 +-
 .../messaging/TestP2PMessageSemiAuto.java          |   2 +-
 .../messaging/TestP2PNoDuplicatedMessage.java      |   4 +-
 .../integration/rebalancer/TestAutoRebalance.java  |   6 +-
 .../TestAutoRebalancePartitionLimit.java           |   2 +-
 .../rebalancer/TestCustomRebalancer.java           |   2 +-
 .../TestCustomizedIdealStateRebalancer.java        |   8 +-
 .../rebalancer/TestInstanceOperation.java          |  54 +--
 .../WagedRebalancer/TestWagedNodeSwap.java         |   2 +-
 .../messaging/p2pMessage/TestP2PMessages.java      |   8 +-
 .../helix/task/TestTargetedTaskStateChange.java    |   8 +-
 .../helix/rest/server/AbstractTestClass.java       |   2 +-
 .../helix/rest/server/TestInstancesAccessor.java   |   2 +-
 62 files changed, 757 insertions(+), 504 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index ae8253ffb..4e0b706b8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -113,13 +113,13 @@ public class ResourceChangeDetector implements ChangeDetector {
       HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) {
     switch (changeType) {
     case INSTANCE_CONFIG:
-      return snapshot.getInstanceConfigMap();
+      return snapshot.getAssignableInstanceConfigMap();
     case IDEAL_STATE:
       return snapshot.getIdealStateMap();
     case RESOURCE_CONFIG:
       return snapshot.getResourceConfigMap();
     case LIVE_INSTANCE:
-      return snapshot.getLiveInstances();
+      return snapshot.getAssignableLiveInstances();
     case CLUSTER_CONFIG:
       ClusterConfig config = snapshot.getClusterConfig();
       if (config == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
index 39fccd8d0..f37cae635 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
@@ -50,10 +50,12 @@ import org.apache.helix.model.ResourceConfig;
 class ResourceChangeSnapshot {
 
   private Set<HelixConstants.ChangeType> _changedTypes;
-  private Map<String, InstanceConfig> _instanceConfigMap;
+  private Map<String, InstanceConfig> _allInstanceConfigMap;
+  private Map<String, InstanceConfig> _assignableInstanceConfigMap;
   private Map<String, IdealState> _idealStateMap;
   private Map<String, ResourceConfig> _resourceConfigMap;
-  private Map<String, LiveInstance> _liveInstances;
+  private Map<String, LiveInstance> _allLiveInstances;
+  private Map<String, LiveInstance> _assignableLiveInstances;
   private ClusterConfig _clusterConfig;
 
   /**
@@ -61,10 +63,12 @@ class ResourceChangeSnapshot {
    */
   ResourceChangeSnapshot() {
     _changedTypes = new HashSet<>();
-    _instanceConfigMap = new HashMap<>();
+    _allInstanceConfigMap = new HashMap<>();
+    _assignableInstanceConfigMap = new HashMap<>();
     _idealStateMap = new HashMap<>();
     _resourceConfigMap = new HashMap<>();
-    _liveInstances = new HashMap<>();
+    _allLiveInstances = new HashMap<>();
+    _assignableLiveInstances = new HashMap<>();
     _clusterConfig = null;
   }
 
@@ -80,12 +84,16 @@ class ResourceChangeSnapshot {
   ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider,
       boolean ignoreNonTopologyChange) {
     _changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());
-
-    _instanceConfigMap = ignoreNonTopologyChange ?
+    _allInstanceConfigMap = ignoreNonTopologyChange ?
         dataProvider.getInstanceConfigMap().entrySet().parallelStream().collect(Collectors
             .toMap(e -> e.getKey(),
                 e -> InstanceConfigTrimmer.getInstance().trimProperty(e.getValue()))) :
         new HashMap<>(dataProvider.getInstanceConfigMap());
+    _assignableInstanceConfigMap = ignoreNonTopologyChange ?
+        dataProvider.getAssignableInstanceConfigMap().entrySet().parallelStream().collect(Collectors
+            .toMap(e -> e.getKey(),
+                e -> InstanceConfigTrimmer.getInstance().trimProperty(e.getValue()))) :
+        new HashMap<>(dataProvider.getAssignableInstanceConfigMap());
     _idealStateMap = ignoreNonTopologyChange ?
         dataProvider.getIdealStates().entrySet().parallelStream().collect(Collectors
             .toMap(e -> e.getKey(),
@@ -99,7 +107,8 @@ class ResourceChangeSnapshot {
     _clusterConfig = ignoreNonTopologyChange ?
         ClusterConfigTrimmer.getInstance().trimProperty(dataProvider.getClusterConfig()) :
         dataProvider.getClusterConfig();
-    _liveInstances = new HashMap<>(dataProvider.getLiveInstances());
+    _allLiveInstances = new HashMap<>(dataProvider.getLiveInstances());
+    _assignableLiveInstances = new HashMap<>(dataProvider.getAssignableLiveInstances());
   }
 
   /**
@@ -108,10 +117,12 @@ class ResourceChangeSnapshot {
    */
   ResourceChangeSnapshot(ResourceChangeSnapshot snapshot) {
     _changedTypes = new HashSet<>(snapshot._changedTypes);
-    _instanceConfigMap = new HashMap<>(snapshot._instanceConfigMap);
+    _allInstanceConfigMap = new HashMap<>(snapshot._allInstanceConfigMap);
+    _assignableInstanceConfigMap = new HashMap<>(snapshot._assignableInstanceConfigMap);
     _idealStateMap = new HashMap<>(snapshot._idealStateMap);
     _resourceConfigMap = new HashMap<>(snapshot._resourceConfigMap);
-    _liveInstances = new HashMap<>(snapshot._liveInstances);
+    _allLiveInstances = new HashMap<>(snapshot._allLiveInstances);
+    _assignableLiveInstances = new HashMap<>(snapshot._assignableLiveInstances);
     _clusterConfig = snapshot._clusterConfig;
   }
 
@@ -120,7 +131,11 @@ class ResourceChangeSnapshot {
   }
 
   Map<String, InstanceConfig> getInstanceConfigMap() {
-    return _instanceConfigMap;
+    return _allInstanceConfigMap;
+  }
+
+  Map<String, InstanceConfig> getAssignableInstanceConfigMap() {
+    return _assignableInstanceConfigMap;
   }
 
   Map<String, IdealState> getIdealStateMap() {
@@ -132,7 +147,11 @@ class ResourceChangeSnapshot {
   }
 
   Map<String, LiveInstance> getLiveInstances() {
-    return _liveInstances;
+    return _allLiveInstances;
+  }
+
+  Map<String, LiveInstance> getAssignableLiveInstances() {
+    return _assignableLiveInstances;
   }
 
   ClusterConfig getClusterConfig() {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 9dd517384..9120bd962 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -102,8 +102,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   // Property caches
   private final PropertyCache<ResourceConfig> _resourceConfigCache;
-  private final PropertyCache<InstanceConfig> _instanceConfigCache;
-  private final PropertyCache<LiveInstance> _liveInstanceCache;
+  private final PropertyCache<InstanceConfig> _allInstanceConfigCache;
+  private final PropertyCache<LiveInstance> _allLiveInstanceCache;
   private final PropertyCache<IdealState> _idealStateCache;
   private final PropertyCache<ClusterConstraints> _clusterConstraintsCache;
   private final PropertyCache<StateModelDefinition> _stateModelDefinitionCache;
@@ -118,6 +118,12 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
   private final Set<String> _disabledInstanceSet = new HashSet<>();
+
+  // The assignable instance maps contain at most one instance with a given logicalId.
+  // This matters for SWAP related operations, where two instances can share the same logicalId.
+  private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new HashMap<>();
+  private final Map<String, LiveInstance> _assignableLiveInstancesMap = new HashMap<>();
+  private final Set<String> _assignableDisabledInstanceSet = new HashSet<>();
   private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>();
   private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
   private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
@@ -154,7 +160,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
         return obj.getResourceName();
       }
     }, true);
-    _liveInstanceCache = new PropertyCache<>(this, "LiveInstance", new PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() {
+    _allLiveInstanceCache = new PropertyCache<>(this, "LiveInstance", new PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() {
       @Override
       public PropertyKey getRootKey(HelixDataAccessor accessor) {
         return accessor.keyBuilder().liveInstances();
@@ -170,7 +176,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
         return obj.getInstanceName();
       }
     }, true);
-    _instanceConfigCache = new PropertyCache<>(this, "InstanceConfig", new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() {
+    _allInstanceConfigCache = new PropertyCache<>(this, "InstanceConfig", new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() {
       @Override
       public PropertyKey getRootKey(HelixDataAccessor accessor) {
         return accessor.keyBuilder().instanceConfigs();
@@ -310,7 +316,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   private void refreshLiveInstances(final HelixDataAccessor accessor,
       Set<HelixConstants.ChangeType> refreshedType) {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE).getAndSet(false)) {
-      _liveInstanceCache.refresh(accessor);
+      _allLiveInstanceCache.refresh(accessor);
       _updateInstanceOfflineTime = true;
       refreshedType.add(HelixConstants.ChangeType.LIVE_INSTANCE);
     } else {
@@ -323,10 +329,10 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   private void refreshInstanceConfigs(final HelixDataAccessor accessor,
       Set<HelixConstants.ChangeType> refreshedType) {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG).getAndSet(false)) {
-      _instanceConfigCache.refresh(accessor);
+      _allInstanceConfigCache.refresh(accessor);
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("Reloaded InstanceConfig for cluster %s, %s pipeline. Keys: %s", _clusterName,
-              getPipelineName(), _instanceConfigCache.getPropertyMap().keySet()));
+              getPipelineName(), _allInstanceConfigCache.getPropertyMap().keySet()));
       refreshedType.add(HelixConstants.ChangeType.INSTANCE_CONFIG);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
@@ -335,6 +341,108 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     }
   }
 
+  /**
+   * Refreshes the assignable instances and SWAP related caches. This should be called after
+   * liveInstance and instanceConfig caches are refreshed. To determine which instances are
+   * assignable and live, it combines the full instanceConfig and liveInstance maps.
+   * TODO: Add EVACUATE InstanceOperation to be filtered out in assignable nodes.
+   *
+   * @param instanceConfigMap InstanceConfig map from instanceConfig cache
+   * @param liveInstancesMap  LiveInstance map from liveInstance cache
+   * @param clusterConfig     ClusterConfig from clusterConfig cache
+   */
+  private void updateInstanceSets(Map<String, InstanceConfig> instanceConfigMap,
+      Map<String, LiveInstance> liveInstancesMap, ClusterConfig clusterConfig) {
+
+    if (clusterConfig == null) {
+      logger.warn("Skip refreshing swapping instances because clusterConfig is null.");
+      return;
+    }
+
+    ClusterTopologyConfig clusterTopologyConfig =
+        ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
+
+    // Clear all caches
+    _assignableInstanceConfigMap.clear();
+    _assignableLiveInstancesMap.clear();
+    _swapOutInstanceNameToSwapInInstanceName.clear();
+    _enabledLiveSwapInInstanceNames.clear();
+
+    Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
+    Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
+    Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
+
+    for (Map.Entry<String, InstanceConfig> entry : instanceConfigMap.entrySet()) {
+      String node = entry.getKey();
+      InstanceConfig currentInstanceConfig = entry.getValue();
+
+      if (currentInstanceConfig == null) {
+        continue;
+      }
+
+      String currentInstanceLogicalId =
+          currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
+
+      // Filter out instances with duplicate logical IDs. If there are duplicates, the instance with
+      // InstanceOperation SWAP_OUT will be chosen over the instance with SWAP_IN. SWAP_IN is not
+      // assignable. If there are duplicates with one node having no InstanceOperation and the other
+      // having SWAP_OUT, the node with no InstanceOperation will be chosen. This signifies SWAP
+      // completion, therefore making the node assignable.
+      if (filteredInstancesByLogicalId.containsKey(currentInstanceLogicalId)) {
+        String filteredNode = filteredInstancesByLogicalId.get(currentInstanceLogicalId);
+        InstanceConfig filteredDuplicateInstanceConfig = instanceConfigMap.get(filteredNode);
+
+        if ((filteredDuplicateInstanceConfig.getInstanceOperation()
+            .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
+            && currentInstanceConfig.getInstanceOperation()
+            .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
+            || currentInstanceConfig.getInstanceOperation().isEmpty()) {
+          // If the already filtered instance is SWAP_IN and this instance is in SWAP_OUT, then replace the filtered
+          // instance with this instance. If this instance has no InstanceOperation, then replace the filtered instance
+          // with this instance. This is the case where the SWAP_IN node has been marked as complete or SWAP_IN exists and
+          // SWAP_OUT does not. There can never be a case where both have no InstanceOperation set.
+          _assignableInstanceConfigMap.remove(filteredNode);
+          _assignableInstanceConfigMap.put(node, currentInstanceConfig);
+          filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
+        }
+      } else {
+        _assignableInstanceConfigMap.put(node, currentInstanceConfig);
+        filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
+      }
+
+      if (currentInstanceConfig.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
+        swapOutLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
+            currentInstanceLogicalId);
+      }
+
+      if (currentInstanceConfig.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+        swapInInstancesByLogicalId.put(
+            currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
+            currentInstanceConfig.getInstanceName());
+      }
+    }
+
+    liveInstancesMap.forEach((instanceName, liveInstance) -> {
+      if (_assignableInstanceConfigMap.containsKey(instanceName)) {
+        _assignableLiveInstancesMap.put(instanceName, liveInstance);
+      }
+    });
+
+    swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
+      String swapInInstanceName = swapInInstancesByLogicalId.get(value);
+      if (swapInInstanceName != null) {
+        _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
+        if (liveInstancesMap.containsKey(swapInInstanceName)
+            && InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
+            clusterConfig)) {
+          _enabledLiveSwapInInstanceNames.add(swapInInstanceName);
+        }
+      }
+    });
+  }
+
   private void refreshResourceConfig(final HelixDataAccessor accessor,
       Set<HelixConstants.ChangeType> refreshedType) {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG).getAndSet(false)) {
@@ -373,7 +481,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
       timeOutWindow = clusterConfig.getOfflineNodeTimeOutForMaintenanceMode();
     }
     if (timeOutWindow >= 0 && isMaintenanceModeEnabled) {
-      for (String instance : _liveInstanceCache.getPropertyMap().keySet()) {
+      for (String instance : _assignableLiveInstancesMap.keySet()) {
         // 1. Check timed-out cache and don't do repeated work;
         // 2. Check for nodes that didn't exist in the last iteration, because it has been checked;
         // 3. For all other nodes, check if it's timed-out.
@@ -386,9 +494,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
       }
     }
     if (isMaintenanceModeEnabled) {
-      _liveInstanceExcludeTimedOutForMaintenance =
-          _liveInstanceCache.getPropertyMap().entrySet().stream()
-              .filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey()))
+      _liveInstanceExcludeTimedOutForMaintenance = _assignableLiveInstancesMap.entrySet().stream()
+          .filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey()))
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
   }
@@ -421,6 +528,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     refreshIdealState(accessor, refreshedTypes);
     refreshLiveInstances(accessor, refreshedTypes);
     refreshInstanceConfigs(accessor, refreshedTypes);
+    updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
+        _clusterConfig);
     refreshResourceConfig(accessor, refreshedTypes);
     _stateModelDefinitionCache.refresh(accessor);
     _clusterConstraintsCache.refresh(accessor);
@@ -431,17 +540,19 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     updateOfflineInstanceHistory(accessor);
 
     // Refresh derived data
-    _instanceMessagesCache.refresh(accessor, _liveInstanceCache.getPropertyMap());
-    _currentStateCache.refresh(accessor, _liveInstanceCache.getPropertyMap());
+    // Must use _allLiveInstanceCache instead of _assignableLiveInstancesMap because we need to
+    // know about the messages and current state of all instances including the SWAP_IN ones.
+    _instanceMessagesCache.refresh(accessor, _allLiveInstanceCache.getPropertyMap());
+    _currentStateCache.refresh(accessor, _allLiveInstanceCache.getPropertyMap());
 
     // current state must be refreshed before refreshing relay messages
     // because we need to use current state to validate all relay messages.
-    _instanceMessagesCache.updateRelayMessages(_liveInstanceCache.getPropertyMap(),
+    _instanceMessagesCache.updateRelayMessages(_allLiveInstanceCache.getPropertyMap(),
         _currentStateCache.getParticipantStatesMap());
 
     updateIdealRuleMap(getClusterConfig());
-    updateDisabledInstances(getInstanceConfigMap().values(), getClusterConfig());
-    updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(),
+    updateDisabledInstances(getInstanceConfigMap().values(),
+        getAssignableInstanceConfigMap().values(),
         getClusterConfig());
 
     return refreshedTypes;
@@ -453,17 +564,18 @@ public class BaseControllerDataProvider implements ControlContextProvider {
           "# of StateModelDefinition read from zk: " + getStateModelDefMap().size());
       LogUtil.logDebug(logger, getClusterEventId(),
           "# of ConstraintMap read from zk: " + getConstraintMap().size());
-      LogUtil
-          .logDebug(logger, getClusterEventId(), "LiveInstances: " + getLiveInstances().keySet());
-      for (LiveInstance instance : getLiveInstances().values()) {
+      LogUtil.logDebug(logger, getClusterEventId(),
+          "AssignableLiveInstances: " + getAssignableLiveInstances().keySet());
+      for (LiveInstance instance : getAssignableLiveInstances().values()) {
         LogUtil.logDebug(logger, getClusterEventId(),
-            "live instance: " + instance.getInstanceName() + " " + instance.getEphemeralOwner());
+            "assignable live instance: " + instance.getInstanceName() + " "
+                + instance.getEphemeralOwner());
       }
       LogUtil.logDebug(logger, getClusterEventId(), "IdealStates: " + getIdealStates().keySet());
       LogUtil.logDebug(logger, getClusterEventId(),
           "ResourceConfigs: " + getResourceConfigMap().keySet());
       LogUtil.logDebug(logger, getClusterEventId(),
-          "InstanceConfigs: " + getInstanceConfigMap().keySet());
+          "AssignableInstanceConfigs: " + getAssignableInstanceConfigMap().keySet());
       LogUtil.logDebug(logger, getClusterEventId(), "ClusterConfigs: " + getClusterConfig());
     }
   }
@@ -476,8 +588,10 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     _clusterConfig = clusterConfig;
     refreshAbnormalStateResolverMap(_clusterConfig);
     updateIdealRuleMap(_clusterConfig);
-    updateDisabledInstances(getInstanceConfigMap().values(), _clusterConfig);
-    updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(),
+    updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
+        _clusterConfig);
+    updateDisabledInstances(getInstanceConfigMap().values(),
+        getAssignableInstanceConfigMap().values(),
         _clusterConfig);
   }
 
@@ -527,23 +641,57 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   }
 
   /**
-   * Returns the LiveInstances for each of the instances that are currently up and running,
+   * Returns the assignable LiveInstances for each of the instances that are currently up and running,
    * excluding the instances that are considered offline during maintenance mode. Instances
    * are timed-out if they have been offline for a while before going live during maintenance mode.
+   * @return A map of instance names to their LiveInstances
    */
-  public Map<String, LiveInstance> getLiveInstances() {
+  public Map<String, LiveInstance> getAssignableLiveInstances() {
     if (isMaintenanceModeEnabled()) {
-      return _liveInstanceExcludeTimedOutForMaintenance;
+      return Collections.unmodifiableMap(_liveInstanceExcludeTimedOutForMaintenance);
     }
 
-    return _liveInstanceCache.getPropertyMap();
+    return Collections.unmodifiableMap(_assignableLiveInstancesMap);
+  }
+
+  /**
+   * Returns the LiveInstances for all instances that are currently up and running,
+   * regardless of whether they are assignable or considered timed-out during maintenance mode.
+   *
+   * @return A map of instance names to their LiveInstances
+   */
+  public Map<String, LiveInstance> getLiveInstances() {
+    return _allLiveInstanceCache.getPropertyMap();
+  }
+
+  /**
+   * Return the set of all assignable instances names.
+   *
+   * @return A new set contains instance name
+   */
+  public Set<String> getAssignableInstances() {
+    return _assignableInstanceConfigMap.keySet();
   }
 
   /**
    * Return the set of all instances names.
+   * @return The set of all instance names
    */
   public Set<String> getAllInstances() {
-    return _instanceConfigCache.getPropertyMap().keySet();
+    return _allInstanceConfigCache.getPropertyMap().keySet();
+  }
+
+  /**
+   * Return all the live nodes that are enabled and assignable.
+   *
+   * @return A new set containing the names of assignable live instances that are enabled
+   */
+  public Set<String> getAssignableEnabledLiveInstances() {
+    Set<String> enabledLiveInstances = new HashSet<>(getAssignableLiveInstances().keySet());
+    enabledLiveInstances.removeAll(getDisabledInstances());
+
+    return enabledLiveInstances;
   }
 
   /**
@@ -552,22 +700,50 @@ public class BaseControllerDataProvider implements ControlContextProvider {
    */
   public Set<String> getEnabledLiveInstances() {
     Set<String> enabledLiveInstances = new HashSet<>(getLiveInstances().keySet());
-    enabledLiveInstances.removeAll(getDisabledInstances());
+    enabledLiveInstances.removeAll(getAssignableDisabledInstances());
 
     return enabledLiveInstances;
   }
 
+  /**
+   * Return all nodes that are enabled and assignable.
+   *
+   * @return A new set containing the names of assignable instances that are enabled
+   */
+  public Set<String> getAssignableEnabledInstances() {
+    Set<String> enabledNodes = new HashSet<>(getAssignableInstances());
+    enabledNodes.removeAll(getDisabledInstances());
+
+    return enabledNodes;
+  }
+
   /**
    * Return all nodes that are enabled.
-   * @return
+   * @return A new set containing the names of instances that are enabled
    */
   public Set<String> getEnabledInstances() {
     Set<String> enabledNodes = new HashSet<>(getAllInstances());
-    enabledNodes.removeAll(getDisabledInstances());
+    enabledNodes.removeAll(getAssignableDisabledInstances());
 
     return enabledNodes;
   }
 
+  /**
+   * Return all the live nodes that are enabled and assignable and tagged with given instanceTag.
+   *
+   * @param instanceTag The instance group tag.
+   * @return A new set containing the names of assignable live instances that are enabled and
+   * have the specified tag.
+   */
+  public Set<String> getAssignableEnabledLiveInstancesWithTag(String instanceTag) {
+    Set<String> enabledLiveInstancesWithTag = new HashSet<>(getAssignableLiveInstances().keySet());
+    Set<String> instancesWithTag = getAssignableInstancesWithTag(instanceTag);
+    enabledLiveInstancesWithTag.retainAll(instancesWithTag);
+    enabledLiveInstancesWithTag.removeAll(getDisabledInstances());
+
+    return enabledLiveInstancesWithTag;
+  }
+
   /**
    * Return all the live nodes that are enabled and tagged with given instanceTag.
    * @param instanceTag The instance group tag.
@@ -576,21 +752,38 @@ public class BaseControllerDataProvider implements ControlContextProvider {
    */
   public Set<String> getEnabledLiveInstancesWithTag(String instanceTag) {
     Set<String> enabledLiveInstancesWithTag = new HashSet<>(getLiveInstances().keySet());
-    Set<String> instancesWithTag = getInstancesWithTag(instanceTag);
+    Set<String> instancesWithTag = getAssignableInstancesWithTag(instanceTag);
     enabledLiveInstancesWithTag.retainAll(instancesWithTag);
     enabledLiveInstancesWithTag.removeAll(getDisabledInstances());
 
     return enabledLiveInstancesWithTag;
   }
 
+  /**
+   * Return all the nodes that are assignable and tagged with given instance tag.
+   *
+   * @param instanceTag The instance group tag.
+   */
+  public Set<String> getAssignableInstancesWithTag(String instanceTag) {
+    Set<String> taggedInstances = new HashSet<>();
+    for (String instance : _assignableInstanceConfigMap.keySet()) {
+      InstanceConfig instanceConfig = _allInstanceConfigCache.getPropertyByName(instance);
+      if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
+        taggedInstances.add(instance);
+      }
+    }
+
+    return taggedInstances;
+  }
+
   /**
    * Return all the nodes that are tagged with given instance tag.
    * @param instanceTag The instance group tag.
    */
   public Set<String> getInstancesWithTag(String instanceTag) {
     Set<String> taggedInstances = new HashSet<>();
-    for (String instance : _instanceConfigCache.getPropertyMap().keySet()) {
-      InstanceConfig instanceConfig = _instanceConfigCache.getPropertyByName(instance);
+    for (String instance : _assignableInstanceConfigMap.keySet()) {
+      InstanceConfig instanceConfig = _allInstanceConfigCache.getPropertyByName(instance);
       if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
         taggedInstances.add(instance);
       }
@@ -625,6 +818,15 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     return Collections.unmodifiableSet(_disabledInstanceSet);
   }
 
+  /**
+   * This method allows one to fetch the set of assignable nodes that are disabled.
+   *
+   * @return An unmodifiable set of disabled assignable instance names
+   */
+  public Set<String> getAssignableDisabledInstances() {
+    return Collections.unmodifiableSet(_assignableDisabledInstanceSet);
+  }
+
   /**
    * Get all swapping instance pairs.
    *
@@ -644,7 +846,9 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   }
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
-    _liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
+    _allLiveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
+    updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
+        _clusterConfig);
     _updateInstanceOfflineTime = true;
   }
 
@@ -762,11 +966,20 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   }
 
   /**
-   * Returns the instance config map
-   * @return
+   * Returns the instance config map for all assignable instances.
+   *
+   * @return a map of instance name to instance config
+   */
+  public Map<String, InstanceConfig> getAssignableInstanceConfigMap() {
+    return Collections.unmodifiableMap(_assignableInstanceConfigMap);
+  }
+
+  /**
+   * Returns the instance config map for all instances.
+   * @return a map of instance name to instance config
    */
   public Map<String, InstanceConfig> getInstanceConfigMap() {
-    return _instanceConfigCache.getPropertyMap();
+    return _allInstanceConfigCache.getPropertyMap();
   }
 
   /**
@@ -774,9 +987,11 @@ public class BaseControllerDataProvider implements ControlContextProvider {
    * @param instanceConfigMap
    */
   public void setInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) {
-    _instanceConfigCache.setPropertyMap(instanceConfigMap);
-    updateDisabledInstances(instanceConfigMap.values(), getClusterConfig());
-    updateSwappingInstances(instanceConfigMap.values(), getEnabledLiveInstances(),
+    _allInstanceConfigCache.setPropertyMap(instanceConfigMap);
+    updateInstanceSets(_allInstanceConfigCache.getPropertyMap(), _allLiveInstanceCache.getPropertyMap(),
+        getClusterConfig());
+    updateDisabledInstances(getInstanceConfigMap().values(),
+        getAssignableInstanceConfigMap().values(),
         getClusterConfig());
   }
 
@@ -839,8 +1054,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     if (!_updateInstanceOfflineTime) {
       return;
     }
-    List<String> offlineNodes = new ArrayList<>(_instanceConfigCache.getPropertyMap().keySet());
-    offlineNodes.removeAll(_liveInstanceCache.getPropertyMap().keySet());
+    List<String> offlineNodes = new ArrayList<>(_allInstanceConfigCache.getPropertyMap().keySet());
+    offlineNodes.removeAll(_allLiveInstanceCache.getPropertyMap().keySet());
     _instanceOfflineTimeMap = new HashMap<>();
 
     for (String instance : offlineNodes) {
@@ -866,15 +1081,18 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     _updateInstanceOfflineTime = false;
   }
 
-  private void updateDisabledInstances(Collection<InstanceConfig> instanceConfigs,
-      ClusterConfig clusterConfig) {
+  private void updateDisabledInstances(Collection<InstanceConfig> allInstanceConfigs,
+      Collection<InstanceConfig> assignableInstanceConfigs, ClusterConfig clusterConfig) {
     // Move the calculating disabled instances to refresh
     _disabledInstanceForPartitionMap.clear();
     _disabledInstanceSet.clear();
-    for (InstanceConfig config : instanceConfigs) {
+    for (InstanceConfig config : allInstanceConfigs) {
       Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
       if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
         _disabledInstanceSet.add(config.getInstanceName());
+        if (assignableInstanceConfigs.contains(config)) {
+          _assignableDisabledInstanceSet.add(config.getInstanceName());
+        }
       }
       for (String resource : disabledPartitionMap.keySet()) {
         _disabledInstanceForPartitionMap.putIfAbsent(resource, new HashMap<>());
@@ -886,49 +1104,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     }
   }
 
-  private void updateSwappingInstances(Collection<InstanceConfig> instanceConfigs,
-      Set<String> liveEnabledInstances, ClusterConfig clusterConfig) {
-    _swapOutInstanceNameToSwapInInstanceName.clear();
-    _enabledLiveSwapInInstanceNames.clear();
-
-    if (clusterConfig == null) {
-      logger.warn("Skip refreshing swapping instances because clusterConfig is null.");
-      return;
-    }
-
-    ClusterTopologyConfig clusterTopologyConfig =
-        ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
-
-    Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
-    Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
-    instanceConfigs.forEach(instanceConfig -> {
-      if (instanceConfig == null) {
-        return;
-      }
-      if (instanceConfig.getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
-        swapOutLogicalIdsByInstanceName.put(instanceConfig.getInstanceName(),
-            instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()));
-      }
-      if (instanceConfig.getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
-        swapInInstancesByLogicalId.put(
-            instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
-            instanceConfig.getInstanceName());
-      }
-    });
-
-    swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
-      String swapInInstanceName = swapInInstancesByLogicalId.get(value);
-      if (swapInInstanceName != null) {
-        _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
-        if (liveEnabledInstances.contains(swapInInstanceName)) {
-          _enabledLiveSwapInInstanceNames.add(swapInInstanceName);
-        }
-      }
-    });
-  }
-
   /*
    * Check if the instance is timed-out during maintenance mode. An instance is timed-out if it has
    * been offline for longer than the user defined timeout window.
@@ -1101,10 +1276,16 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   protected StringBuilder genCacheContentStringBuilder() {
     StringBuilder sb = new StringBuilder();
-    sb.append(String.format("liveInstaceMap: %s", _liveInstanceCache.getPropertyMap())).append("\n");
+    sb.append(String.format("liveInstaceMap: %s", _allLiveInstanceCache.getPropertyMap()))
+        .append("\n");
+    sb.append(String.format("assignableLiveInstaceMap: %s", _assignableLiveInstancesMap))
+        .append("\n");
     sb.append(String.format("idealStateMap: %s", _idealStateCache.getPropertyMap())).append("\n");
     sb.append(String.format("stateModelDefMap: %s",  _stateModelDefinitionCache.getPropertyMap())).append("\n");
-    sb.append(String.format("instanceConfigMap: %s", _instanceConfigCache.getPropertyMap())).append("\n");
+    sb.append(String.format("instanceConfigMap: %s", _allInstanceConfigCache.getPropertyMap()))
+        .append("\n");
+    sb.append(String.format("assignableInstanceConfigMap: %s", _assignableInstanceConfigMap))
+        .append("\n");
     sb.append(String.format("resourceConfigMap: %s", _resourceConfigCache.getPropertyMap())).append("\n");
     sb.append(String.format("messageCache: %s", _instanceMessagesCache)).append("\n");
     sb.append(String.format("currentStateCache: %s", _currentStateCache)).append("\n");
@@ -1113,7 +1294,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   }
 
   protected PropertyCache<LiveInstance> getLiveInstanceCache() {
-    return _liveInstanceCache;
+    return _allLiveInstanceCache;
   }
 
   @Override
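
The duplicate-logicalId resolution performed by updateInstanceSets above boils down
to two rules: a node with no InstanceOperation wins (the swap has completed), and a
SWAP_OUT node wins over a SWAP_IN node (SWAP_IN nodes are not yet assignable). A
condensed, hypothetical sketch of that rule, assuming Helix's InstanceConfig and
InstanceConstants classes (pickAssignable is not a method introduced by this commit):

    static InstanceConfig pickAssignable(InstanceConfig kept, InstanceConfig candidate) {
      // The swap has completed: a node with no InstanceOperation replaces the kept node.
      if (candidate.getInstanceOperation().isEmpty()) {
        return candidate;
      }
      // A SWAP_OUT node replaces a SWAP_IN node, because SWAP_IN is not assignable.
      if (kept.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
          && candidate.getInstanceOperation()
              .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
        return candidate;
      }
      return kept;
    }
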
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 96894e82d..3a71e777b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -26,7 +26,6 @@ import java.util.Set;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.common.caches.TaskCurrentStateCache;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.caches.AbstractDataCache;
@@ -164,7 +163,7 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider {
    */
   public void resetActiveTaskCount(CurrentStateOutput currentStateOutput) {
     // init participant map
-    for (String liveInstance : getLiveInstances().keySet()) {
+    for (String liveInstance : getAssignableLiveInstances().keySet()) {
       _participantActiveTaskCount.put(liveInstance, 0);
     }
     // Active task == init and running tasks
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 1d8cb5d6c..7a23b8f28 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -102,9 +102,10 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
       Set<String> disabledInstancesForPartition =
           cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
       List<String> preferenceList = getPreferenceList(partition, idealState,
-          Collections.unmodifiableSet(cache.getLiveInstances().keySet()));
+          Collections.unmodifiableSet(cache.getAssignableLiveInstances().keySet()));
       Map<String, String> bestStateForPartition =
-          computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef,
+          computeBestPossibleStateForPartition(cache.getAssignableLiveInstances().keySet(),
+              stateModelDef,
               preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
               cache.getClusterConfig(), partition,
               cache.getAbnormalStateResolver(stateModelDefName), cache);
@@ -392,6 +393,12 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
    * transition to the top-state, which could minimize the impact to the application's availability.
    * To achieve that, we sort the preferenceList based on CurrentState, by treating top-state and
    * second-states with same priority and rely on the fact that Collections.sort() is stable.
+   * @param preferenceList List of instances the replica will be placed on
+   * @param stateModelDef State model definition
+   * @param currentStateMap Current state of each replica <instance: state>
+   * @param liveInstances Set of live instances
+   * @param disabledInstancesForPartition Set of disabled instances for the partition
+   * @param bestPossibleStateMap Output map of <instance: state> for the partition
    */
   private void assignStatesToInstances(final List<String> preferenceList,
       final StateModelDefinition stateModelDef, final Map<String, String> currentStateMap,
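
The javadoc added above explains the intent of assignStatesToInstances: the preference
list is stably sorted by current state so that an instance already hosting the top state
(or an equivalent second top state) keeps it, minimizing availability impact. A
hypothetical, self-contained sketch of that ordering, not the actual Helix
implementation (assumes java.util imports):

    static List<String> sortByCurrentState(List<String> preferenceList,
        Map<String, String> currentStateMap, Set<String> topStates) {
      List<String> sorted = new ArrayList<>(preferenceList);
      // Instances already in a top state sort first; List.sort is stable, so the
      // original preference order is kept among instances with equal priority.
      sorted.sort(Comparator.comparingInt(
          instance -> topStates.contains(currentStateMap.get(instance)) ? 0 : 1));
      return sorted;
    }
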
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 78bbbba28..5c3ff3b9e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -72,15 +72,15 @@ public class AutoRebalancer extends AbstractRebalancer<ResourceControllerDataPro
       LOG.error("State Model Definition null for resource: " + resourceName);
       throw new HelixException("State Model Definition null for resource: " + resourceName);
     }
-    Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
-    int replicas = currentIdealState.getReplicaCount(liveInstance.size());
+    Map<String, LiveInstance> assignableLiveInstance = clusterData.getAssignableLiveInstances();
+    int replicas = currentIdealState.getReplicaCount(assignableLiveInstance.size());
 
     LinkedHashMap<String, Integer> stateCountMap = stateModelDef
-        .getStateCountMap(liveInstance.size(), replicas);
-    List<String> liveNodes = new ArrayList<>(liveInstance.keySet());
-    List<String> allNodes = new ArrayList<>(clusterData.getAllInstances());
-    allNodes.removeAll(clusterData.getDisabledInstances());
-    liveNodes.retainAll(allNodes);
+        .getStateCountMap(assignableLiveInstance.size(), replicas);
+    List<String> assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet());
+    List<String> assignableNodes = new ArrayList<>(clusterData.getAssignableInstances());
+    assignableNodes.removeAll(clusterData.getDisabledInstances());
+    assignableLiveNodes.retainAll(assignableNodes);
 
     Map<String, Map<String, String>> currentMapping =
         currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
@@ -89,11 +89,11 @@ public class AutoRebalancer extends AbstractRebalancer<ResourceControllerDataPro
     Set<String> taggedNodes = new HashSet<String>();
     Set<String> taggedLiveNodes = new HashSet<String>();
     if (currentIdealState.getInstanceGroupTag() != null) {
-      for (String instanceName : allNodes) {
-        if (clusterData.getInstanceConfigMap().get(instanceName)
+      for (String instanceName : assignableNodes) {
+        if (clusterData.getAssignableInstanceConfigMap().get(instanceName)
             .containsTag(currentIdealState.getInstanceGroupTag())) {
           taggedNodes.add(instanceName);
-          if (liveInstance.containsKey(instanceName)) {
+          if (assignableLiveInstance.containsKey(instanceName)) {
             taggedLiveNodes.add(instanceName);
           }
         }
@@ -114,25 +114,25 @@ public class AutoRebalancer extends AbstractRebalancer<ResourceControllerDataPro
         LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
             + " but no live participants have this tag");
       }
-      allNodes = new ArrayList<>(taggedNodes);
-      liveNodes = new ArrayList<>(taggedLiveNodes);
+      assignableNodes = new ArrayList<>(taggedNodes);
+      assignableLiveNodes = new ArrayList<>(taggedLiveNodes);
     }
 
     // sort node lists to ensure consistent preferred assignments
-    Collections.sort(allNodes);
-    Collections.sort(liveNodes);
+    Collections.sort(assignableNodes);
+    Collections.sort(assignableLiveNodes);
 
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
     _rebalanceStrategy =
         getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
             stateCountMap, maxPartition);
     ZNRecord newMapping = _rebalanceStrategy
-        .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);
+        .computePartitionAssignment(assignableNodes, assignableLiveNodes, currentMapping, clusterData);
 
     LOG.debug("currentMapping: {}", currentMapping);
     LOG.debug("stateCountMap: {}", stateCountMap);
-    LOG.debug("liveNodes: {}", liveNodes);
-    LOG.debug("allNodes: {}", allNodes);
+    LOG.debug("assignableLiveNodes: {}", assignableLiveNodes);
+    LOG.debug("assignableNodes: {}", assignableNodes);
     LOG.debug("maxPartition: {}", maxPartition);
     LOG.debug("newMapping: {}", newMapping);
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index 7ed2e70b0..939d94aed 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -127,7 +127,7 @@ public class CustomRebalancer extends AbstractRebalancer<ResourceControllerDataP
       return instanceStateMap;
     }
 
-    Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+    Map<String, LiveInstance> assignableLiveInstancesMap = cache.getAssignableLiveInstances();
     for (String instance : idealStateMap.keySet()) {
       boolean notInErrorState = currentStateMap != null
           && !HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance));
@@ -135,7 +135,7 @@ public class CustomRebalancer extends AbstractRebalancer<ResourceControllerDataP
 
       // Note: if instance is not live, the mapping for that instance will not show up in
       // BestPossibleMapping (and ExternalView)
-      if (liveInstancesMap.containsKey(instance) && notInErrorState) {
+      if (assignableLiveInstancesMap.containsKey(instance) && notInErrorState) {
         if (enabled) {
           instanceStateMap.put(instance, idealStateMap.get(instance));
         } else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 442ddfb02..78793ddd9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -40,7 +40,6 @@ import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
@@ -95,51 +94,46 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       }
     }
 
-    Set<String> liveEnabledNodes;
-    Set<String> allNodes;
+    Set<String> assignableLiveEnabledNodes;
+    Set<String> assignableNodes;
 
     String instanceTag = currentIdealState.getInstanceGroupTag();
     if (instanceTag != null) {
-      liveEnabledNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
-      allNodes = clusterData.getInstancesWithTag(instanceTag);
+      assignableLiveEnabledNodes = clusterData.getAssignableEnabledLiveInstancesWithTag(instanceTag);
+      assignableNodes = clusterData.getAssignableInstancesWithTag(instanceTag);
 
       if (LOG.isInfoEnabled()) {
         LOG.info(String.format(
             "Found the following participants with tag %s for %s: "
                 + "instances: %s, liveEnabledInstances: %s",
-            currentIdealState.getInstanceGroupTag(), resourceName, allNodes, liveEnabledNodes));
+            currentIdealState.getInstanceGroupTag(), resourceName, assignableNodes, assignableLiveEnabledNodes));
       }
     } else {
-      liveEnabledNodes = clusterData.getEnabledLiveInstances();
-      allNodes = clusterData.getAllInstances();
+      assignableLiveEnabledNodes = clusterData.getAssignableEnabledLiveInstances();
+      assignableNodes = clusterData.getAssignableInstances();
     }
 
-    Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
-        ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
-        clusterData.getInstanceConfigMap(), allNodes);
-    // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
-    // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
-    liveEnabledNodes.retainAll(allNodesDeduped);
-
     long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
-    Set<String> activeNodes = DelayedRebalanceUtil
-        .getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes,
-            clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
-            clusterData.getInstanceConfigMap(), delay, clusterConfig);
+    Set<String> activeNodes =
+        DelayedRebalanceUtil.getActiveNodes(assignableNodes, currentIdealState, assignableLiveEnabledNodes,
+            clusterData.getInstanceOfflineTimeMap(),
+            clusterData.getAssignableLiveInstances().keySet(),
+            clusterData.getAssignableInstanceConfigMap(), delay, clusterConfig);
     if (delayRebalanceEnabled) {
       Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
-      offlineOrDisabledInstances.removeAll(liveEnabledNodes);
+      offlineOrDisabledInstances.removeAll(assignableLiveEnabledNodes);
       DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
           offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
-          clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay,
+          clusterData.getAssignableLiveInstances().keySet(),
+          clusterData.getAssignableInstanceConfigMap(), delay,
           clusterConfig, _manager);
     }
 
-    if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) {
+    if (assignableNodes.isEmpty() || activeNodes.isEmpty()) {
       LOG.error(String.format(
           "No instances or active instances available for resource %s, "
-              + "allInstances: %s, liveInstances: %s, activeInstances: %s",
-          resourceName, allNodesDeduped, liveEnabledNodes, activeNodes));
+              + "allInstances: %s, liveInstances: %s, activeInstances: %s", resourceName, assignableNodes,
+          assignableLiveEnabledNodes, activeNodes));
       return generateNewIdealState(resourceName, currentIdealState,
           emptyMapping(currentIdealState));
     }
@@ -165,14 +159,15 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
         getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
             stateCountMap, maxPartition);
 
-    List<String> allNodeList = new ArrayList<>(allNodesDeduped);
+    List<String> allNodeList = new ArrayList<>(assignableNodes);
 
     // TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
     // Eventually we want to have exclusive groups of instance for different instance tag.
     List<String> liveEnabledAssignableNodeList = new ArrayList<>(
         // We will not assign partitions to instances with EVACUATE InstanceOperation.
-        DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
-            liveEnabledNodes));
+        DelayedRebalanceUtil.filterOutEvacuatingInstances(
+            clusterData.getAssignableInstanceConfigMap(),
+            assignableLiveEnabledNodes));
     // sort node lists to ensure consistent preferred assignments
     Collections.sort(allNodeList);
     Collections.sort(liveEnabledAssignableNodeList);
@@ -194,29 +189,16 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
           _rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping,
               clusterData);
       finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
-          liveEnabledNodes, replicaCount, minActiveReplicas);
+          assignableLiveEnabledNodes, replicaCount, minActiveReplicas);
     }
 
     finalMapping.getListFields().putAll(userDefinedPreferenceList);
 
-    // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
-    Map<String, String> swapOutToSwapInInstancePairs =
-        clusterData.getSwapOutToSwapInInstancePairs();
-    // 2. Get all enabled and live SWAP_IN instances in the cluster.
-    Set<String> enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames();
-    // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
-    // Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active
-    // swap occurring.
-    if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
-      DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping,
-          swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
-    }
-
     LOG.debug("currentMapping: {}", currentMapping);
     LOG.debug("stateCountMap: {}", stateCountMap);
-    LOG.debug("liveEnabledNodes: {}", liveEnabledNodes);
+    LOG.debug("assignableLiveEnabledNodes: {}", assignableLiveEnabledNodes);
     LOG.debug("activeNodes: {}", activeNodes);
-    LOG.debug("allNodes: {}", allNodesDeduped);
+    LOG.debug("assignableNodes: {}", assignableNodes);
     LOG.debug("maxPartition: {}", maxPartition);
     LOG.debug("newIdealMapping: {}", newIdealMapping);
     LOG.debug("finalMapping: {}", finalMapping);
@@ -274,14 +256,15 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       LOG.debug("Processing resource:" + resource.getResourceName());
     }
 
-    Set<String> allNodes = cache.getEnabledInstances();
-    Set<String> liveNodes = cache.getLiveInstances().keySet();
+    Set<String> allNodes = cache.getAssignableEnabledInstances();
+    Set<String> liveNodes = cache.getAssignableLiveInstances().keySet();
 
     ClusterConfig clusterConfig = cache.getClusterConfig();
     long delayTime = DelayedRebalanceUtil.getRebalanceDelay(idealState, clusterConfig);
     Set<String> activeNodes = DelayedRebalanceUtil
         .getActiveNodes(allNodes, idealState, liveNodes, cache.getInstanceOfflineTimeMap(),
-            cache.getLiveInstances().keySet(), cache.getInstanceConfigMap(), delayTime,
+            cache.getAssignableLiveInstances().keySet(), cache.getAssignableInstanceConfigMap(),
+            delayTime,
             clusterConfig);
 
     String stateModelDefName = idealState.getStateModelDefRef();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
index d4922b9d6..7750bd70b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
@@ -87,7 +87,7 @@ public abstract class AbstractEvenDistributionRebalanceStrategy
       final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
       ResourceControllerDataProvider clusterData) {
     // validate the instance configs
-    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
     if (instanceConfigMap == null || !instanceConfigMap.keySet().containsAll(allNodes)) {
       throw new HelixException(String.format("Config for instances %s is not found!",
               allNodes.removeAll(instanceConfigMap.keySet())));
@@ -116,7 +116,8 @@ public abstract class AbstractEvenDistributionRebalanceStrategy
     if (!origPartitionMap.isEmpty()) {
       Map<String, List<Node>> finalPartitionMap = null;
       Topology allNodeTopo =
-              new Topology(allNodes, allNodes, clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+          new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
+              clusterData.getClusterConfig());
       // Transform current assignment to instance->partitions map, and get total partitions
       Map<Node, List<String>> nodeToPartitionMap =
           convertPartitionMap(origPartitionMap, allNodeTopo);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
index b52ef9852..783af13f0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/ConstraintRebalanceStrategy.java
@@ -154,7 +154,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
     // Since instance weight will be replaced by constraint evaluation, record it in advance to avoid
     // overwriting.
     Map<String, Integer> instanceWeightRecords = new HashMap<>();
-    for (InstanceConfig instanceConfig : clusterData.getInstanceConfigMap().values()) {
+    for (InstanceConfig instanceConfig : clusterData.getAssignableInstanceConfigMap().values()) {
       if (instanceConfig.getWeight() != InstanceConfig.WEIGHT_NOT_SET) {
         instanceWeightRecords.put(instanceConfig.getInstanceName(), instanceConfig.getWeight());
       }
@@ -163,7 +163,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
     List<String> candidates = new ArrayList<>(allNodes);
     // Only calculate for configured nodes.
     // Remove all non-configured nodes.
-    candidates.retainAll(clusterData.getAllInstances());
+    candidates.retainAll(clusterData.getAssignableInstances());
 
     // For generating the IdealState ZNRecord
     Map<String, List<String>> preferenceList = new HashMap<>();
@@ -207,7 +207,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
 
     // recover the original weight
     for (String instanceName : instanceWeightRecords.keySet()) {
-      clusterData.getInstanceConfigMap().get(instanceName)
+      clusterData.getAssignableInstanceConfigMap().get(instanceName)
           .setWeight(instanceWeightRecords.get(instanceName));
     }
 
@@ -297,7 +297,7 @@ public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalan
     }
     // Limit the weight to be at least MIN_INSTANCE_WEIGHT
     for (int i = 0; i < instancePriority.length; i++) {
-      clusterData.getInstanceConfigMap().get(qualifiedNodes.get(i))
+      clusterData.getAssignableInstanceConfigMap().get(qualifiedNodes.get(i))
           .setWeight(instancePriority[i] - baseline + MIN_INSTANCE_WEIGHT);
     }
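
As a quick numeric illustration of the weight adjustment above (assuming, as the surrounding code suggests, that baseline is the smallest priority value, and using an illustrative MIN_INSTANCE_WEIGHT of 1 rather than the actual constant): every priority is shifted so the lowest-priority instance lands exactly on MIN_INSTANCE_WEIGHT.

public class WeightNormalizationExample {
  public static void main(String[] args) {
    final int MIN_INSTANCE_WEIGHT = 1;      // illustrative value only
    int[] instancePriority = {5, 2, 9};
    int baseline = 2;                       // assumed: the smallest priority in the array
    for (int priority : instancePriority) {
      // prints 4, 1, 8 -- every resulting weight is at least MIN_INSTANCE_WEIGHT
      System.out.println(priority - baseline + MIN_INSTANCE_WEIGHT);
    }
  }
}
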
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index 38d47abbe..08bbaaffa 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -75,7 +75,7 @@ public class CrushRebalanceStrategy implements RebalanceStrategy<ResourceControl
   public ZNRecord computePartitionAssignment(final List<String> allNodes,
       final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
       ResourceControllerDataProvider clusterData) throws HelixException {
-    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
     _clusterTopo =
         new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
     Node topNode = _clusterTopo.getRootNode();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
index 0a7f4b67b..a53257f3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
@@ -82,7 +82,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy<Resou
   public ZNRecord computePartitionAssignment(final List<String> allNodes,
       final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
       ResourceControllerDataProvider clusterData) throws HelixException {
-    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
     _clusterTopo =
         new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
     Node root = _clusterTopo.getRootNode();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index c7066d053..f4fb26541 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -141,92 +141,6 @@ public class DelayedRebalanceUtil {
         .collect(Collectors.toSet());
   }
 
-  /**
-   * Filter out instances with duplicate logical IDs. If there are duplicates, the instance with
-   * InstanceOperation SWAP_OUT will be chosen over the instance with SWAP_IN. SWAP_IN is not
-   * assignable. If there are duplicates with one node having no InstanceOperation and the other
-   * having SWAP_OUT, the node with no InstanceOperation will be chosen. This signifies SWAP
-   * completion, therefore making the node assignable.
-   * TODO: Eventually when we refactor DataProvider to have getAssignableInstances() and
-   * TODO: getAssignableEnabledLiveInstances() this will not need to be called in the rebalancer.
-   * @param clusterTopologyConfig the cluster topology configuration
-   * @param instanceConfigMap     the map of instance name to corresponding InstanceConfig
-   * @param instances             the set of instances to filter out duplicate logicalIDs for
-   * @return the set of instances with duplicate logicalIDs filtered out, there will only be one
-   * instance per logicalID
-   */
-  public static Set<String> filterOutInstancesWithDuplicateLogicalIds(
-      ClusterTopologyConfig clusterTopologyConfig, Map<String, InstanceConfig> instanceConfigMap,
-      Set<String> instances) {
-    Set<String> filteredNodes = new HashSet<>();
-    Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
-
-    instances.forEach(node -> {
-      InstanceConfig thisInstanceConfig = instanceConfigMap.get(node);
-      if (thisInstanceConfig == null) {
-        return;
-      }
-      String thisLogicalId =
-          thisInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
-
-      if (filteredInstancesByLogicalId.containsKey(thisLogicalId)) {
-        InstanceConfig filteredDuplicateInstanceConfig =
-            instanceConfigMap.get(filteredInstancesByLogicalId.get(thisLogicalId));
-        if ((filteredDuplicateInstanceConfig.getInstanceOperation()
-            .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
-            && thisInstanceConfig.getInstanceOperation()
-            .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
-            || thisInstanceConfig.getInstanceOperation().isEmpty()) {
-          // If the already filtered instance is SWAP_IN and this instance is in SWAP_OUT, then replace the filtered
-          // instance with this instance. If this instance has no InstanceOperation, then replace the filtered instance
-          // with this instance. This is the case where the SWAP_IN node has been marked as complete or SWAP_IN exists and
-          // SWAP_OUT does not. There can never be a case where both have no InstanceOperation set.
-          filteredNodes.remove(filteredInstancesByLogicalId.get(thisLogicalId));
-          filteredNodes.add(node);
-          filteredInstancesByLogicalId.put(thisLogicalId, node);
-        }
-      } else {
-        filteredNodes.add(node);
-        filteredInstancesByLogicalId.put(thisLogicalId, node);
-      }
-    });
-
-    return filteredNodes;
-  }
-
-  /**
-   * Look through the provided mapping and add corresponding SWAP_IN node if a SWAP_OUT node exists
-   * in the partition's preference list.
-   *
-   * @param mapping                      the mapping to be updated (IdealState ZNRecord)
-   * @param swapOutToSwapInInstancePairs the map of SWAP_OUT to SWAP_IN instances
-   */
-  public static void addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(ZNRecord mapping,
-      Map<String, String> swapOutToSwapInInstancePairs, Set<String> enabledLiveSwapInInstances) {
-    Map<String, List<String>> preferenceListsByPartition = mapping.getListFields();
-    for (String partition : preferenceListsByPartition.keySet()) {
-      List<String> preferenceList = preferenceListsByPartition.get(partition);
-      if (preferenceList == null) {
-        continue;
-      }
-      List<String> newInstancesToAdd = new ArrayList<>();
-      for (String instanceName : preferenceList) {
-        if (swapOutToSwapInInstancePairs.containsKey(instanceName)
-            && enabledLiveSwapInInstances.contains(
-            swapOutToSwapInInstancePairs.get(instanceName))) {
-          String swapInInstanceName = swapOutToSwapInInstancePairs.get(instanceName);
-          if (!preferenceList.contains(swapInInstanceName) && !newInstancesToAdd.contains(
-              swapInInstanceName)) {
-            newInstancesToAdd.add(swapInInstanceName);
-          }
-        }
-      }
-      if (!newInstancesToAdd.isEmpty()) {
-        preferenceList.addAll(newInstancesToAdd);
-      }
-    }
-  }
-
   /**
    * Return the time when an offline or disabled instance should be treated as inactive. Return -1
    * if it is inactive now or forced to be rebalanced by an on-demand rebalance.
@@ -429,8 +343,8 @@ public class DelayedRebalanceUtil {
 
       // keep all current assignment and add to allocated replicas
       resourceAssignment.getMappedPartitions().forEach(partition ->
-          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
-              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+          resourceAssignment.getReplicaMap(partition).forEach((logicalId, state) ->
+              allocatedReplicas.computeIfAbsent(logicalId, key -> new HashSet<>())
                   .add(new AssignableReplica(clusterData.getClusterConfig(), mergedResourceConfig,
                       partition.getPartitionName(), state, statePriorityMap.get(state)))));
       // only proceed for resource requiring delayed rebalance overwrites
@@ -505,7 +419,7 @@ public class DelayedRebalanceUtil {
       ResourceAssignment resourceAssignment) {
     String resourceName = resourceAssignment.getResourceName();
     IdealState currentIdealState = clusterData.getIdealState(resourceName);
-    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
     int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
     int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
         .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
@@ -526,7 +440,7 @@ public class DelayedRebalanceUtil {
 
   private static int getMinActiveReplica(ResourceControllerDataProvider clusterData, String resourceName) {
     IdealState currentIdealState = clusterData.getIdealState(resourceName);
-    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
     int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
     return DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
         .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
index 6c199bc1b..d710425cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import com.google.common.collect.ImmutableSet;
+
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -30,12 +31,12 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.changedetector.ResourceChangeDetector;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterTopologyConfig;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.metrics.MetricCollector;
@@ -111,6 +112,7 @@ class GlobalRebalanceRunner implements AutoCloseable {
     _changeDetector.updateSnapshots(clusterData);
     // Get all the changed items' information. Filter for the items that have content changed.
     final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
+    Set<String> allAssignableInstances = clusterData.getAssignableInstances();
 
     if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
       final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
@@ -120,8 +122,8 @@ class GlobalRebalanceRunner implements AutoCloseable {
           // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
           // be triggered again after baseline is finished.
           // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
-          doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance,
-              clusterChanges);
+          doGlobalRebalance(clusterData, resourceMap, allAssignableInstances, algorithm,
+              currentStateOutput, !waitForGlobalRebalance, clusterChanges);
         } catch (HelixRebalanceException e) {
           if (_asyncGlobalRebalanceEnabled) {
             _rebalanceFailureCount.increment(1L);
@@ -150,9 +152,11 @@ class GlobalRebalanceRunner implements AutoCloseable {
    * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance
    *                                   so the new Baseline could be applied to cluster.
    */
-  private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+  private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
+      Map<String, Resource> resourceMap, Set<String> allAssignableInstances,
       RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws HelixRebalanceException {
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges)
+      throws HelixRebalanceException {
     LOG.info("Start calculating the new baseline.");
     _baselineCalcCounter.increment(1L);
     _baselineCalcLatency.startMeasuringLatency();
@@ -166,14 +170,7 @@ class GlobalRebalanceRunner implements AutoCloseable {
     ClusterModel clusterModel;
     try {
       clusterModel = ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap,
-          // Dedupe and select correct node if there is more than one with the same logical id.
-          // We should be calculating a new baseline only after a swap operation is complete and the SWAP_IN node is selected
-          // by deduping. This ensures that adding the SWAP_IN node to the cluster does not cause new baseline to be calculated
-          // with both the SWAP_OUT and SWAP_IN node.
-          DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
-              ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
-              clusterData.getInstanceConfigMap(), clusterData.getAllInstances()),
-              clusterChanges, currentBaseline);
+          allAssignableInstances, clusterChanges, currentBaseline);
     } catch (Exception ex) {
       throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
           HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 8f71f4e6d..c3049ebbf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -47,7 +47,6 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
@@ -303,18 +302,12 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
 
-    Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
-        ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
-        clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
-    // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
-    // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
-    Set<String> liveEnabledNodesDeduped = clusterData.getEnabledLiveInstances();
-    liveEnabledNodesDeduped.retainAll(allNodesDeduped);
-
     Set<String> activeNodes =
-        DelayedRebalanceUtil.getActiveNodes(allNodesDeduped, liveEnabledNodesDeduped,
-            clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
-            clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+        DelayedRebalanceUtil.getActiveNodes(clusterData.getAssignableInstances(),
+            clusterData.getAssignableEnabledLiveInstances(),
+            clusterData.getInstanceOfflineTimeMap(),
+            clusterData.getAssignableLiveInstances().keySet(),
+            clusterData.getAssignableInstanceConfigMap(), clusterData.getClusterConfig());
 
     // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
     delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
@@ -369,19 +362,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
         newIdealState.setPreferenceLists(
             getPreferenceLists(assignments.get(resourceName), statePriorityMap));
 
-        // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
-        Map<String, String> swapOutToSwapInInstancePairs =
-            clusterData.getSwapOutToSwapInInstancePairs();
-        // 2. Get all enabled and live SWAP_IN instances in the cluster.
-        Set<String> enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames();
-        // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
-        // Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active
-        // swap occurring.
-        if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
-          DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(
-              newIdealState.getRecord(), swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
-        }
-
         // Note the state mapping in the new assignment won't directly propagate to the map fields.
         // The rebalancer will calculate for the final state mapping considering the current states.
         finalIdealStateMap.put(resourceName, newIdealState);
@@ -419,15 +399,12 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       Set<String> activeNodes,
       Map<String, ResourceAssignment> currentResourceAssignment,
       RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+
     // the "real" live nodes at the time
-    // TODO: this is a hacky way to filter our on operation instance. We should consider redesign `getEnabledLiveInstances()`.
-    final Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
-        ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
-        clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
-    final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
-    // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
-    // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
-    enabledLiveInstances.retainAll(allNodesDeduped);
+    // TODO: Move evacuation into BaseControllerDataProvider assignableNode logic.
+    final Set<String> enabledLiveInstances = DelayedRebalanceUtil.filterOutEvacuatingInstances(
+        clusterData.getAssignableInstanceConfigMap(),
+        clusterData.getAssignableEnabledLiveInstances());
 
     if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
       // no need for additional process, return the current resource assignment
@@ -622,12 +599,13 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       ClusterConfig clusterConfig = clusterData.getClusterConfig();
       boolean delayedRebalanceEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(clusterConfig);
       Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
-      offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
+      offlineOrDisabledInstances.removeAll(clusterData.getAssignableEnabledLiveInstances());
       for (String resource : resourceSet) {
         DelayedRebalanceUtil
             .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
-                clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
-                clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
+                clusterData.getInstanceOfflineTimeMap(),
+                clusterData.getAssignableLiveInstances().keySet(),
+                clusterData.getAssignableInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
                 clusterConfig, _manager);
       }
     } else {
@@ -642,13 +620,10 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       String resourceName = resourceAssignment.getResourceName();
       IdealState currentIdealState = clusterData.getIdealState(resourceName);
 
-      Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
-          ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
-          clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
-      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
-      // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
-      // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
-      enabledLiveInstances.retainAll(allNodesDeduped);
+      // TODO: Move evacuation into BaseControllerDataProvider assignableNode logic.
+      Set<String> enabledLiveInstances = DelayedRebalanceUtil.filterOutEvacuatingInstances(
+          clusterData.getAssignableInstanceConfigMap(),
+          clusterData.getAssignableEnabledLiveInstances());
 
       int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
       int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
index 913e04234..0fbd0ad3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
@@ -64,16 +64,16 @@ abstract class AbstractPartitionMovementConstraint extends SoftConstraint {
     return assignment.get(resourceName).getReplicaMap(new Partition(partitionName));
   }
 
-  protected double calculateAssignmentScore(String nodeName, String state,
+  protected double calculateAssignmentScore(String logicalId, String state,
       Map<String, String> instanceToStateMap) {
-    if (instanceToStateMap.containsKey(nodeName)) {
+    if (instanceToStateMap.containsKey(logicalId)) {
       // The score when the proposed allocation partially matches the assignment plan but will
       // require a state transition.
       double scoreWithStateTransitionCost =
           MIN_SCORE + (MAX_SCORE - MIN_SCORE) * STATE_TRANSITION_COST_FACTOR;
       // if state matches, no state transition required for the proposed assignment; if state does
       // not match, then the proposed assignment requires state transition.
-      return state.equals(instanceToStateMap.get(nodeName)) ? MAX_SCORE
+      return state.equals(instanceToStateMap.get(logicalId)) ? MAX_SCORE
           : scoreWithStateTransitionCost;
     }
     return MIN_SCORE;
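
To make the scoring rule concrete, here is a hedged, self-contained sketch (the constant values below are chosen for illustration and are not necessarily those defined in SoftConstraint): a proposed placement on a logicalId that already hosts the replica in the reference assignment gets the maximum score when the state also matches, a discounted score when a state transition would be required, and the minimum score otherwise.

import java.util.Map;

public class MovementScoreExample {
  // Illustrative constants only.
  static final double MIN_SCORE = 0.0;
  static final double MAX_SCORE = 1.0;
  static final double STATE_TRANSITION_COST_FACTOR = 0.5;

  static double score(String logicalId, String state, Map<String, String> referenceAssignment) {
    if (referenceAssignment.containsKey(logicalId)) {
      double scoreWithTransition = MIN_SCORE + (MAX_SCORE - MIN_SCORE) * STATE_TRANSITION_COST_FACTOR;
      return state.equals(referenceAssignment.get(logicalId)) ? MAX_SCORE : scoreWithTransition;
    }
    return MIN_SCORE;
  }

  public static void main(String[] args) {
    Map<String, String> reference = Map.of("host1", "MASTER");
    System.out.println(score("host1", "MASTER", reference)); // 1.0 -- same logicalId, same state
    System.out.println(score("host1", "SLAVE", reference));  // 0.5 -- same logicalId, needs a transition
    System.out.println(score("host2", "MASTER", reference)); // 0.0 -- not in the reference assignment
  }
}
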
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
index 5e3fcd286..8063de34f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
@@ -44,7 +44,7 @@ public class BaselineInfluenceConstraint extends AbstractPartitionMovementConstr
 
     Map<String, String> baselineAssignment =
         getStateMap(replica, clusterContext.getBaselineAssignment());
-    return calculateAssignmentScore(node.getInstanceName(), replica.getReplicaState(),
+    return calculateAssignmentScore(node.getLogicalId(), replica.getReplicaState(),
         baselineAssignment);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 77d56302c..60d43764a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -148,10 +148,10 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
           if (scoreCompareResult == 0) {
             // If the evaluation scores of 2 nodes are the same, the algorithm assigns the replica
             // to the idle node first.
-            String instanceName1 = nodeEntry1.getKey().getInstanceName();
-            String instanceName2 = nodeEntry2.getKey().getInstanceName();
-            int idleScore1 = busyInstances.contains(instanceName1) ? 0 : 1;
-            int idleScore2 = busyInstances.contains(instanceName2) ? 0 : 1;
+            String logicalId1 = nodeEntry1.getKey().getLogicalId();
+            String logicalId2 = nodeEntry2.getKey().getLogicalId();
+            int idleScore1 = busyInstances.contains(logicalId1) ? 0 : 1;
+            int idleScore2 = busyInstances.contains(logicalId2) ? 0 : 1;
             return idleScore1 != idleScore2 ? (idleScore1 - idleScore2)
                 : -nodeEntry1.getKey().compareTo(nodeEntry2.getKey());
           } else {
@@ -271,7 +271,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
 
   /**
    * @param assignments A collection of resource replicas assignment.
-   * @return A set of instance names that have at least one replica assigned in the input assignments.
+   * @return A set of logicalIds that have at least one replica assigned in the input assignments.
    */
   private Set<String> getBusyInstances(Collection<ResourceAssignment> assignments) {
     return assignments.stream().flatMap(
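
A minimal sketch of the idle-node tie-break above, now keyed by logicalId (the higher-wins selection is an assumption of this sketch, not a statement about the comparator's caller): when two candidates evaluate to the same constraint score, the node whose logicalId does not already host any replica is preferred.

import java.util.Set;

public class IdleTieBreakExample {
  static int preferIdle(String logicalId1, String logicalId2, Set<String> busyLogicalIds) {
    int idleScore1 = busyLogicalIds.contains(logicalId1) ? 0 : 1;
    int idleScore2 = busyLogicalIds.contains(logicalId2) ? 0 : 1;
    return Integer.compare(idleScore1, idleScore2); // positive: the first candidate is the idle one
  }

  public static void main(String[] args) {
    Set<String> busy = Set.of("host1"); // host1 already hosts at least one replica
    System.out.println(preferIdle("host2", "host1", busy)); // 1 -- host2 is idle, so it wins the tie
  }
}
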
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
index 08c135d66..05c7a94ed 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
@@ -40,14 +40,14 @@ public class PartitionMovementConstraint extends AbstractPartitionMovementConstr
         getStateMap(replica, clusterContext.getBestPossibleAssignment());
     Map<String, String> baselineAssignment =
         getStateMap(replica, clusterContext.getBaselineAssignment());
-    String nodeName = node.getInstanceName();
+    String logicalId = node.getLogicalId();
     String state = replica.getReplicaState();
 
     if (bestPossibleAssignment.isEmpty()) {
       // if best possible is missing, it means the replica belongs to a newly added resource, so
       // baseline assignment should be used instead.
-      return calculateAssignmentScore(nodeName, state, baselineAssignment);
+      return calculateAssignmentScore(logicalId, state, baselineAssignment);
     }
-    return calculateAssignmentScore(nodeName, state, bestPossibleAssignment);
+    return calculateAssignmentScore(logicalId, state, bestPossibleAssignment);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 3f1673210..a869a904e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -80,7 +80,8 @@ public class ClusterModelProvider {
       Set<String> activeInstances,
       Map<String, ResourceAssignment> resourceAssignment) {
     return generateClusterModel(dataProvider, resourceMap, activeInstances, Collections.emptyMap(),
-        Collections.emptyMap(), resourceAssignment, RebalanceScopeType.DELAYED_REBALANCE_OVERWRITES);
+        Collections.emptyMap(), resourceAssignment,
+        RebalanceScopeType.DELAYED_REBALANCE_OVERWRITES);
   }
 
   /**
@@ -162,8 +163,9 @@ public class ClusterModelProvider {
   public static ClusterModel generateClusterModelFromExistingAssignment(
       ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
       Map<String, ResourceAssignment> currentStateAssignment) {
-    return generateClusterModel(dataProvider, resourceMap, dataProvider.getEnabledLiveInstances(),
-        Collections.emptyMap(), Collections.emptyMap(), currentStateAssignment,
+    return generateClusterModel(dataProvider, resourceMap,
+        dataProvider.getAssignableEnabledLiveInstances(), Collections.emptyMap(),
+        Collections.emptyMap(), currentStateAssignment,
         RebalanceScopeType.GLOBAL_BASELINE);
   }
 
@@ -187,11 +189,35 @@ public class ClusterModelProvider {
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
       Map<String, ResourceAssignment> idealAssignment,
       Map<String, ResourceAssignment> currentAssignment, RebalanceScopeType scopeType) {
+    Map<String, InstanceConfig> assignableInstanceConfigMap = dataProvider.getAssignableInstanceConfigMap();
     // Construct all the assignable nodes and initialize with the allocated replicas.
     Set<AssignableNode> assignableNodes =
-        getAllAssignableNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
+        getAllAssignableNodes(dataProvider.getClusterConfig(), assignableInstanceConfigMap,
             activeInstances);
 
+    // Generate the logical view of the ideal assignment and the current assignment.
+    ClusterTopologyConfig clusterTopologyConfig =
+        ClusterTopologyConfig.createFromClusterConfig(dataProvider.getClusterConfig());
+    Map<String, ResourceAssignment> logicalIdIdealAssignment =
+        idealAssignment.isEmpty() ? idealAssignment
+            : generateResourceAssignmentMapLogicalIdView(idealAssignment, clusterTopologyConfig,
+                dataProvider);
+    Map<String, ResourceAssignment> logicalIdCurrentAssignment =
+        currentAssignment.isEmpty() ? currentAssignment
+            : generateResourceAssignmentMapLogicalIdView(currentAssignment, clusterTopologyConfig,
+                dataProvider);
+
+    // Get the set of active logical ids.
+    Set<String> activeLogicalIds = activeInstances.stream().map(
+        instanceName -> assignableInstanceConfigMap.get(instanceName)
+            .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
+
+    Set<String> assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet();
+    Set<String> assignableLiveInstanceLogicalIds =
+        assignableLiveInstanceNames.stream().map(
+            instanceName -> assignableInstanceConfigMap.get(instanceName)
+                .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
+
     // Generate replica objects for all the resource partitions.
     // <resource, replica set>
     Map<String, Set<AssignableReplica>> replicaMap =
@@ -203,27 +229,28 @@ public class ClusterModelProvider {
     Set<AssignableReplica> toBeAssignedReplicas;
     switch (scopeType) {
       case GLOBAL_BASELINE:
-        toBeAssignedReplicas = findToBeAssignedReplicasByClusterChanges(replicaMap, activeInstances,
-            dataProvider.getLiveInstances().keySet(), clusterChanges, currentAssignment,
+        toBeAssignedReplicas =
+            findToBeAssignedReplicasByClusterChanges(replicaMap, activeLogicalIds,
+                assignableLiveInstanceLogicalIds, clusterChanges, logicalIdCurrentAssignment,
             allocatedReplicas);
         break;
       case PARTIAL:
         // Filter to remove the replicas that do not exist in the ideal assignment given but exist
         // in the replicaMap. This is because such replicas are new additions that do not need to be
         // rebalanced right away.
-        retainExistingReplicas(replicaMap, idealAssignment);
+        retainExistingReplicas(replicaMap, logicalIdIdealAssignment);
         toBeAssignedReplicas =
-            findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap, activeInstances,
-                idealAssignment, currentAssignment, allocatedReplicas);
+            findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap, activeLogicalIds,
+                logicalIdIdealAssignment, logicalIdCurrentAssignment, allocatedReplicas);
         break;
       case EMERGENCY:
-        toBeAssignedReplicas = findToBeAssignedReplicasOnDownInstances(replicaMap, activeInstances,
-            currentAssignment, allocatedReplicas);
+        toBeAssignedReplicas = findToBeAssignedReplicasOnDownInstances(replicaMap, activeLogicalIds,
+            logicalIdCurrentAssignment, allocatedReplicas);
         break;
       case DELAYED_REBALANCE_OVERWRITES:
         toBeAssignedReplicas =
             DelayedRebalanceUtil.findToBeAssignedReplicasForMinActiveReplica(dataProvider, replicaMap.keySet(),
-                activeInstances, currentAssignment, allocatedReplicas);
+                activeLogicalIds, logicalIdCurrentAssignment, allocatedReplicas);
         break;
       default:
         throw new HelixException("Unknown rebalance scope type: " + scopeType);
@@ -231,18 +258,56 @@ public class ClusterModelProvider {
 
     // Update the allocated replicas to the assignable nodes.
     assignableNodes.parallelStream().forEach(node -> node.assignInitBatch(
-        allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet())));
+        allocatedReplicas.getOrDefault(node.getLogicalId(), Collections.emptySet())));
 
     // Construct and initialize cluster context.
     ClusterContext context = new ClusterContext(
         replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
-        assignableNodes, idealAssignment, currentAssignment);
+        assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment);
+
     // Initial the cluster context with the allocated assignments.
     context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
 
     return new ClusterModel(context, toBeAssignedReplicas, assignableNodes);
   }
 
+  private static Map<String, ResourceAssignment> generateResourceAssignmentMapLogicalIdView(
+      Map<String, ResourceAssignment> resourceAssignmentMap,
+      ClusterTopologyConfig clusterTopologyConfig, ResourceControllerDataProvider dataProvider) {
+
+    Map<String, InstanceConfig> allInstanceConfigMap = dataProvider.getInstanceConfigMap();
+
+    return resourceAssignmentMap.entrySet().parallelStream()
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+          String resourceName = entry.getKey();
+          ResourceAssignment instanceNameResourceAssignment = entry.getValue();
+          ResourceAssignment logicalIdResourceAssignment = new ResourceAssignment(resourceName);
+
+          StateModelDefinition stateModelDefinition = dataProvider.getStateModelDef(
+              dataProvider.getIdealState(resourceName).getStateModelDefRef());
+
+          instanceNameResourceAssignment.getMappedPartitions().forEach(partition -> {
+            Map<String, String> logicalIdStateMap = new HashMap<>();
+
+            instanceNameResourceAssignment.getReplicaMap(partition)
+                .forEach((instanceName, state) -> {
+                  if (allInstanceConfigMap.containsKey(instanceName)) {
+                    String logicalId = allInstanceConfigMap.get(instanceName)
+                        .getLogicalId(clusterTopologyConfig.getEndNodeType());
+                    if (!logicalIdStateMap.containsKey(logicalId) || state.equals(
+                        stateModelDefinition.getTopState())) {
+                      logicalIdStateMap.put(logicalId, state);
+                    }
+                  }
+                });
+
+            logicalIdResourceAssignment.addReplicaMap(partition, logicalIdStateMap);
+          });
+
+          return logicalIdResourceAssignment;
+        }));
+  }
+
   // Filter the replicas map so only the replicas that have been allocated in the existing
   // assignmentMap remain in the map.
   private static void retainExistingReplicas(Map<String, Set<AssignableReplica>> replicaMap,
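
For intuition, a small hypothetical example of the logicalId view built by generateResourceAssignmentMapLogicalIdView (the instance names and the logicalIdOf helper below are illustrative stand-ins, not the Helix API): two physical instances that resolve to the same logicalId collapse into a single entry, and the top state takes precedence when both are mapped.

import java.util.HashMap;
import java.util.Map;

public class LogicalIdViewExample {
  // Assumed helper standing in for InstanceConfig#getLogicalId in this sketch.
  static String logicalIdOf(String instanceName) {
    return instanceName.substring(0, instanceName.indexOf('-'));
  }

  public static void main(String[] args) {
    Map<String, String> instanceStateMap = new HashMap<>();
    instanceStateMap.put("host1-old", "MASTER"); // e.g. the instance being swapped out
    instanceStateMap.put("host1-new", "SLAVE");  // e.g. the swap-in instance, same logicalId

    Map<String, String> logicalIdStateMap = new HashMap<>();
    instanceStateMap.forEach((instanceName, state) -> {
      String logicalId = logicalIdOf(instanceName);
      // Keep the existing entry unless the incoming state is the top state.
      if (!logicalIdStateMap.containsKey(logicalId) || state.equals("MASTER")) {
        logicalIdStateMap.put(logicalId, state);
      }
    });
    System.out.println(logicalIdStateMap); // {host1=MASTER} -- one logical replica per partition
  }
}
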
@@ -399,8 +464,11 @@ public class ClusterModelProvider {
     Set<String> newlyConnectedNodes = clusterChanges
         .getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, Collections.emptySet());
     newlyConnectedNodes.retainAll(liveInstances);
-    if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG) || clusterChanges
-        .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG) || !newlyConnectedNodes.isEmpty()) {
+
+    if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG)
+        || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)
+        || !newlyConnectedNodes.isEmpty()) {
+
       // 1. If the cluster topology has been modified, need to reassign all replicas.
       // 2. If any node was newly connected, need to rebalance all replicas for the evenness of
       // distribution.
@@ -419,7 +487,7 @@ public class ClusterModelProvider {
             .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet())
             .contains(resourceName) || !currentAssignment.containsKey(resourceName)) {
           toBeAssignedReplicas.addAll(replicas);
-          continue; // go to check next resource
+          // go to check next resource
         } else {
           // check for every replica assignment to identify if the related replicas need to be reassigned.
           // <partition, <state, instances list>>
@@ -433,16 +501,15 @@ public class ClusterModelProvider {
             if (validInstances.isEmpty()) {
               // 3. if no such an instance in the current assignment, need to reassign the replica
               toBeAssignedReplicas.add(replica);
-              continue; // go to check the next replica
             } else {
               Iterator<String> iter = validInstances.iterator();
               // Remove the instance from the current allocation record after processing so that it
               // won't be double-processed as we loop through all replicas
-              String instanceName = iter.next();
+              String logicalId = iter.next();
               iter.remove();
               // the current assignment for this replica is valid,
               // add to the allocated replica list.
-              allocatedReplicas.computeIfAbsent(instanceName, key -> new HashSet<>()).add(replica);
+              allocatedReplicas.computeIfAbsent(logicalId, key -> new HashSet<>()).add(replica);
             }
           }
         }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 8223d9a36..8ec4b4475 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -22,9 +22,11 @@ package org.apache.helix.controller.stages;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
@@ -35,6 +37,7 @@ import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.CustomRebalancer;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
@@ -86,9 +89,16 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
     final BestPossibleStateOutput bestPossibleStateOutput =
         compute(event, resourceMap, currentStateOutput);
+
+    // Add swap-in instances to bestPossibleStateOutput.
+    // We do this after computing the best possible state output because rebalance algorithms should
+    // not be aware of swap-in instances. We simply add each swap-in instance to the
+    // stateMap where its swap-out instance is and compute the correct state.
+    addSwapInInstancesToBestPossibleState(resourceMap, bestPossibleStateOutput, cache);
+
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
 
-    final Map<String, InstanceConfig> instanceConfigMap = cache.getInstanceConfigMap();
+    final Map<String, InstanceConfig> allInstanceConfigMap = cache.getInstanceConfigMap();
     final Map<String, StateModelDefinition> stateModelDefMap = cache.getStateModelDefMap();
     final Map<String, IdealState> idealStateMap = cache.getIdealStates();
     final Map<String, ExternalView> externalViewMap = cache.getExternalViews();
@@ -96,8 +106,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     asyncExecute(cache.getAsyncTasksThreadPool(), () -> {
       try {
         if (clusterStatusMonitor != null) {
-          clusterStatusMonitor
-              .setPerInstanceResourceStatus(bestPossibleStateOutput, instanceConfigMap, resourceMap,
+          clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+              allInstanceConfigMap, resourceMap,
                   stateModelDefMap);
 
           for (String resourceName : idealStateMap.keySet()) {
@@ -121,6 +131,52 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     });
   }
 
+  private void addSwapInInstancesToBestPossibleState(Map<String, Resource> resourceMap,
+      BestPossibleStateOutput bestPossibleStateOutput, ResourceControllerDataProvider cache) {
+    // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
+    Map<String, String> swapOutToSwapInInstancePairs = cache.getSwapOutToSwapInInstancePairs();
+    // 2. Get all enabled and live SWAP_IN instances in the cluster.
+    Set<String> enabledLiveSwapInInstances = cache.getEnabledLiveSwapInInstanceNames();
+    // 3. For each SWAP_OUT instance found in a partition's stateMap, add the corresponding SWAP_IN instance with the correct state.
+    // Skipping this when no SWAP_IN instances are ready (enabled and live) reduces computation time when there is no active
+    // swap occurring.
+    if (!enabledLiveSwapInInstances.isEmpty() && !cache.isMaintenanceModeEnabled()) {
+      resourceMap.forEach((resourceName, resource) -> {
+        StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+        bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
+            .forEach((partition, stateMap) -> {
+              Set<String> commonInstances = new HashSet<>(stateMap.keySet());
+              commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());
+
+              commonInstances.forEach(swapOutInstance -> {
+                if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState())) {
+                  if (AbstractRebalancer.getStateCount(stateModelDef.getTopState(), stateModelDef,
+                      stateMap.size() + 1, stateMap.size() + 1) > stateMap.size()) {
+                    // If the swap-out instance's replica is a topState and the StateModel allows for
+                    // another replica with the topState to be added, set the swap-in instance's replica
+                    // to the topState.
+                    stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
+                        stateModelDef.getTopState());
+                  } else {
+                    // If the swap-out instance's replica is a topState and the StateModel does not allow for
+                    // another replica with the topState to be added, set the swap-in instance's replica
+                    // to the secondTopState.
+                    stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
+                        stateModelDef.getSecondTopStates().iterator().next());
+                  }
+                } else if (stateModelDef.getSecondTopStates()
+                    .contains(stateMap.get(swapOutInstance))) {
+                  // If the swap-out instance's replica is a secondTopState, set the swap-in instance's replica
+                  // to the same secondTopState.
+                  stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
+                      stateMap.get(swapOutInstance));
+                }
+              });
+            });
+      });
+    }
+  }
+
   private void reportResourceState(ClusterStatusMonitor clusterStatusMonitor,
       BestPossibleStateOutput bestPossibleStateOutput, String resourceName, IdealState is,
       ExternalView ev, StateModelDefinition stateModelDef) {
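
To make the swap-in state rule above concrete, here is a minimal sketch assuming a MasterSlave-like model in which MASTER is the top state, SLAVE is the second-top state, and only one MASTER is allowed per partition (the helper below is hypothetical and only mirrors the rule described in the comments; it is not the Helix implementation):

import java.util.HashMap;
import java.util.Map;

public class SwapInStateExample {
  // If the swap-out replica holds the top state, the swap-in replica is promoted only when the
  // model allows one more top-state replica; otherwise it gets a second-top state. A second-top
  // state on the swap-out replica is simply mirrored.
  static String swapInState(String swapOutState, boolean anotherTopStateAllowed) {
    if ("MASTER".equals(swapOutState)) {
      return anotherTopStateAllowed ? "MASTER" : "SLAVE";
    }
    if ("SLAVE".equals(swapOutState)) {
      return "SLAVE";
    }
    return null; // other states are not touched in this sketch
  }

  public static void main(String[] args) {
    Map<String, String> stateMap = new HashMap<>();
    stateMap.put("host1-old", "MASTER");
    // Single-MASTER model: the swap-in instance ends up with the second-top state.
    stateMap.put("host1-new", swapInState(stateMap.get("host1-old"), false));
    System.out.println(stateMap); // {host1-old=MASTER, host1-new=SLAVE} (map order may vary)
  }
}
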
@@ -239,7 +295,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       final HelixManager manager) {
     int maxOfflineInstancesAllowed = cache.getClusterConfig().getMaxOfflineInstancesAllowed();
     if (maxOfflineInstancesAllowed >= 0) {
-      int offlineCount = cache.getAllInstances().size() - cache.getEnabledLiveInstances().size();
+      int offlineCount =
+          cache.getAssignableInstances().size() - cache.getAssignableEnabledLiveInstances().size();
       if (offlineCount > maxOfflineInstancesAllowed) {
         String errMsg = String.format(
             "Offline Instances count %d greater than allowed count %d. Put cluster %s into "
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 477e4f99b..b3990046c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -551,6 +551,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       messagesThrottled.add(messageToThrottle.getId());
       return;
     }
+    // TODO: Throttling is currently applied to messages targeting all instances, including those not considered
+    //  assignable, and they all share the same configured limits. After discussion, it was agreed that this is the
+    //  proper behavior. In addition, we should consider adding priority based on whether the instance is assignable
+    //  and whether the message brings the replica count up to, or above, the configured replica count.
     throttleStateTransitionsForReplica(throttleController, resource.getResourceName(), partition,
         messageToThrottle, messagesThrottled, RebalanceType.LOAD_BALANCE, cache,
         resourceMessageMap);
@@ -656,10 +660,12 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     // Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition
     // preference list
     if (preferenceList != null) {
-      return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
+      return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(
+              i -> resourceControllerDataProvider.getAssignableEnabledLiveInstances().contains(i))
           .count(), requiredNumReplica); // StateModelDefinition's counts
     }
-    return stateModelDefinition.getStateCountMap(resourceControllerDataProvider.getEnabledLiveInstances().size(),
+    return stateModelDefinition.getStateCountMap(
+        resourceControllerDataProvider.getAssignableEnabledLiveInstances().size(),
         requiredNumReplica); // StateModelDefinition's counts
   }
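
    For the state-count computation above, a condensed sketch of the filtering, assuming plain
    collections in place of StateModelDefinition and the data provider (the helper name
    activeReplicaCount is made up for illustration):

    import java.util.List;
    import java.util.Set;

    final class ActiveReplicaCountSketch {
      // Number of replicas that can actually be hosted for a partition: members of the preference
      // list that are assignable, enabled and live, or the size of that whole instance set when
      // the partition has no preference list.
      static int activeReplicaCount(List<String> preferenceList,
          Set<String> assignableEnabledLiveInstances) {
        if (preferenceList == null) {
          return assignableEnabledLiveInstances.size();
        }
        return (int) preferenceList.stream()
            .filter(assignableEnabledLiveInstances::contains)
            .count();
      }
    }
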
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
index e4a7e12aa..d262d1402 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
@@ -90,7 +90,7 @@ public class MaintenanceRecoveryStage extends AbstractAsyncBaseStage {
       }
       // Get the count of all instances that are either offline or disabled
       int offlineDisabledCount =
-          cache.getAllInstances().size() - cache.getEnabledLiveInstances().size();
+          cache.getAssignableInstances().size() - cache.getAssignableEnabledLiveInstances().size();
       shouldExitMaintenance = offlineDisabledCount <= numOfflineInstancesForAutoExit;
       reason = String.format(
           "Auto-exiting maintenance mode for cluster %s; Num. of offline/disabled instances is %d, less than or equal to the exit threshold %d",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 8a7ae52b5..5c22c11db 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -266,7 +266,8 @@ public class MessageGenerationPhase extends AbstractBaseStage {
               LogUtil.logError(logger, _eventId, String.format(
                   "An invalid message was generated! Discarding this message. sessionIdMap: %s, CurrentStateMap: %s, InstanceStateMap: %s, AllInstances: %s, LiveInstances: %s, Message: %s",
                   sessionIdMap, currentStateOutput.getCurrentStateMap(resourceName, partition),
-                  instanceStateMap, cache.getAllInstances(), cache.getLiveInstances().keySet(),
+                  instanceStateMap, cache.getAllInstances(),
+                  cache.getLiveInstances().keySet(),
                   message));
               continue; // Do not add this message
             }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index a7e9742e0..7e8bde9d1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -19,8 +19,6 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 1f77fa66a..00b2fd71b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -19,12 +19,10 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.HelixProperty;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index dbedf7bcc..f9662101c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -40,7 +40,6 @@ import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.TaskConstants;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 34bd56487..7a0fe6377 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -643,16 +643,22 @@ public class ZKHelixAdmin implements HelixAdmin {
     HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
-    // 1. Check that both instances are alive.
+    // 1. Check that both instances are alive and enabled.
     LiveInstance swapOutLiveInstance =
         accessor.getProperty(keyBuilder.liveInstance(swapOutInstanceName));
     LiveInstance swapInLiveInstance =
         accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName));
-    if (swapOutLiveInstance == null || swapInLiveInstance == null) {
+    InstanceConfig swapOutInstanceConfig = getInstanceConfig(clusterName, swapOutInstanceName);
+    InstanceConfig swapInInstanceConfig = getInstanceConfig(clusterName, swapInInstanceName);
+    if (swapOutLiveInstance == null || swapInLiveInstance == null
+        || !swapOutInstanceConfig.getInstanceEnabled()
+        || !swapInInstanceConfig.getInstanceEnabled()) {
       logger.warn(
-          "SwapOutInstance {} is {} and SwapInInstance {} is {} for cluster {}. Swap will not complete unless both instances are ONLINE.",
+          "SwapOutInstance {} is {} + {} and SwapInInstance {} is {} + {} for cluster {}. Swap will not complete unless both instances are ONLINE.",
           swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE",
-          swapInInstanceName, swapInLiveInstance != null ? "ONLINE" : "OFFLINE", clusterName);
+          swapOutInstanceConfig.getInstanceEnabled() ? "ENABLED" : "DISABLED", swapInInstanceName,
+          swapInLiveInstance != null ? "ONLINE" : "OFFLINE",
+          swapInInstanceConfig.getInstanceEnabled() ? "ENABLED" : "DISABLED", clusterName);
       return false;
     }
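
    The validation above now requires both instances to be live and enabled before a swap can
    complete. A small sketch of that precondition, with made-up helper and parameter names:

    import java.util.Optional;

    final class SwapReadinessSketch {
      // Empty when the swap may proceed; otherwise a message in the same ONLINE/OFFLINE +
      // ENABLED/DISABLED shape as the warning logged above.
      static Optional<String> checkSwapReadiness(String swapOut, boolean swapOutLive,
          boolean swapOutEnabled, String swapIn, boolean swapInLive, boolean swapInEnabled) {
        if (swapOutLive && swapOutEnabled && swapInLive && swapInEnabled) {
          return Optional.empty();
        }
        return Optional.of(String.format(
            "SwapOutInstance %s is %s + %s and SwapInInstance %s is %s + %s",
            swapOut, swapOutLive ? "ONLINE" : "OFFLINE", swapOutEnabled ? "ENABLED" : "DISABLED",
            swapIn, swapInLive ? "ONLINE" : "OFFLINE", swapInEnabled ? "ENABLED" : "DISABLED"));
      }
    }
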
 
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
index 4b56057cd..b347cea03 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceAssignment.java
@@ -39,7 +39,6 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
  * can be in s1.
  */
 public class ResourceAssignment extends HelixProperty {
-
   /**
    * Initialize an empty mapping
    * @param resourceName the resource being mapped
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 8da61958c..819ee5d72 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -647,7 +647,8 @@ public abstract class AbstractTaskDispatcher {
       int jobCfgLimitation =
           jobCfg.getNumConcurrentTasksPerInstance() - assignedPartitions.get(instance).size();
       // 2. throttled by participant capacity
-      int participantCapacity = cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask();
+      int participantCapacity =
+          cache.getAssignableInstanceConfigMap().get(instance).getMaxConcurrentTask();
       if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) {
         participantCapacity = cache.getClusterConfig().getMaxConcurrentTaskPerInstance();
       }
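
    The capacity lookup above now reads from the assignable instance config map. How the
    per-instance limit falls back to the cluster-wide default could be sketched as follows; the
    sentinel constant and the combination with the job limit are assumptions for illustration,
    not the exact arithmetic in AbstractTaskDispatcher:

    final class TaskCapacitySketch {
      static final int MAX_CONCURRENT_TASK_NOT_SET = -1; // stand-in for InstanceConfig's sentinel

      // Remaining task slots on an instance for a job: the tighter of the job's per-instance
      // limit and the participant capacity, where a participant without an explicit capacity
      // falls back to the cluster-wide default.
      static int remainingTaskSlots(int jobConcurrentTasksPerInstance, int alreadyAssignedForJob,
          int instanceMaxConcurrentTask, int clusterMaxConcurrentTaskPerInstance) {
        int jobHeadroom = jobConcurrentTasksPerInstance - alreadyAssignedForJob;
        int participantCapacity = instanceMaxConcurrentTask == MAX_CONCURRENT_TASK_NOT_SET
            ? clusterMaxConcurrentTaskPerInstance : instanceMaxConcurrentTask;
        return Math.min(jobHeadroom, participantCapacity);
      }
    }
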
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 4b0e00f81..fd22a8e1f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -141,8 +141,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Will contain the list of partitions that must be explicitly dropped from the ideal state that
     // is stored in zk.
     Set<String> liveInstances =
-        jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
-            : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
+        jobCfg.getInstanceGroupTag() == null ? _dataProvider.getAssignableEnabledLiveInstances()
+            : _dataProvider.getAssignableEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
 
     if (liveInstances.isEmpty()) {
       LOG.error("No available instance found for job: {}", jobName);
@@ -163,7 +163,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       if (jobTgtState == TargetState.STOP) {
        // If the assigned instance is no longer live, mark it as DROPPED in the context
         markPartitionsWithoutLiveInstance(jobCtx, liveInstances);
-        
+
         if (jobState != TaskState.NOT_STARTED && TaskUtil.checkJobStopped(jobCtx)) {
           workflowCtx.setJobState(jobName, TaskState.STOPPED);
         } else {
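
    For the instance selection above, a rough sketch of how the eligible set narrows when a job
    carries an instance group tag; the tag lookup map and helper name are assumptions:

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    final class TaggedInstanceSelectionSketch {
      // All assignable, enabled, live instances when the job has no instance group tag;
      // otherwise only the instances carrying that tag.
      static Set<String> eligibleInstances(Set<String> assignableEnabledLiveInstances,
          Map<String, Set<String>> tagsByInstance, String instanceGroupTag) {
        if (instanceGroupTag == null) {
          return assignableEnabledLiveInstances;
        }
        return assignableEnabledLiveInstances.stream()
            .filter(name -> tagsByInstance.getOrDefault(name, Set.of()).contains(instanceGroupTag))
            .collect(Collectors.toSet());
      }
    }
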
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index da9324acf..c7d46b8e4 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -146,7 +146,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
     cache.refresh(_accessor);
 
     List<String> liveInstances = new ArrayList<String>();
-    liveInstances.addAll(cache.getLiveInstances().keySet());
+    liveInstances.addAll(cache.getAssignableLiveInstances().keySet());
     boolean success = verifyLiveNodes(liveInstances);
     if (!success) {
       LOG.info("liveNodes not match, expect: " + _expectSortedLiveNodes + ", actual: "
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index ca47c16b9..d0da9ba8e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -330,16 +330,16 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
     Map<String, Map<String, String>> idealPartitionState = new HashMap<>();
 
     for (String partition : idealState.getPartitionSet()) {
-      List<String> preferenceList = AbstractRebalancer
-          .getPreferenceList(new Partition(partition), idealState, cache.getEnabledLiveInstances());
+      List<String> preferenceList = AbstractRebalancer.getPreferenceList(new Partition(partition),
+          idealState, cache.getAssignableEnabledLiveInstances());
       Map<String, String> idealMapping;
       if (_isDeactivatedNodeAware) {
-        idealMapping = HelixUtil
-            .computeIdealMapping(preferenceList, stateModelDef, cache.getLiveInstances().keySet(),
+        idealMapping = HelixUtil.computeIdealMapping(preferenceList, stateModelDef,
+            cache.getAssignableLiveInstances().keySet(),
                 cache.getDisabledInstancesForPartition(idealState.getResourceName(), partition));
       } else {
-        idealMapping = HelixUtil
-            .computeIdealMapping(preferenceList, stateModelDef, cache.getEnabledLiveInstances(),
+        idealMapping = HelixUtil.computeIdealMapping(preferenceList, stateModelDef,
+            cache.getAssignableEnabledLiveInstances(),
                 Collections.emptySet());
       }
       idealPartitionState.put(partition, idealMapping);
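
    The verifier above picks the instance set for the ideal mapping based on whether it is
    deactivated-node aware. A compact sketch of that branch, with a functional stand-in for
    HelixUtil.computeIdealMapping and illustrative parameter names:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.BiFunction;

    final class IdealMappingChoiceSketch {
      // Deactivated-node-aware verification maps over all assignable live instances and passes
      // the partition's disabled instances explicitly; otherwise only assignable, enabled, live
      // instances are considered and the disabled set is empty.
      static Map<String, String> chooseIdealMapping(boolean deactivatedNodeAware,
          Set<String> assignableLiveInstances, Set<String> assignableEnabledLiveInstances,
          Set<String> disabledInstancesForPartition,
          BiFunction<Set<String>, Set<String>, Map<String, String>> computeIdealMapping) {
        return deactivatedNodeAware
            ? computeIdealMapping.apply(assignableLiveInstances, disabledInstancesForPartition)
            : computeIdealMapping.apply(assignableEnabledLiveInstances, Collections.emptySet());
      }
    }
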
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 88c33f608..4a3d49b73 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -294,9 +294,9 @@ public final class HelixUtil {
           .collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())));
       // For LiveInstances, we must preserve the existing session IDs
       // So read LiveInstance objects from the cluster and do a "retainAll" on them
-      // liveInstanceMap is an unmodifiableMap instances, so we filter using a stream
-      Map<String, LiveInstance> liveInstanceMap = dataProvider.getLiveInstances();
-      List<LiveInstance> filteredLiveInstances = liveInstanceMap.entrySet().stream()
+      // assignableLiveInstanceMap is an unmodifiable map, so we filter using a stream
+      Map<String, LiveInstance> assignableLiveInstanceMap = dataProvider.getAssignableLiveInstances();
+      List<LiveInstance> filteredLiveInstances = assignableLiveInstanceMap.entrySet().stream()
           .filter(entry -> liveInstances.contains(entry.getKey())).map(Map.Entry::getValue)
           .collect(Collectors.toList());
       // Synthetically create LiveInstance objects that are passed in as the parameter
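
    Since the assignable LiveInstance map is unmodifiable, the retain-all behavior above is
    expressed as a stream filter. A generic sketch of the same idea, with V standing in for
    Helix's LiveInstance type:

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    final class LiveInstanceFilterSketch {
      // Keep only the entries whose instance names are in the requested set, without mutating
      // the (unmodifiable) source map.
      static <V> List<V> retainRequested(Map<String, V> assignableLiveInstances,
          Set<String> requestedInstanceNames) {
        return assignableLiveInstances.entrySet().stream()
            .filter(entry -> requestedInstanceNames.contains(entry.getKey()))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
      }
    }
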
diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java
index 80679310a..6f6cdc0a7 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java
@@ -111,11 +111,11 @@ public class TestHelixPropoertyTimmer {
     ResourceControllerDataProvider dataProvider =
         Mockito.mock(ResourceControllerDataProvider.class);
     when(dataProvider.getRefreshedChangeTypes()).thenReturn(changeTypes);
-    when(dataProvider.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(dataProvider.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
     when(dataProvider.getIdealStates()).thenReturn(idealStateMap);
     when(dataProvider.getResourceConfigMap()).thenReturn(resourceConfigMap);
     when(dataProvider.getClusterConfig()).thenReturn(clusterConfig);
-    when(dataProvider.getLiveInstances()).thenReturn(Collections.emptyMap());
+    when(dataProvider.getAssignableLiveInstances()).thenReturn(Collections.emptyMap());
     return dataProvider;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 32a131d48..905c5552b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -243,7 +243,7 @@ public class TestAutoRebalanceStrategy {
           }
         }
         Map<String, String> assignment = new AutoRebalancer()
-            .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
+            .computeBestPossibleStateForPartition(cache.getAssignableLiveInstances().keySet(), _stateModelDef,
                 preferenceList, currentStateOutput, disabled, is, clusterConfig, p,
                 MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
         mapResult.put(partition, assignment);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 000978ef1..951e0e3c5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,7 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -108,16 +108,21 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       _instances.add(instanceName);
       // 1. Set up the default instance information with capacity configuration.
       InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
-      Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+      Map<String, InstanceConfig> instanceConfigMap = testCache.getAssignableInstanceConfigMap();
       instanceConfigMap.put(instanceName, testInstanceConfig);
+      when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
       when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
       // 2. Mock the live instance node for the default instance.
       LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
-      Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+      Map<String, LiveInstance> liveInstanceMap = testCache.getAssignableLiveInstances();
       liveInstanceMap.put(instanceName, testLiveInstance);
+      when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
       when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+      when(testCache.getAssignableEnabledInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
+      when(testCache.getAssignableEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+      when(testCache.getAssignableInstances()).thenReturn(_instances);
       when(testCache.getAllInstances()).thenReturn(_instances);
     }
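
    The test setups in this commit repeatedly stub both the new assignable-* getters and the
    legacy getters so the code under test sees one consistent view. A condensed sketch of that
    stubbing pattern (a Mockito-based helper written for illustration, not part of the change):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
    import org.apache.helix.model.InstanceConfig;
    import org.apache.helix.model.LiveInstance;

    final class AssignableViewStubSketch {
      // Stub the assignable and legacy accessors with the same single-instance view.
      static ResourceControllerDataProvider stubSingleInstanceCache(String instanceName) {
        ResourceControllerDataProvider cache = mock(ResourceControllerDataProvider.class);
        Map<String, InstanceConfig> configs = new HashMap<>();
        configs.put(instanceName, new InstanceConfig(instanceName));
        Map<String, LiveInstance> liveInstances = new HashMap<>();
        liveInstances.put(instanceName, new LiveInstance(instanceName));
        when(cache.getAssignableInstanceConfigMap()).thenReturn(configs);
        when(cache.getInstanceConfigMap()).thenReturn(configs);
        when(cache.getAssignableLiveInstances()).thenReturn(liveInstances);
        when(cache.getLiveInstances()).thenReturn(liveInstances);
        when(cache.getAssignableEnabledLiveInstances()).thenReturn(liveInstances.keySet());
        when(cache.getEnabledLiveInstances()).thenReturn(liveInstances.keySet());
        return cache;
      }
    }
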
 
@@ -370,7 +375,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         Collectors.toMap(resourceName -> resourceName, Resource::new));
     try {
       rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
-          clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
+          clusterData.getAssignableEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -434,7 +439,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Calculation will fail
     try {
       rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
-          clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
+          clusterData.getAssignableEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -601,6 +606,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     String offlinePartition = _partitionNames.get(0);
     String offlineState = "MASTER";
     String offlineInstance = "offlineInstance";
+    InstanceConfig offlineInstanceConfig = createMockInstanceConfig(offlineInstance);
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
+    instanceConfigMap.put(offlineInstance, offlineInstanceConfig);
+    when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
     for (Partition partition : bestPossibleAssignment.get(offlineResource).getMappedPartitions()) {
       if (partition.getPartitionName().equals(offlinePartition)) {
         bestPossibleAssignment.get(offlineResource)
@@ -649,12 +659,13 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     Set<String> instances = new HashSet<>(_instances);
     String offlineInstance = "offlineInstance";
     instances.add(offlineInstance);
-    when(clusterData.getAllInstances()).thenReturn(instances);
+    when(clusterData.getAssignableInstances()).thenReturn(instances);
     Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
     instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
     when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
-    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
     instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
+    when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
     when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     // Set minActiveReplica to 0 so that requireRebalanceOverwrite returns false
@@ -737,15 +748,16 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // force create a fake offlineInstance that's in delay window
     Set<String> instances = new HashSet<>(_instances);
     instances.add(offlineInstance);
-    when(clusterData.getAllInstances()).thenReturn(instances);
-    when(clusterData.getEnabledInstances()).thenReturn(instances);
-    when(clusterData.getEnabledLiveInstances()).thenReturn(
+    when(clusterData.getAssignableInstances()).thenReturn(instances);
+    when(clusterData.getAssignableEnabledInstances()).thenReturn(instances);
+    when(clusterData.getAssignableEnabledLiveInstances()).thenReturn(
         new HashSet<>(Arrays.asList(instance0, instance1, instance2)));
     Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
     instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
     when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
-    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
     instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
+    when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
     when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     Map<String, IdealState> isMap = new HashMap<>();
@@ -881,10 +893,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
     // force create a fake offlineInstance that's in delay window
     Set<String> instances = new HashSet<>(_instances);
-    when(clusterData.getAllInstances()).thenReturn(instances);
-    when(clusterData.getEnabledInstances()).thenReturn(instances);
-    when(clusterData.getEnabledLiveInstances()).thenReturn(instances);
-    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    when(clusterData.getAssignableInstances()).thenReturn(instances);
+    when(clusterData.getAssignableEnabledInstances()).thenReturn(instances);
+    when(clusterData.getAssignableEnabledLiveInstances()).thenReturn(instances);
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
+    when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
     when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     Map<String, IdealState> isMap = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index bdc677bde..c5c7b560c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -256,16 +256,21 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
       _instances.add(instanceName);
       // 1. Set up the default instance information with capacity configuration.
       InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
-      Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+      Map<String, InstanceConfig> instanceConfigMap = testCache.getAssignableInstanceConfigMap();
       instanceConfigMap.put(instanceName, testInstanceConfig);
+      when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
       when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
       // 2. Mock the live instance node for the default instance.
       LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
-      Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+      Map<String, LiveInstance> liveInstanceMap = testCache.getAssignableLiveInstances();
       liveInstanceMap.put(instanceName, testLiveInstance);
+      when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
       when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+      when(testCache.getAssignableEnabledInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
+      when(testCache.getAssignableEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+      when(testCache.getAssignableInstances()).thenReturn(_instances);
       when(testCache.getAllInstances()).thenReturn(_instances);
     }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
index 16c199470..9b4d1de15 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
@@ -53,6 +53,7 @@ public class TestPartitionMovementConstraint {
     when(_testReplica.getResourceName()).thenReturn(RESOURCE);
     when(_testReplica.getPartitionName()).thenReturn(PARTITION);
     when(_testNode.getInstanceName()).thenReturn(INSTANCE);
+    when(_testNode.getLogicalId()).thenReturn(INSTANCE);
   }
 
   @Test
@@ -104,6 +105,7 @@ public class TestPartitionMovementConstraint {
 
     // when the replica's state matches with best possible, allocation matches with baseline
     when(testAssignableNode.getInstanceName()).thenReturn(instanceNameA);
+    when(testAssignableNode.getLogicalId()).thenReturn(instanceNameA);
     when(_testReplica.getReplicaState()).thenReturn("Master");
     verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
         0.5, 0.5);
@@ -112,6 +114,7 @@ public class TestPartitionMovementConstraint {
 
     // when the replica's allocation matches with best possible only
     when(testAssignableNode.getInstanceName()).thenReturn(instanceNameB);
+    when(testAssignableNode.getLogicalId()).thenReturn(instanceNameB);
     when(_testReplica.getReplicaState()).thenReturn("Master");
     verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
         0.0, 0.0);
@@ -120,6 +123,7 @@ public class TestPartitionMovementConstraint {
 
     // when the replica's state matches with baseline only
     when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC);
+    when(testAssignableNode.getLogicalId()).thenReturn(instanceNameC);
     when(_testReplica.getReplicaState()).thenReturn("Master");
     verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
         1.0, 1.0);
@@ -128,6 +132,7 @@ public class TestPartitionMovementConstraint {
 
     // when the replica's allocation matches with baseline only
     when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC);
+    when(testAssignableNode.getLogicalId()).thenReturn(instanceNameC);
     when(_testReplica.getReplicaState()).thenReturn("Slave");
     verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
         0.5, 0.5);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index b9c4ce39d..c9deb792d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -103,6 +103,7 @@ public abstract class AbstractTestClusterModel {
     testInstanceConfig.setInstanceEnabledForPartition("TestResource", "TestPartition", false);
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+    when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     // 2. Set up the basic cluster configuration.
@@ -121,7 +122,7 @@ public abstract class AbstractTestClusterModel {
     LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
     Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
     liveInstanceMap.put(_testInstanceId, testLiveInstance);
-    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
 
     // 4. Mock two resources, each with 2 partitions on the default instance.
     // The instance will have the following partitions assigned
@@ -288,7 +289,7 @@ public abstract class AbstractTestClusterModel {
 
   protected Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCache) {
     Set<AssignableNode> nodeSet = new HashSet<>();
-    testCache.getInstanceConfigMap().values().forEach(config -> nodeSet
+    testCache.getAssignableInstanceConfigMap().values().forEach(config -> nodeSet
         .add(new AssignableNode(testCache.getClusterConfig(), config, config.getInstanceName())));
     return nodeSet;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index e13fb3979..d63f05c81 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -63,7 +63,7 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(TEST_INSTANCE_ID_1, testInstanceConfig1);
     instanceConfigMap.put(TEST_INSTANCE_ID_2, testInstanceConfig2);
-    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
     Set<AssignableReplica> assignableReplicas = generateReplicas(testCache);
     Set<AssignableNode> assignableNodes = generateNodes(testCache);
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 9cbfc2560..0f751b186 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -64,7 +64,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     expectedCapacityMap.put("item3", 30);
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+        testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
     assignableNode.assignInitBatch(assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
@@ -177,7 +177,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+        testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
     AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
         testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist",
         "MASTER", 1);
@@ -192,7 +192,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+        testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
     assignableNode.assignInitBatch(assignmentSet);
     AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
         testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
@@ -213,10 +213,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     testInstanceConfig.setDomain("instance=testInstance");
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(_testInstanceId, testInstanceConfig);
-    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     AssignableNode node = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+        testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
     Assert.assertEquals(node.getFaultZone(), "Helix_default_zone");
   }
 
@@ -234,10 +234,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     testInstanceConfig.setDomain("zone=2, instance=testInstance");
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(_testInstanceId, testInstanceConfig);
-    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+        testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2");
 
@@ -251,10 +251,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     testInstanceConfig.setDomain("zone=2, instance=testInstance");
     instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(_testInstanceId, testInstanceConfig);
-    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+        testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance");
 
@@ -268,10 +268,10 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     testInstanceConfig.setDomain("rack=3, zone=2, instance=testInstanceConfigId");
     instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(_testInstanceId, testInstanceConfig);
-    when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+    when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
     assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+        testCache.getAssignableInstanceConfigMap().get(_testInstanceId), _testInstanceId);
     Assert.assertEquals(assignableNode.getFaultZone(), "3/2");
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 500cfb0b6..34582d600 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -79,14 +79,15 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
       _instances.add(instanceName);
       // 1. Set up the default instance information with capacity configuration.
       InstanceConfig testInstanceConfig = createMockInstanceConfig(instanceName);
-      Map<String, InstanceConfig> instanceConfigMap = testCache.getInstanceConfigMap();
+      Map<String, InstanceConfig> instanceConfigMap = testCache.getAssignableInstanceConfigMap();
       instanceConfigMap.put(instanceName, testInstanceConfig);
+      when(testCache.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
       when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
       // 2. Mock the live instance node for the default instance.
       LiveInstance testLiveInstance = createMockLiveInstance(instanceName);
-      Map<String, LiveInstance> liveInstanceMap = testCache.getLiveInstances();
+      Map<String, LiveInstance> liveInstanceMap = testCache.getAssignableLiveInstances();
       liveInstanceMap.put(instanceName, testLiveInstance);
-      when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+      when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
     }
 
     return testCache;
@@ -104,8 +105,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     Set<String> activeInstances = new HashSet<>();
     activeInstances.add(instance1);
     activeInstances.add(instance2);
-    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
 
     // test 0, empty input
     Assert.assertEquals(
@@ -142,8 +143,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
 
     // test 2, no additional replica to be assigned
     testCache = setupClusterDataCache();
-    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
     input = ImmutableMap.of(
         _resourceNames.get(0),
         ImmutableMap.of(
@@ -167,8 +168,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
 
     // test 3, minActiveReplica==2, two partitions falling short
     testCache = setupClusterDataCache();
-    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
     input = ImmutableMap.of(
         _resourceNames.get(0),
         ImmutableMap.of(
@@ -205,8 +206,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     Set<String> activeInstances = new HashSet<>();
     activeInstances.add(instance1);
     activeInstances.add(instance2);
-    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
 
     // test 1, one partition under minActiveReplica
     Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
@@ -245,8 +246,8 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
 
     // test 2, minActiveReplica==2, three partitions falling short
     testCache = setupClusterDataCache();
-    when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
+    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
     input = ImmutableMap.of(
         _resourceNames.get(0),
         ImmutableMap.of(
@@ -413,7 +414,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
         .allMatch(replicaSet -> replicaSet.size() == 4));
 
     // Adjust instance fault zone, so they have different fault zones.
-    testCache.getInstanceConfigMap().values().stream()
+    testCache.getAssignableInstanceConfigMap().values().stream()
         .forEach(config -> config.setZoneId(config.getInstanceName()));
     clusterModel = ClusterModelProvider.generateClusterModelForBaseline(testCache,
         _resourceNames.stream()
@@ -576,7 +577,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 0);
 
     // Adjust instance fault zone, so they have different fault zones.
-    testCache.getInstanceConfigMap().values().stream()
+    testCache.getAssignableInstanceConfigMap().values().stream()
         .forEach(config -> config.setZoneId(config.getInstanceName()));
 
     // 2. test with a pair of identical best possible assignment and baseline assignment
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 4d8fb8669..0027f8e4e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -49,8 +49,9 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
       "testResourceName"
     };
     setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
-    setupLiveInstances(5);
     setupStateModel();
+    setupInstances(5);
+    setupLiveInstances(5);
 
     Map<String, Resource> resourceMap = getResourceMap();
     CurrentStateOutput currentStateOutput = new CurrentStateOutput();
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 7289dc3f8..e33dc9f5d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -50,6 +50,7 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
         BuiltInStateModelDefinitions.MasterSlave.name());
     setupLiveInstances(5);
     setupStateModel();
+    setupInstances(5);
 
     Map<String, Resource> resourceMap =
         getResourceMap(resources, numPartition, BuiltInStateModelDefinitions.MasterSlave.name());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index 2fcfc3e59..7b891522c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.common.PartitionStateMap;
-import org.apache.helix.controller.common.ResourcesStateMap;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.LiveInstance;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 60281e65b..7da3d64c2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -572,6 +572,7 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
     setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
         IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
     setupStateModel();
+    setupInstances(numOfLiveInstances);
     setupLiveInstances(numOfLiveInstances);
 
     // Set up cluster configs
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 40fa82030..d3c41018c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -57,8 +57,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
   @Test
   public void testDuplicateMsg() throws Exception {
     String clusterName = "CLUSTER_" + _className + "_dup";
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
 
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    admin.addCluster(clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
     refreshClusterConfig(clusterName, accessor);
@@ -82,10 +84,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0
     }, resourceGroups, 1, 1);
+    setupStateModel(clusterName);
+    setupInstances(clusterName, new int[]{0});
     List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         0
     });
-    setupStateModel(clusterName);
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
@@ -321,10 +324,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0
     }, resourceGroups, 1, 1);
+    setupStateModel(clusterName);
+    setupInstances(clusterName, new int[]{0});
     List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         0
     });
-    setupStateModel(clusterName);
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
@@ -395,8 +399,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
   @Test
   public void testMasterXfer() throws Exception {
     String clusterName = "CLUSTER_" + _className + "_xfer";
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    admin.addCluster(clusterName);
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
@@ -417,10 +423,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0, 1
     }, resourceGroups, 1, 2);
+    setupStateModel(clusterName);
+    setupInstances(clusterName, new int[]{0, 1});
     List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         1
     });
-    setupStateModel(clusterName);
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
@@ -474,8 +481,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
   @Test
   public void testNoDuplicatedMaster() throws Exception {
     String clusterName = "CLUSTER_" + _className + "_no_duplicated_master";
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    admin.addCluster(clusterName);
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
@@ -496,10 +505,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0, 1
     }, resourceGroups, 1, 2);
+    setupStateModel(clusterName);
+    setupInstances(clusterName, new int[]{0, 1});
     List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         0, 1
     });
-    setupStateModel(clusterName);
 
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
@@ -553,6 +563,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
   public void testNoMessageSentOnControllerLeadershipLoss() throws Exception {
     String methodName = TestHelper.getTestMethodName();
     String clusterName = _className + "_" + methodName;
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+
+    admin.addCluster(clusterName);
 
     final String resourceName = "testResource_" + methodName;
     final String partitionName = resourceName + "_0";
@@ -565,10 +578,11 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0
     }, resourceGroups, 1, 1);
+    setupStateModel(clusterName);
+    setupInstances(clusterName, new int[]{0});
     List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
         0
     });
-    setupStateModel(clusterName);
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
index af47542ee..e4aeed04f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -71,7 +71,7 @@ public class TestReplicaLevelThrottling extends BaseStageTest {
     when(mock.cache.getClusterConfig()).thenReturn((ClusterConfig) cacheMap.get(CacheKeys.clusterConfig.name()));
     when(mock.cache.getStateModelDef((String) cacheMap.get(CacheKeys.stateModelName.name()))).thenReturn(
         (StateModelDefinition) cacheMap.get(CacheKeys.stateModelDef.name()));
-    when(mock.cache.getEnabledLiveInstances()).thenReturn(new HashSet<>(
+    when(mock.cache.getAssignableEnabledLiveInstances()).thenReturn(new HashSet<>(
         ((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()));
     when(mock.cache.getLiveInstances()).thenReturn(new HashSet<>(
         ((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()).stream()
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestOfflineNodeTimeoutDuringMaintenanceMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestOfflineNodeTimeoutDuringMaintenanceMode.java
index 742020b20..d39dd1fd2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestOfflineNodeTimeoutDuringMaintenanceMode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestOfflineNodeTimeoutDuringMaintenanceMode.java
@@ -208,15 +208,15 @@ public class TestOfflineNodeTimeoutDuringMaintenanceMode extends ZkTestBase {
         new ResourceControllerDataProvider(CLUSTER_NAME);
     resourceControllerDataProvider.refresh(_helixDataAccessor);
     Assert
-        .assertFalse(resourceControllerDataProvider.getLiveInstances().containsKey(instance3));
+        .assertFalse(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance3));
     Assert
-        .assertFalse(resourceControllerDataProvider.getLiveInstances().containsKey(instance4));
-    Assert.assertTrue(resourceControllerDataProvider.getLiveInstances().containsKey(instance5));
+        .assertFalse(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance4));
+    Assert.assertTrue(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance5));
     Assert
-        .assertFalse(resourceControllerDataProvider.getLiveInstances().containsKey(instance6));
+        .assertFalse(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance6));
     Assert
-        .assertFalse(resourceControllerDataProvider.getLiveInstances().containsKey(instance7));
-    Assert.assertTrue(resourceControllerDataProvider.getLiveInstances().containsKey(instance8));
+        .assertFalse(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance7));
+    Assert.assertTrue(resourceControllerDataProvider.getAssignableLiveInstances().containsKey(instance8));
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
index a324a3300..f01af8a7f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -208,7 +208,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase {
     ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(CLUSTER_NAME);
     dataCache.refresh(_accessor);
 
-    Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+    Map<String, LiveInstance> liveInstanceMap = dataCache.getAssignableLiveInstances();
     LiveInstance liveInstance = liveInstanceMap.get(instance);
 
     Map<String, CurrentState> currentStateMap =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index 67d02a421..33cdbd378 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -192,7 +192,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   private void verifyP2PDisabled() {
     ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(CLUSTER_NAME);
     dataCache.refresh(_accessor);
-    Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+    Map<String, LiveInstance> liveInstanceMap = dataCache.getAssignableLiveInstances();
 
     for (LiveInstance instance : liveInstanceMap.values()) {
       Map<String, CurrentState> currentStateMap =
@@ -218,7 +218,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
   private void verifyP2PEnabled(long startTime) {
     ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(CLUSTER_NAME);
     dataCache.refresh(_accessor);
-    Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+    Map<String, LiveInstance> liveInstanceMap = dataCache.getAssignableLiveInstances();
 
     for (LiveInstance instance : liveInstanceMap.values()) {
       Map<String, CurrentState> currentStateMap =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
index f668d1603..3c389ac3a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java
@@ -293,13 +293,13 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBase {
       int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
       String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
       int instances = 0;
-      for (String liveInstanceName : cache.getLiveInstances().keySet()) {
-        if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+      for (String liveInstanceName : cache.getAssignableLiveInstances().keySet()) {
+        if (cache.getAssignableInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
           instances++;
         }
       }
       if (instances == 0) {
-        instances = cache.getLiveInstances().size();
+        instances = cache.getAssignableLiveInstances().size();
       }
       ExternalView ev = accessor.getProperty(keyBuilder.externalView(_resourceName));
       if (ev == null) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
index 0fb203e31..49800328d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
@@ -222,7 +222,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
       try {
         return verifyBalanceExternalView(
             accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(),
-            numberOfPartitions, masterValue, replicas, cache.getLiveInstances().size(),
+            numberOfPartitions, masterValue, replicas, cache.getAssignableLiveInstances().size(),
             cache.getIdealState(_resourceName).getMaxPartitionsPerInstance());
       } catch (Exception e) {
         LOG.debug("Verify failed", e);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
index 1bf67ccae..07a0a5de5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomRebalancer.java
@@ -68,7 +68,7 @@ public class TestCustomRebalancer {
     when(cache.getStateModelDef(stateModelName)).thenReturn(stateModelDef);
     when(cache.getDisabledInstancesForPartition(resource.getResourceName(), partitionName))
         .thenReturn(ImmutableSet.of(instanceName));
-    when(cache.getLiveInstances())
+    when(cache.getAssignableLiveInstances())
         .thenReturn(ImmutableMap.of(instanceName, new LiveInstance(instanceName)));
 
     CurrentStateOutput currOutput = new CurrentStateOutput();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
index f7fa7ddd1..4f52d2ecc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
@@ -62,7 +62,7 @@ public class TestCustomizedIdealStateRebalancer extends ZkStandAloneCMTestBase {
     public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
         CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) {
       testRebalancerInvoked = true;
-      List<String> liveNodes = Lists.newArrayList(clusterData.getLiveInstances().keySet());
+      List<String> liveNodes = Lists.newArrayList(clusterData.getAssignableLiveInstances().keySet());
       int i = 0;
       for (String partition : currentIdealState.getPartitionSet()) {
         int index = i++ % liveNodes.size();
@@ -139,13 +139,13 @@ public class TestCustomizedIdealStateRebalancer extends ZkStandAloneCMTestBase {
         int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
         String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
         int instances = 0;
-        for (String liveInstanceName : cache.getLiveInstances().keySet()) {
-          if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
+        for (String liveInstanceName : cache.getAssignableLiveInstances().keySet()) {
+          if (cache.getAssignableInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) {
             instances++;
           }
         }
         if (instances == 0) {
-          instances = cache.getLiveInstances().size();
+          instances = cache.getAssignableLiveInstances().size();
         }
         return verifyBalanceExternalView(
             accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 9ccc14fdf..b7c90d841 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -41,6 +41,7 @@ import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -61,7 +62,8 @@ public class TestInstanceOperation extends ZkTestBase {
   protected static final String HOST = "host";
   protected static final String LOGICAL_ID = "logicalId";
   protected static final String TOPOLOGY = String.format("%s/%s/%s", ZONE, HOST, LOGICAL_ID);
-
+  protected static final ImmutableSet<String> TOP_STATE_SET =
+      ImmutableSet.of("MASTER");
   protected static final ImmutableSet<String> SECONDARY_STATE_SET =
       ImmutableSet.of("SLAVE", "STANDBY");
   protected static final ImmutableSet<String> ACCEPTABLE_STATE_SET =
@@ -73,6 +75,7 @@ public class TestInstanceOperation extends ZkTestBase {
   List<String> _participantNames = new ArrayList<>();
   private Set<String> _allDBs = new HashSet<>();
   private ZkHelixClusterVerifier _clusterVerifier;
+  private ZkHelixClusterVerifier _bestPossibleClusterVerifier;
   private ConfigAccessor _configAccessor;
   private long _stateModelDelay = 3L;
 
@@ -102,6 +105,10 @@ public class TestInstanceOperation extends ZkTestBase {
         .setResources(_allDBs)
         .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
         .build();
+    _bestPossibleClusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(_allDBs)
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
     _configAccessor = new ConfigAccessor(_gZkClient);
     _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
@@ -122,6 +129,7 @@ public class TestInstanceOperation extends ZkTestBase {
     clusterConfig.setDelayRebalaceEnabled(true);
     clusterConfig.setRebalanceDelayTime(1800000L);
     _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    enabledTopologyAwareRebalance();
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
@@ -152,8 +160,8 @@ public class TestInstanceOperation extends ZkTestBase {
     for (int i = 0; i < _participants.size(); i++) {
       String participantName = _participantNames.get(i);
       if (!_originalParticipantNames.contains(participantName)) {
-        _participants.get(i).syncStop();
         _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, participantName, false);
+        _participants.get(i).syncStop();
         _gSetupTool.getClusterManagementTool()
             .dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName));
         droppedParticipants.add(participantName);
@@ -390,6 +398,7 @@ public class TestInstanceOperation extends ZkTestBase {
         null);
     addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE));
 
+
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     Map<String, ExternalView> assignment = getEVs();
     for (String resource : _allDBs) {
@@ -680,7 +689,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // and adding the SWAP_IN instance to the cluster.
     // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
     // but none of them are in a top state.
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
 
@@ -747,7 +756,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // and adding the SWAP_IN instance to the cluster.
     // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
     // but none of them are in a top state.
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
 
@@ -793,7 +802,7 @@ public class TestInstanceOperation extends ZkTestBase {
         InstanceConstants.InstanceOperation.SWAP_OUT);
 
     // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
@@ -808,7 +817,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // and adding the SWAP_IN instance to the cluster.
     // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
     // but none of them are in a top state.
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
 
@@ -880,7 +889,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
     // Validate that the assignment has not changed since adding the SWAP_IN node.
     // During MM, the cluster should not compute new assignment.
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
@@ -892,7 +901,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // Validate that partitions on SWAP_OUT instance does not change after exiting MM
     // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
     // but none of them are in a top state.
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
 
@@ -956,16 +965,12 @@ public class TestInstanceOperation extends ZkTestBase {
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
         InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
 
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
     // Validate that the SWAP_IN instance has the same partitions as the SWAP_OUT instance in second top state.
     Map<String, String> swapInInstancePartitionsAndStates =
         getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
-    Assert.assertTrue(
-        swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
-    Set<String> swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values());
-    swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
-    Assert.assertEquals(swapInInstanceStates.size(), 0);
+    Assert.assertEquals(swapInInstancePartitionsAndStates.keySet().size(), 0);
 
     // Assert canSwapBeCompleted is false because SWAP_OUT instance is disabled.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
@@ -975,23 +980,22 @@ public class TestInstanceOperation extends ZkTestBase {
     _gSetupTool.getClusterManagementTool()
         .enableInstance(CLUSTER_NAME, instanceToSwapOutName, true);
 
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
-    // Assert completeSwapIfPossible is true
-    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
-        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
     // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance originally
-    // had. Validate they are in second top state because initially disabling SWAP_OUT instance
-    // caused all topStates to be handed off to next replica in the preference list.
+    // had. Validate they are in second top state.
     swapInInstancePartitionsAndStates =
         getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
     Assert.assertTrue(
         swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
-    swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values());
+    Set<String> swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values());
     swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
     Assert.assertEquals(swapInInstanceStates.size(), 0);
 
+    // Assert completeSwapIfPossible is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName));
+
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
@@ -1036,7 +1040,7 @@ public class TestInstanceOperation extends ZkTestBase {
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
 
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
     // Enable the SWAP_IN instance before we have set the SWAP_OUT instance.
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
@@ -1059,7 +1063,7 @@ public class TestInstanceOperation extends ZkTestBase {
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
 
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
     // Try to remove the InstanceOperation from the SWAP_IN instance before the SWAP_OUT instance is set.
     // This should throw exception because we cannot ever have two instances with the same logicalId and both have InstanceOperation
@@ -1108,7 +1112,7 @@ public class TestInstanceOperation extends ZkTestBase {
     // and adding the SWAP_IN instance to the cluster.
     // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
     // but none of them are in a top state.
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Set.of(instanceToSwapInName), Collections.emptySet());
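[Editor's note, not part of the patch] Several mid-swap assertions in this file now poll a BestPossibleExternalViewVerifier instead of _clusterVerifier (the StrictMatchExternalViewVerifier from the existing setup). A minimal sketch of how that verifier is built and used, following the Builder calls added earlier in this file; the rationale (strict ideal-state matching is too strong while a SWAP_IN replica set is still converging) is an inference, not stated in the commit:

    // Sketch only: mirrors the verifier construction added in this test class.
    // ZK_ADDR, CLUSTER_NAME, _allDBs and TestHelper are existing test fields.
    ZkHelixClusterVerifier bestPossibleVerifier =
        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
            .setZkAddr(ZK_ADDR)
            .setResources(_allDBs)
            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
            .build();
    // Used where the external view should match the best-possible assignment
    // even though it may not yet strictly match the final ideal state.
    Assert.assertTrue(bestPossibleVerifier.verifyByPolling());
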
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
index 588de917a..f6ef8279d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
@@ -174,7 +174,7 @@ public class TestWagedNodeSwap extends ZkTestBase {
     _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newParticipantName);
     InstanceConfig newConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, newParticipantName);
     String zone = instanceConfig.getDomainAsMap().get("zone");
-    String domain = String.format("zone=%s,instance=%s", zone, newParticipantName);
+    String domain = String.format("zone=%s,instance=%s", zone, oldParticipantName);
     newConfig.setDomain(domain);
     _gSetupTool.getClusterManagementTool()
         .setInstanceConfig(CLUSTER_NAME, newParticipantName, newConfig);
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 649ff7ec1..1002ee7dd 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -112,8 +112,8 @@ public class TestP2PMessages extends BaseStageTest {
       e.printStackTrace();
     }
 
-    _instances = _dataCache.getAllInstances();
-    _liveInstanceMap = _dataCache.getLiveInstances();
+    _instances = _dataCache.getAssignableInstances();
+    _liveInstanceMap = _dataCache.getAssignableLiveInstances();
 
     _initialStateMap = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     _initialMaster = getTopStateInstance(_initialStateMap.getInstanceStateMap(_db, _partition),
@@ -218,7 +218,7 @@ public class TestP2PMessages extends BaseStageTest {
     // Old master (initialMaster) failed the M->S transition,
     // but has not forward p2p message to new master (secondMaster) yet.
     // Validate: Controller should ignore the ERROR partition and send S->M message to new master.
-    String session = _dataCache.getLiveInstances().get(_initialMaster).getEphemeralOwner();
+    String session = _dataCache.getAssignableLiveInstances().get(_initialMaster).getEphemeralOwner();
     PropertyKey currentStateKey =
         new PropertyKey.Builder(_clusterName).currentState(_initialMaster, session, _db);
     CurrentState currentState = accessor.getProperty(currentStateKey);
@@ -308,7 +308,7 @@ public class TestP2PMessages extends BaseStageTest {
   private void handleMessage(String instance, String resource) {
     PropertyKey propertyKey = new PropertyKey.Builder(_clusterName).messages(instance);
     List<Message> messages = accessor.getChildValues(propertyKey, true);
-    String session = _dataCache.getLiveInstances().get(instance).getEphemeralOwner();
+    String session = _dataCache.getAssignableLiveInstances().get(instance).getEphemeralOwner();
 
     for (Message m : messages) {
       if (m.getResourceName().equals(resource)) {
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index e849be28c..d8810b153 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -86,8 +86,8 @@ public class TestTargetedTaskStateChange {
     when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
     when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
     when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
-    when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
-    when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock._cache.getAssignableEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock._cache.getAssignableInstanceConfigMap()).thenReturn(_instanceConfigs);
     when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
@@ -123,8 +123,8 @@ public class TestTargetedTaskStateChange {
     when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
     when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
     when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
-    when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
-    when(mock._cache.getInstanceConfigMap()).thenReturn(_instanceConfigs);
+    when(mock._cache.getAssignableEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock._cache.getAssignableInstanceConfigMap()).thenReturn(_instanceConfigs);
     when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 6b357a384..d0f0c5715 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -322,7 +322,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   }
 
   protected void setupHelixResources() throws Exception {
-    _clusters = createClusters(4);
+    _clusters = createClusters(5);
     _gSetupTool.addCluster(_superCluster, true);
     _gSetupTool.addCluster(TASK_TEST_CLUSTER, true);
     _clusters.add(_superCluster);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 92dfff002..5cfab76a0 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -45,7 +45,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestInstancesAccessor extends AbstractTestClass {
-  private final static String CLUSTER_NAME = "TestCluster_0";
+  private final static String CLUSTER_NAME = "TestCluster_4";
 
   @DataProvider
   public Object[][] generatePayloadCrossZoneStoppableCheckWithZoneOrder() {