You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/11/07 19:29:38 UTC

[helix] branch wagedRebalancer updated: Introduce Dry-run Waged Rebalancer for the verifiers and tests. (#573)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new df661d3  Introduce Dry-run Waged Rebalancer for the verifiers and tests. (#573)
df661d3 is described below

commit df661d34614cd7e03cd375426fa1ae2a9cccf0db
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Nov 7 11:29:30 2019 -0800

    Introduce Dry-run Waged Rebalancer for the verifiers and tests. (#573)
    
    Use a dry-run rebalancer to avoid updating the persisted rebalancer status in the verifiers or tests.
    Also, refine several rebalancer related interfaces so as to simplify the dry-run rebalancer implementation.
    Convert the test cases back to use the BestPossibleExternalViewVerifier.
    
    Additional fixes:
    - Update the rebalancer preference on every rebalancer.compute call, since the preference might be updated at runtime.
    - Fix one minor metric domain name bug in the WagedRebalancerMetricCollector.
    - Minor test case fixes to make the tests more stable after the change.
---
 .../rebalancer/waged/AssignmentMetadataStore.java  |  16 +--
 .../rebalancer/waged/WagedRebalancer.java          | 157 +++++++++++++--------
 .../stages/BestPossibleStateCalcStage.java         |  71 +++++-----
 .../helix/monitoring/metrics/MetricCollector.java  |   3 +-
 .../metrics/WagedRebalancerMetricCollector.java    |  15 +-
 .../BestPossibleExternalViewVerifier.java          |  82 +++++++++--
 .../rebalancer/waged/TestWagedRebalancer.java      |  42 +++---
 .../waged/TestWagedRebalancerMetrics.java          |   2 +-
 .../TestDelayedAutoRebalance.java                  |   8 +-
 .../TestDelayedAutoRebalanceWithRackaware.java     |   4 +-
 .../PartitionMigration/TestExpandCluster.java      |   4 +-
 .../TestPartitionMigrationBase.java                |  12 +-
 .../PartitionMigration/TestWagedExpandCluster.java |  16 +--
 .../TestWagedRebalancerMigration.java              |  18 +--
 .../rebalancer/TestMixedModeAutoRebalance.java     |  12 +-
 .../rebalancer/TestZeroReplicaAvoidance.java       |  18 ++-
 .../WagedRebalancer/TestDelayedWagedRebalance.java |  14 --
 ...tDelayedWagedRebalanceWithDisabledInstance.java |  46 +++---
 .../TestDelayedWagedRebalanceWithRackaware.java    |  43 +++---
 .../TestMixedModeWagedRebalance.java               |   9 --
 .../{TestNodeSwap.java => TestWagedNodeSwap.java}  |  12 +-
 .../WagedRebalancer/TestWagedRebalance.java        |  62 ++++++++
 .../TestWagedRebalanceFaultZone.java               |   8 +-
 23 files changed, 383 insertions(+), 291 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 234c88c..843d1b6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.BucketDataAccessor;
@@ -35,6 +34,7 @@ import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.ResourceAssignment;
 
+
 /**
  * A placeholder before we have the real assignment metadata store.
  */
@@ -49,14 +49,14 @@ public class AssignmentMetadataStore {
   private BucketDataAccessor _dataAccessor;
   private String _baselinePath;
   private String _bestPossiblePath;
-  private Map<String, ResourceAssignment> _globalBaseline;
-  private Map<String, ResourceAssignment> _bestPossibleAssignment;
+  protected Map<String, ResourceAssignment> _globalBaseline;
+  protected Map<String, ResourceAssignment> _bestPossibleAssignment;
 
   AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
     this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
   }
 
-  AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
+  protected AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
     _dataAccessor = bucketDataAccessor;
     _baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
     _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
@@ -153,8 +153,8 @@ public class AssignmentMetadataStore {
     HelixProperty property = new HelixProperty(name);
     // Add each resource's assignment as a simple field in one ZNRecord
     // Node that don't use Arrays.toString() for the record converting. The deserialize will fail.
-    assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
-        new String(SERIALIZER.serialize(assignment.getRecord()))));
+    assignmentMap.forEach((resource, assignment) -> property.getRecord()
+        .setSimpleField(resource, new String(SERIALIZER.serialize(assignment.getRecord()))));
     return property;
   }
 
@@ -167,8 +167,8 @@ public class AssignmentMetadataStore {
     Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
     // Convert each resource's assignment String into a ResourceAssignment object and put it in a
     // map
-    property.getRecord().getSimpleFields()
-        .forEach((resource, assignmentStr) -> assignmentMap.put(resource,
+    property.getRecord().getSimpleFields().forEach((resource, assignmentStr) -> assignmentMap
+        .put(resource,
             new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes()))));
     return assignmentMap;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 605dcd1..c472e77 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixManager;
@@ -70,58 +71,70 @@ public class WagedRebalancer {
   // When any of the following change happens, the rebalancer needs to do a global rebalance which
   // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
   private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
-      ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
-          HelixConstants.ChangeType.IDEAL_STATE,
-          HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
+      ImmutableSet
+          .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
+              HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
+  // To identify if the preference has been configured or not.
+  private static final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer>
+      NOT_CONFIGURED_PREFERENCE = ImmutableMap
+      .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
+          ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);
+
   private final ResourceChangeDetector _changeDetector;
   private final HelixManager _manager;
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
   private final AssignmentMetadataStore _assignmentMetadataStore;
-  private final RebalanceAlgorithm _rebalanceAlgorithm;
-  private MetricCollector _metricCollector;
-
-  private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) {
-    AssignmentMetadataStore assignmentMetadataStore = null;
-    if (helixManager != null) {
-      String metadataStoreAddrs = helixManager.getMetadataStoreConnectionString();
-      String clusterName = helixManager.getClusterName();
-      if (metadataStoreAddrs != null && clusterName != null) {
-        assignmentMetadataStore = new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
-      }
+  private final MetricCollector _metricCollector;
+  private RebalanceAlgorithm _rebalanceAlgorithm;
+  private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
+      NOT_CONFIGURED_PREFERENCE;
+
+  private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs,
+      String clusterName) {
+    if (metadataStoreAddrs != null && clusterName != null) {
+      return new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
     }
-    return assignmentMetadataStore;
+    return null;
   }
 
   public WagedRebalancer(HelixManager helixManager,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
-      MetricCollector metricCollector) {
-    this(constructAssignmentStore(helixManager),
-        ConstraintBasedAlgorithmFactory.getInstance(preferences),
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference) {
+    this(helixManager == null ? null
+            : constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
+                helixManager.getClusterName()), ConstraintBasedAlgorithmFactory.getInstance(preference),
         // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
         // Mapping calculator will translate the best possible assignment into the applicable state
         // mapping based on the current states.
         // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
         new DelayedAutoRebalancer(),
         // Helix Manager is required for the rebalancer scheduler
-        helixManager, metricCollector);
+        helixManager,
+        // If HelixManager is null, we just pass in null for MetricCollector so that a
+        // non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
+        // constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
+        // in this case, WagedRebalancer will not read/write to metadata store and just use
+        // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
+        // verifying whether the cluster has converged.
+        helixManager == null ? null
+            : new WagedRebalancerMetricCollector(helixManager.getClusterName()));
+    _preference = ImmutableMap.copyOf(preference);
   }
 
   /**
    * This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
-   * the rebalancer will rebalance solely based on CurrentStates. With null MetricCollector, the
+   * the rebalancer will not schedule for a future delayed rebalance. With null MetricCollector, the
    * rebalancer will not emit JMX metrics.
    * @param assignmentMetadataStore
    * @param algorithm
-   * @param mappingCalculator
    */
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
-      RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
-    this(assignmentMetadataStore, algorithm, mappingCalculator, null, null);
+      RebalanceAlgorithm algorithm) {
+    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null);
   }
 
   /**
-   * This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
-   * the rebalancer will rebalance solely based on CurrentStates.
+   * This constructor will use null for HelixManager. With null HelixManager, the rebalancer will
+   * not schedule for a future delayed rebalance.
    * @param assignmentMetadataStore
    * @param algorithm
    * @param metricCollector
@@ -149,11 +162,25 @@ public class WagedRebalancer {
     _changeDetector = new ResourceChangeDetector(true);
   }
 
+  // Update the rebalancer preference configuration if the new preference is different from the
+  // current preference configuration.
+  public void updatePreference(
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
+    if (_preference.equals(NOT_CONFIGURED_PREFERENCE) || _preference.equals(newPreference)) {
+      // 1. if the preference was not configured during constructing, no need to update.
+      // 2. if the preference equals to the new preference, no need to update.
+      return;
+    }
+    _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
+    _preference = ImmutableMap.copyOf(newPreference);
+  }
+
   // Release all the resources.
   public void close() {
     if (_assignmentMetadataStore != null) {
       _assignmentMetadataStore.close();
     }
+    _metricCollector.unregister();
   }
 
   /**
@@ -231,9 +258,43 @@ public class WagedRebalancer {
   }
 
   // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
-  protected Map<String, IdealState> computeBestPossibleStates(
+  private Map<String, IdealState> computeBestPossibleStates(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
+      final CurrentStateOutput currentStateOutput)
+      throws HelixRebalanceException {
+    Set<String> activeNodes = DelayedRebalanceUtil
+        .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
+            clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+            clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+
+    // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
+    delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
+
+    Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData,
+        computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput));
+
+    // The additional rebalance overwrite is required since the calculated mapping may contain
+    // some delayed rebalanced assignments.
+    if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
+      applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
+          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
+              resourceMap.keySet()));
+    }
+    // Replace the assignment if user-defined preference list is configured.
+    // Note the user-defined list is intentionally applied to the final mapping after calculation.
+    // This is to avoid persisting it into the assignment store, which impacts the long term
+    // assignment evenness and partition movements.
+    newIdealStates.entrySet().stream().forEach(idealStateEntry -> applyUserDefinedPreferenceList(
+        clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
+
+    return newIdealStates;
+  }
+
+  // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
+  protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
+      throws HelixRebalanceException {
     getChangeDetector().updateSnapshots(clusterData);
     // Get all the changed items' information
     Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
@@ -257,37 +318,11 @@ public class WagedRebalancer {
       refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
     }
 
-    Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(),
-        clusterData.getEnabledLiveInstances(), clusterData.getInstanceOfflineTimeMap(),
-        clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
-        clusterData.getClusterConfig());
-
-    // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
-    delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
-
     // Perform partial rebalance
     Map<String, ResourceAssignment> newAssignment =
         partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput);
 
-    Map<String, IdealState> finalIdealStateMap =
-        convertResourceAssignment(clusterData, newAssignment);
-
-    // The additional rebalance overwrite is required since the calculated mapping may contains
-    // some delayed rebalanced assignments.
-    if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
-      applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
-          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
-              resourceMap.keySet()));
-    }
-    // Replace the assignment if user-defined preference list is configured.
-    // Note the user-defined list is intentionally applied to the final mapping after calculation.
-    // This is to avoid persisting it into the assignment store, which impacts the long term
-    // assignment evenness and partition movements.
-    finalIdealStateMap.entrySet().stream()
-        .forEach(idealStateEntry -> applyUserDefinedPreferenceList(
-            clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
-
-    return finalIdealStateMap;
+    return newAssignment;
   }
 
   /**
@@ -503,7 +538,7 @@ public class WagedRebalancer {
     Set<String> nonCompatibleResources = resourceMap.entrySet().stream().filter(resourceEntry -> {
       IdealState is = clusterData.getIdealState(resourceEntry.getKey());
       return is == null || !is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
-          || !getClass().getName().equals(is.getRebalancerClassName());
+          || !WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
     }).map(Map.Entry::getKey).collect(Collectors.toSet());
     if (!nonCompatibleResources.isEmpty()) {
       throw new HelixRebalanceException(String.format(
@@ -554,7 +589,7 @@ public class WagedRebalancer {
    *         assignmentMetadataStore, return the current state assignment.
    * @throws HelixRebalanceException
    */
-  private Map<String, ResourceAssignment> getBestPossibleAssignment(
+  protected Map<String, ResourceAssignment> getBestPossibleAssignment(
       AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
       Set<String> resources) throws HelixRebalanceException {
     Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap();
@@ -614,18 +649,16 @@ public class WagedRebalancer {
    * @param idealStateMap the calculated ideal states.
    * @param clusterData the cluster data cache.
    * @param resourceMap the rebalanaced resource map.
-   * @param clusterChanges the detected cluster changes that triggeres the rebalance.
    * @param baseline the baseline assignment
    */
   private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
       Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
     Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
     // Note that the calculation used the baseline as the input only. This is for minimizing
     // unnecessary partition movement.
     Map<String, IdealState> activeIdealStates = convertResourceAssignment(clusterData,
-        calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
+        calculateAssignment(clusterData, Collections.emptyMap(), resourceMap, enabledLiveInstances,
             Collections.emptyMap(), baseline));
     for (String resourceName : idealStateMap.keySet()) {
       // The new calculated ideal state before overwrite
@@ -664,6 +697,10 @@ public class WagedRebalancer {
     }
   }
 
+  protected AssignmentMetadataStore getAssignmentMetadataStore() {
+    return _assignmentMetadataStore;
+  }
+
   protected MetricCollector getMetricCollector() {
     return _metricCollector;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index fcf89b5..671604e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
-import javax.management.JMException;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
@@ -43,8 +42,6 @@ import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
-import org.apache.helix.monitoring.metrics.MetricCollector;
-import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -55,6 +52,8 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.monitoring.mbeans.ResourceMonitor;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
@@ -67,12 +66,12 @@ import org.slf4j.LoggerFactory;
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private static final Logger logger =
       LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
-  // Create a ThreadLocal of MetricCollector. Metrics could only be updated by the controller thread
-  // only.
-  private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL =
-      new ThreadLocal<>();
-  private static final ThreadLocal<WagedRebalancer> WAGED_REBALANCER_THREAD_LOCAL =
-      new ThreadLocal<>();
+
+  // Lazy initialize the WAGED rebalancer instance since the BestPossibleStateCalcStage instance was
+  // instantiated without the HelixManager information that is required.
+  // TODO: Initialize the WAGED rebalancer in the BestPossibleStateCalcStage constructor once it is
+  // TODO: updated so as to accept a HelixManager or HelixZkClient information.
+  private WagedRebalancer _wagedRebalancer = null;
 
   @Override
   public void process(ClusterEvent event) throws Exception {
@@ -113,6 +112,29 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     });
   }
 
+  // Need to keep a default constructor for backward compatibility
+  public BestPossibleStateCalcStage() {
+  }
+
+  // Construct the BestPossibleStateCalcStage with a given WAGED rebalancer for the callers other
+  // than the controller pipeline. Such as the verifiers and test cases.
+  public BestPossibleStateCalcStage(WagedRebalancer wagedRebalancer) {
+    _wagedRebalancer = wagedRebalancer;
+  }
+
+  private WagedRebalancer getWagedRebalancer(HelixManager helixManager,
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+    // Create WagedRebalancer instance if it hasn't been already initialized
+    if (_wagedRebalancer == null) {
+      _wagedRebalancer = new WagedRebalancer(helixManager, preferences);
+    } else {
+      // Since the preference can be updated at runtime, try to update the algorithm preference
+      // before returning the rebalancer.
+      _wagedRebalancer.updatePreference(preferences);
+    }
+    return _wagedRebalancer;
+  }
+
   private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
       CurrentStateOutput currentStateOutput) {
     ResourceControllerDataProvider cache =
@@ -261,35 +283,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
     Map<String, IdealState> newIdealStates = new HashMap<>();
 
-    // Init rebalancer with the rebalance preferences.
-    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
-        cache.getClusterConfig().getGlobalRebalancePreference();
-
-    // Create MetricCollector ThreadLocal if it hasn't been already initialized
-    if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) {
-      try {
-        // If HelixManager is null, we just pass in null for MetricCollector so that a
-        // non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
-        // constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
-        // in this case, WagedRebalancer will not read/write to metadata store and just use
-        // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
-        // verifying whether the cluster has converged.
-        METRIC_COLLECTOR_THREAD_LOCAL.set(helixManager == null ? null
-            : new WagedRebalancerMetricCollector(helixManager.getClusterName()));
-      } catch (JMException e) {
-        LogUtil.logWarn(logger, _eventId, String.format(
-            "MetricCollector instantiation failed! WagedRebalancer will not emit metrics due to JMException %s",
-            e));
-      }
-    }
-
-    // Create WagedRebalancer ThreadLocal if it hasn't been already initialized
-    if (WAGED_REBALANCER_THREAD_LOCAL.get() == null) {
-      WAGED_REBALANCER_THREAD_LOCAL
-          .set(new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get()));
-    }
-    WagedRebalancer wagedRebalancer = WAGED_REBALANCER_THREAD_LOCAL.get();
-
+    WagedRebalancer wagedRebalancer =
+        getWagedRebalancer(helixManager, cache.getClusterConfig().getGlobalRebalancePreference());
     try {
       newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
           currentStateOutput));
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
index 764557a..b08a840 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
@@ -27,7 +27,6 @@ import javax.management.JMException;
 import javax.management.ObjectName;
 import org.apache.helix.HelixException;
 import org.apache.helix.monitoring.metrics.model.Metric;
-import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 
@@ -71,7 +70,7 @@ public abstract class MetricCollector extends DynamicMBeanProvider {
 
   @Override
   public String getSensorName() {
-    return String.format("%s.%s.%s", MonitorDomainNames.Rebalancer.name(), _clusterName,
+    return String.format("%s.%s.%s", _monitorDomainName, _clusterName,
         _entityName);
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index 3dd16ad..df8b60f 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -21,6 +21,7 @@ package org.apache.helix.monitoring.metrics;
 
 import javax.management.JMException;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
 import org.apache.helix.monitoring.metrics.implementation.RebalanceCounter;
@@ -63,10 +64,17 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
     PartialRebalanceCounter
   }
 
-  public WagedRebalancerMetricCollector(String clusterName) throws JMException {
+  public WagedRebalancerMetricCollector(String clusterName) {
     super(MonitorDomainNames.Rebalancer.name(), clusterName, WAGED_REBALANCER_ENTITY_NAME);
     createMetrics();
-    register();
+    if (clusterName != null) {
+      try {
+        register();
+      } catch (JMException e) {
+        throw new HelixException("Failed to register MBean for the WagedRebalancerMetricCollector.",
+            e);
+      }
+    }
   }
 
   /**
@@ -75,8 +83,7 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
    * metrics.
    */
   public WagedRebalancerMetricCollector() {
-    super(MonitorDomainNames.Rebalancer.name(), null, null);
-    createMetrics();
+    this(null);
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 52ced19..4118ccf 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -19,40 +19,48 @@ package org.apache.helix.tools.ClusterVerifiers;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * verifier that the ExternalViews of given resources (or all resources in the cluster)
  * match its best possible mapping states.
@@ -377,8 +385,15 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     }
 
     runStage(event, new CurrentStateComputationStage());
-    // TODO: be caution here, should be handled statelessly.
-    runStage(event, new BestPossibleStateCalcStage());
+    // Note the dryrunWagedRebalancer is just for one time usage
+    DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
+        cache.getClusterConfig().getGlobalRebalancePreference());
+    try {
+      // TODO: be cautious here, should be handled statelessly.
+      runStage(event, new BestPossibleStateCalcStage(dryrunWagedRebalancer));
+    } finally {
+      dryrunWagedRebalancer.close();
+    }
 
     BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     return output;
@@ -399,3 +414,44 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
        + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
   }
 }
+
+/**
+ * A Dryrun WAGED rebalancer that only calculates the assignment based on the cluster status but
+ * never updates the rebalancer assignment metadata.
+ * This rebalancer is used in the verifiers or tests.
+ */
+class DryrunWagedRebalancer extends WagedRebalancer {
+  DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+    super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs, clusterName),
+        ConstraintBasedAlgorithmFactory.getInstance(preferences));
+  }
+
+  @Override
+  protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, CurrentStateOutput currentStateOutput)
+      throws HelixRebalanceException {
+    return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
+        resourceMap.keySet());
+  }
+}
+
+class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
+  ReadOnlyAssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
+    super(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
+  }
+
+  @Override
+  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+    // Update the in-memory reference only
+    _globalBaseline = globalBaseline;
+  }
+
+  @Override
+  public void persistBestPossibleAssignment(
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // Update the in-memory reference only
+    _bestPossibleAssignment = bestPossibleAssignment;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 9782645..2250539 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
@@ -115,10 +114,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test
-  public void testRebalance() throws IOException, HelixRebalanceException {
+  public void testRebalance()
+      throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -140,8 +139,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   @Test(dependsOnMethods = "testRebalance")
   public void testPartialRebalance() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -166,8 +164,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   @Test(dependsOnMethods = "testRebalance")
   public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -224,10 +221,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
-  public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
+  public void testNonCompatibleConfiguration()
+      throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     String nonCompatibleResourceName = _resourceNames.get(0);
@@ -248,8 +245,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   @Test(dependsOnMethods = "testRebalance")
   public void testInvalidClusterStatus() throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     String invalidResource = _resourceNames.get(0);
@@ -259,7 +255,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
         Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
     try {
-      rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
+      rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
+          clusterData.getEnabledLiveInstances(), new CurrentStateOutput());
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
@@ -280,8 +277,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
     when(metadataStore.getBaseline())
         .thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(metadataStore, _algorithm, new DelayedAutoRebalancer());
+    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     // The input resource Map shall contain all the valid resources.
@@ -299,10 +295,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   }
 
   @Test(dependsOnMethods = "testRebalance")
-  public void testAlgorithmException() throws IOException, HelixRebalanceException {
+  public void testAlgorithmException()
+      throws IOException, HelixRebalanceException {
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
@@ -320,11 +316,12 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
     when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
         HelixRebalanceException.Type.FAILED_TO_CALCULATE));
-    rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer());
+    rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm);
 
     // Calculation will fail
     try {
-      rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
+      rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
+          clusterData.getEnabledLiveInstances(), new CurrentStateOutput());
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -350,8 +347,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm
     // won't propagate any existing assignment from the cluster model.
     _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer =
-        new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
 
     // 1. rebalance with baseline calculation done
     // Generate the input for the rebalancer.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index 83229a1..7b792a2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -126,7 +126,7 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
     // Add a field to the cluster config so the cluster config will be marked as changed in the change detector.
     clusterData.getClusterConfig().getRecord().setSimpleField("foo", "bar");
 
-    rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
 
     Assert.assertEquals((long) metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index 7d4965e..afcabdf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -82,7 +83,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier = getClusterVerifier();
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
 
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
   }
@@ -254,10 +256,6 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
     }
   }
 
-  protected ZkHelixClusterVerifier getClusterVerifier() {
-    return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
-  }
-
   // create test DBs, wait it converged and return externalviews
   protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
     Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
index f85f07f..d8840f0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
  */
 
 import java.util.Date;
+
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -58,7 +59,8 @@ public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebala
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier = getClusterVerifier();
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
index 83893c6..7ece391 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -30,14 +30,14 @@ import org.testng.annotations.Test;
 
 
 public class TestExpandCluster extends TestPartitionMigrationBase {
-
   Map<String, IdealState> _resourceMap;
 
-
   @BeforeClass
   public void beforeClass() throws Exception {
     super.beforeClass();
     _resourceMap = createTestDBs(1000000);
+    // TODO remove this sleep after fixing https://github.com/apache/helix/issues/526
+    Thread.sleep(1000);
     _migrationVerifier = new MigrationStateVerifier(_resourceMap, _manager);
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
index 61d72a2..168cb6c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
@@ -84,20 +85,17 @@ public class TestPartitionMigrationBase extends ZkTestBase {
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier = getVerifier();
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
 
     enablePersistIntermediateAssignment(_gZkClient, CLUSTER_NAME, true);
 
-    _manager =
-        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
     _manager.connect();
     _configAccessor = new ConfigAccessor(_gZkClient);
   }
 
-  protected ZkHelixClusterVerifier getVerifier() {
-    return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
-  }
-
   protected MockParticipantManager createAndStartParticipant(String instancename) {
     _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instancename);
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
index 01db9eb..37e76ee 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
@@ -20,27 +20,13 @@ package org.apache.helix.integration.rebalancer.PartitionMigration;
  */
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 
-public class TestWagedExpandCluster extends TestExpandCluster {
-// TODO check the movements in between
-  protected ZkHelixClusterVerifier getVerifier() {
-    Set<String> dbNames = new HashSet<>();
-    int i = 0;
-    for (String stateModel : TestStateModels) {
-      dbNames.add("Test-DB-" + i++);
-    }
-    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
-        .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
-  }
 
+public class TestWagedExpandCluster extends TestExpandCluster {
   protected Map<String, IdealState> createTestDBs(long delayTime) {
     Map<String, IdealState> idealStateMap = new HashMap<>();
     int i = 0;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
index 3bfdc37..52def54 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
@@ -28,7 +28,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -39,7 +39,8 @@ public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
   ConfigAccessor _configAccessor;
 
   @BeforeClass
-  public void beforeClass() throws Exception {
+  public void beforeClass()
+      throws Exception {
     super.beforeClass();
     _configAccessor = new ConfigAccessor(_gZkClient);
   }
@@ -58,7 +59,8 @@ public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
   // TODO check the movements in between
   @Test(dataProvider = "stateModels")
   public void testMigrateToWagedRebalancerWhileExpandCluster(String stateModel,
-      boolean delayEnabled) throws Exception {
+      boolean delayEnabled)
+      throws Exception {
     String db = "Test-DB-" + stateModel;
     if (delayEnabled) {
       createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
@@ -84,9 +86,7 @@ public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
     }
     Thread.sleep(2000);
     ZkHelixClusterVerifier clusterVerifier =
-        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
-            .setResources(Collections.singleton(db)).setZkAddr(ZK_ADDR)
-            .setDeactivatedNodeAwareness(true).build();
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
     Assert.assertTrue(clusterVerifier.verifyByPolling());
 
     _migrationVerifier =
@@ -95,9 +95,11 @@ public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
     _migrationVerifier.reset();
     _migrationVerifier.start();
 
-    IdealState currentIdealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+    IdealState currentIdealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
     currentIdealState.setRebalancerClassName(WagedRebalancer.class.getName());
-    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, currentIdealState);
+    _gSetupTool.getClusterManagementTool()
+        .setResourceIdealState(CLUSTER_NAME, db, currentIdealState);
     Thread.sleep(2000);
     Assert.assertTrue(clusterVerifier.verifyByPolling());
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index 2f13d8b..f4c875f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -90,7 +90,8 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier = getClusterVerifier();
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
 
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
 
@@ -110,10 +111,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     };
   }
 
-  protected ZkHelixClusterVerifier getClusterVerifier() {
-    return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
-  }
-
   protected void createResource(String stateModel, int numPartition, int replica,
       boolean delayEnabled, String rebalanceStrategy) {
     if (delayEnabled) {
@@ -148,6 +145,9 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
         new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
     _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
 
+    // TODO remove this sleep after fixing https://github.com/apache/helix/issues/526
+    Thread.sleep(500);
+
     Assert.assertTrue(_clusterVerifier.verify(3000));
     verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions);
 
@@ -263,7 +263,7 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
   @AfterMethod
   public void afterMethod() {
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, DB_NAME);
-    getClusterVerifier().verify(5000);
+    _clusterVerifier.verify(5000);
   }
 
   @AfterClass
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
index 58d0dce..bd3f2e1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
@@ -40,7 +40,6 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -55,6 +54,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
 
   private List<MockParticipantManager> _participants = new ArrayList<>();
+  private ZkHelixClusterVerifier _clusterVerifier;
   private boolean _testSuccess = true;
   private boolean _startListen = false;
 
@@ -76,6 +76,9 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
     String controllerName = CONTROLLER_PREFIX + "_0";
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
   }
 
   @AfterMethod
@@ -122,9 +125,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
       createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica,
           0);
     }
-    ZkHelixClusterVerifier clusterVerifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
-    Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L));
+    Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L));
 
     _startListen = true;
     DelayedTransition.setDelay(5);
@@ -133,7 +134,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
     for (; i < NUM_NODE; i++) {
       _participants.get(i).syncStart();
     }
-    Assert.assertTrue(clusterVerifier.verify(70000L));
+    Assert.assertTrue(_clusterVerifier.verify(70000L));
     Assert.assertTrue(_testSuccess);
 
     if (manager.isConnected()) {
@@ -164,10 +165,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
       String db = "Test-DB-" + stateModel;
       createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica);
     }
-    ZkHelixClusterVerifier clusterVerifier =
-        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setDeactivatedNodeAwareness(true).build();
-    Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L));
+    Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L));
 
     _startListen = true;
     DelayedTransition.setDelay(5);
@@ -176,7 +174,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
     for (; i < NUM_NODE; i++) {
       _participants.get(i).syncStart();
     }
-    Assert.assertTrue(clusterVerifier.verify(70000L));
+    Assert.assertTrue(_clusterVerifier.verify(70000L));
     Assert.assertTrue(_testSuccess);
 
     if (manager.isConnected()) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
index e49cc19..e75da84 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
@@ -20,15 +20,11 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
  */
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -36,16 +32,6 @@ import org.testng.annotations.Test;
  * Inherit TestDelayedAutoRebalance to ensure the test logic is the same.
  */
 public class TestDelayedWagedRebalance extends TestDelayedAutoRebalance {
-  protected ZkHelixClusterVerifier getClusterVerifier() {
-    Set<String> dbNames = new HashSet<>();
-    int i = 0;
-    for (String stateModel : TestStateModels) {
-      dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
-    }
-    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
-        .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
-  }
-
   // create test DBs, wait it converged and return externalviews
   protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
     Map<String, ExternalView> externalViews = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
index 3d4bd6a..92988c4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
@@ -20,35 +20,22 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
  */
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+
 /**
  * Inherit TestDelayedAutoRebalanceWithDisabledInstance to ensure the test logic is the same.
  */
-public class TestDelayedWagedRebalanceWithDisabledInstance
-    extends TestDelayedAutoRebalanceWithDisabledInstance {
-  protected ZkHelixClusterVerifier getClusterVerifier() {
-    Set<String> dbNames = new HashSet<>();
-    int i = 0;
-    for (String stateModel : TestStateModels) {
-      dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
-    }
-    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
-        .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
-  }
-
+public class TestDelayedWagedRebalanceWithDisabledInstance extends TestDelayedAutoRebalanceWithDisabledInstance {
   // create test DBs, wait it converged and return externalviews
-  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+  protected Map<String, ExternalView> createTestDBs(long delayTime)
+      throws InterruptedException {
     Map<String, ExternalView> externalViews = new HashMap<>();
     int i = 0;
     for (String stateModel : TestStateModels) {
@@ -77,28 +64,33 @@ public class TestDelayedWagedRebalanceWithDisabledInstance
     // Waged Rebalancer takes cluster level delay config only. Skip this test.
   }
 
-  @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
-  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testDelayedPartitionMovementWithClusterConfigedDelay()
+      throws Exception {
     super.testDelayedPartitionMovementWithClusterConfigedDelay();
   }
 
-  @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
-  public void testMinimalActiveReplicaMaintain() throws Exception {
+  @Test(dependsOnMethods = {"testDelayedPartitionMovementWithClusterConfigedDelay"})
+  public void testMinimalActiveReplicaMaintain()
+      throws Exception {
     super.testMinimalActiveReplicaMaintain();
   }
 
-  @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
-  public void testPartitionMovementAfterDelayTime() throws Exception {
+  @Test(dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testPartitionMovementAfterDelayTime()
+      throws Exception {
     super.testPartitionMovementAfterDelayTime();
   }
 
-  @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
-  public void testDisableDelayRebalanceInCluster() throws Exception {
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+  public void testDisableDelayRebalanceInCluster()
+      throws Exception {
     super.testDisableDelayRebalanceInCluster();
   }
 
-  @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
-  public void testDisableDelayRebalanceInInstance() throws Exception {
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+  public void testDisableDelayRebalanceInInstance()
+      throws Exception {
     super.testDisableDelayRebalanceInInstance();
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
index bb7c11a..cd1f337 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
@@ -20,34 +20,22 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
  */
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+
 /**
  * Inherit TestDelayedAutoRebalanceWithRackaware to ensure the test logic is the same.
  */
 public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebalanceWithRackaware {
-  protected ZkHelixClusterVerifier getClusterVerifier() {
-    Set<String> dbNames = new HashSet<>();
-    int i = 0;
-    for (String stateModel : TestStateModels) {
-      dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
-    }
-    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
-        .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
-  }
-
   // create test DBs, wait it converged and return externalviews
-  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+  protected Map<String, ExternalView> createTestDBs(long delayTime)
+      throws InterruptedException {
     Map<String, ExternalView> externalViews = new HashMap<>();
     int i = 0;
     for (String stateModel : TestStateModels) {
@@ -76,28 +64,33 @@ public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebal
     // Waged Rebalancer takes cluster level delay config only. Skip this test.
   }
 
-  @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
-  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+  @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+  public void testDelayedPartitionMovementWithClusterConfigedDelay()
+      throws Exception {
     super.testDelayedPartitionMovementWithClusterConfigedDelay();
   }
 
-  @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
-  public void testMinimalActiveReplicaMaintain() throws Exception {
+  @Test(dependsOnMethods = {"testDelayedPartitionMovementWithClusterConfigedDelay"})
+  public void testMinimalActiveReplicaMaintain()
+      throws Exception {
     super.testMinimalActiveReplicaMaintain();
   }
 
-  @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
-  public void testPartitionMovementAfterDelayTime() throws Exception {
+  @Test(dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+  public void testPartitionMovementAfterDelayTime()
+      throws Exception {
     super.testPartitionMovementAfterDelayTime();
   }
 
-  @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
-  public void testDisableDelayRebalanceInCluster() throws Exception {
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+  public void testDisableDelayRebalanceInCluster()
+      throws Exception {
     super.testDisableDelayRebalanceInCluster();
   }
 
-  @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
-  public void testDisableDelayRebalanceInInstance() throws Exception {
+  @Test(dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+  public void testDisableDelayRebalanceInInstance()
+      throws Exception {
     super.testDisableDelayRebalanceInInstance();
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
index 00a31e2..eb9e0f8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
@@ -19,12 +19,8 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
  * under the License.
  */
 
-import java.util.Collections;
-
 import org.apache.helix.integration.rebalancer.TestMixedModeAutoRebalance;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.DataProvider;
 
@@ -43,11 +39,6 @@ public class TestMixedModeWagedRebalance extends TestMixedModeAutoRebalance {
     };
   }
 
-  protected ZkHelixClusterVerifier getClusterVerifier() {
-    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-        .setDeactivatedNodeAwareness(true).setResources(Collections.singleton(DB_NAME)).build();
-  }
-
   protected void createResource(String stateModel, int numPartition,
       int replica, boolean delayEnabled, String rebalanceStrategy) {
     if (delayEnabled) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
similarity index 95%
rename from helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java
rename to helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
index a87be34..369d46a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
@@ -36,14 +36,14 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestNodeSwap extends ZkTestBase {
+public class TestWagedNodeSwap extends ZkTestBase {
   final int NUM_NODE = 6;
   protected static final int START_PORT = 12918;
   protected static final int _PARTITIONS = 20;
@@ -122,8 +122,8 @@ public class TestNodeSwap extends ZkTestBase {
     }
     Thread.sleep(1000);
 
-    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-        .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
     Assert.assertTrue(_clusterVerifier.verify(5000));
   }
 
@@ -183,8 +183,6 @@ public class TestNodeSwap extends ZkTestBase {
         .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "NodeSwapDone", Collections.emptyMap());
 
     Thread.sleep(2000);
-    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-        .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
     Assert.assertTrue(_clusterVerifier.verify(5000));
 
     // Since only one node temporary down, the same partitions will be moved to the newly added node.
@@ -271,8 +269,6 @@ public class TestNodeSwap extends ZkTestBase {
         .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "NodeSwapDone", Collections.emptyMap());
 
     Thread.sleep(2000);
-    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-        .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
     Assert.assertTrue(_clusterVerifier.verify(5000));
 
     for (String db : _allDBs) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 9790b92..11214ce 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
@@ -410,6 +411,91 @@ public class TestWagedRebalance extends ZkTestBase {
     }
   }
 
+  /**
+   * Verifies that a change of the global rebalance preference takes effect at runtime.
+   * <p>
+   * With EVENNESS weighted 0 and LESS_MOVEMENT weighted 10, a newly started participant is
+   * expected to receive no replica. Once the default preference is restored, at least one
+   * replica is expected to show up on the new participant.
+   */
+  @Test(dependsOnMethods = "test")
+  public void testNewInstances()
+      throws InterruptedException {
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    MockParticipantManager participant = null;
+    // Set once the default preference has been written back, so cleanup does not write it twice.
+    boolean preferenceRestored = false;
+    try {
+      clusterConfig.setGlobalRebalancePreference(ImmutableMap.of(
+          ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 0,
+          ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 10));
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+      int i = 0;
+      for (String stateModel : _testModels) {
+        String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
+        createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+            _replica);
+        _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+        _allDBs.add(db);
+      }
+      Thread.sleep(300);
+      validate(_replica);
+
+      String newNodeName = "newNode-" + TestHelper.getTestMethodName() + "_" + START_PORT;
+      participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newNodeName);
+      participant.syncStart();
+
+      Thread.sleep(300);
+      validate(_replica);
+
+      // LESS_MOVEMENT only: the existing assignment should stay where it is.
+      Assert.assertFalse(hasReplicaOnInstance(newNodeName),
+          "No replica should be moved to " + newNodeName + " when EVENNESS is weighted 0.");
+
+      clusterConfig.setGlobalRebalancePreference(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      preferenceRestored = true;
+      Thread.sleep(300);
+      validate(_replica);
+
+      Assert.assertTrue(hasReplicaOnInstance(newNodeName),
+          "Replicas should be moved to " + newNodeName + " under the default preference.");
+    } finally {
+      try {
+        // Do not leak the test-specific preference into later tests when an assertion fails.
+        if (!preferenceRestored) {
+          clusterConfig
+              .setGlobalRebalancePreference(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+          configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+        }
+      } finally {
+        if (participant != null && participant.isConnected()) {
+          participant.syncStop();
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns true if, according to the ExternalViews of the resources in {@code _allDBs}, any
+   * partition has a replica in any state on the given instance.
+   */
+  private boolean hasReplicaOnInstance(String instanceName) {
+    for (String db : _allDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      for (String partition : ev.getPartitionSet()) {
+        if (ev.getStateMap(partition).containsKey(instanceName)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   private void validate(int expectedReplica) {
     HelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
index 831f77f..904e0bc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
@@ -36,7 +36,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -304,8 +304,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
 
   private void validate(int expectedReplica) {
     ZkHelixClusterVerifier _clusterVerifier =
-        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     for (String db : _allDBs) {
@@ -350,8 +349,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
     // waiting for all DB be dropped.
     Thread.sleep(100);
     ZkHelixClusterVerifier _clusterVerifier =
-        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }