You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2023/04/25 20:38:05 UTC

[helix] branch master updated: WAGED rebalance overwrite redesign -- part 2 (#2447)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 06401d4b4 WAGED rebalance overwrite redesign -- part 2 (#2447)
06401d4b4 is described below

commit 06401d4b43055dc244446dd0430707de0fbe651c
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Tue Apr 25 16:37:59 2023 -0400

    WAGED rebalance overwrite redesign -- part 2 (#2447)
    
    Integrates the redesigned minActiveReplica handling logic into WAGED rebalancer.
---
 .../rebalancer/util/DelayedRebalanceUtil.java      |  46 +++++-
 .../rebalancer/waged/WagedRebalancer.java          | 150 ++++++++++----------
 .../java/org/apache/helix/util/RebalanceUtil.java  |   8 +-
 .../rebalancer/waged/TestWagedRebalancer.java      | 155 ++++++++++++++++-----
 .../waged/model/AbstractTestClusterModel.java      |   5 +-
 .../waged/model/TestClusterModelProvider.java      |   9 --
 6 files changed, 244 insertions(+), 129 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 65b1431e4..61985223c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -311,17 +311,18 @@ public class DelayedRebalanceUtil {
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
 
     for (String resourceName : resources) {
-      // <partition, <state, instances set>>
-      Map<String, Map<String, Set<String>>> stateInstanceMap =
-          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
       ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
-      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      IdealState idealState = clusterData.getIdealState(resourceName);
+      String modelDef = idealState.getStateModelDefRef();
       Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      ResourceConfig mergedResourceConfig =
+          ResourceConfig.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), idealState);
+
       // keep all current assignment and add to allocated replicas
       resourceAssignment.getMappedPartitions().forEach(partition ->
           resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
               allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
-                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), mergedResourceConfig,
                       partition.getPartitionName(), state, statePriorityMap.get(state)))));
       // only proceed for resource requiring delayed rebalance overwrites
       List<String> partitions =
@@ -329,12 +330,44 @@ public class DelayedRebalanceUtil {
       if (partitions.isEmpty()) {
         continue;
       }
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(resourceAssignment);
       toBeAssignedReplicas.addAll(
           findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
     }
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from newAssignment into currentResourceAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss the current partition allocations still on offline instances (within the delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pairs
+   * from newAssignment to currentResourceAssignment, overriding the state of any instance present in both.
+   * @param newAssignment newAssignment to merge and may override currentResourceAssignment
+   * @param currentResourceAssignment the current resource assignment, this map is getting updated during this method.
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment) {
+    // Iterate sequentially: currentResourceAssignment is mutated via put() and is not required to be thread-safe.
+    newAssignment.forEach((resourceName, assignment) -> {
+      ResourceAssignment current = currentResourceAssignment.get(resourceName);
+      if (current == null) {
+        currentResourceAssignment.put(resourceName, assignment);
+      } else {
+        for (Partition partition : assignment.getMappedPartitions()) {
+          // Start from the current replicas (may include offline-yet-active instances), then overlay the new ones.
+          Map<String, String> toMerge =
+              new HashMap<>(current.getReplicaMap(partition));
+          toMerge.putAll(assignment.getReplicaMap(partition));
+          // Write the merged replica map back once per partition.
+          current.addReplicaMap(partition, toMerge);
+        }
+      }
+    });
+  }
+
   /**
    * From the current assignment, find the partitions that are missing minActiveReplica for ALL resources, return as a
    * map keyed by resource name.
@@ -411,7 +444,8 @@ public class DelayedRebalanceUtil {
         clusterData.getStateModelDef(clusterData.getIdealState(resourceName).getStateModelDefRef())
             .getStatesPriorityList();
     final IdealState currentIdealState = clusterData.getIdealState(resourceName);
-    final ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
+    final ResourceConfig resourceConfig = ResourceConfig.mergeIdealStateWithResourceConfig(
+        clusterData.getResourceConfig(resourceName), currentIdealState);
     final Map<String, Integer> statePriorityMap =
         clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
     final Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 494baef01..6c1c4d74d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -312,13 +312,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
         computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
     Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData, newBestPossibleAssignment);
 
-    // The additional rebalance overwrite is required since the calculated mapping may contain
-    // some delayed rebalanced assignments.
-    if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) && requireRebalanceOverwrite(clusterData,
-        newBestPossibleAssignment)) {
-      applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
-          _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm);
-    }
     // Replace the assignment if user-defined preference list is configured.
     // Note the user-defined list is intentionally applied to the final mapping after calculation.
     // This is to avoid persisting it into the assignment store, which impacts the long term
@@ -339,10 +332,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     // Perform global rebalance for a new baseline assignment
     _globalRebalanceRunner.globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
     // Perform emergency rebalance for a new best possible assignment
-    Map<String, ResourceAssignment> newAssignment =
-        emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
-
-    return newAssignment;
+    return emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
   }
 
   /**
@@ -383,6 +373,69 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     return FAILURE_TYPES_TO_PROPAGATE;
   }
 
+  /**
+   * Some partitions may fail to meet minActiveReplica due to delayed rebalance, because some instances are offline yet
+   * active. In this case, additional replicas have to be brought up -- until either the instance gets back, or timeout,
+   * at which point we have a more permanent resolution.
+   * The term "overwrite" is inherited from historical approach, however, it's no longer technically an overwrite.
+   * It's a formal rebalance process that goes through the algorithm and all constraints.
+   * @param clusterData Cluster data cache
+   * @param resourceMap The map of resource to calculate
+   * @param activeNodes All active nodes (live nodes plus offline-yet-active nodes) while considering cluster's
+   *                    delayed rebalance config
+   * @param currentResourceAssignment The current resource assignment or the best possible assignment computed from last
+   *                           emergency rebalance. It is updated in place when the overwrite is computed.
+   * @param algorithm The rebalance algorithm
+   * @return The resource assignment with delayed rebalance minActiveReplica
+   */
+  private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Resource> resourceMap,
+      Set<String> activeNodes,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+    // the "real" live nodes at the time
+    final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
+      // no need for additional process, return the current resource assignment
+      return currentResourceAssignment;
+    }
+    _rebalanceOverwriteCounter.increment(1L);
+    _rebalanceOverwriteLatency.startMeasuringLatency();
+    LOG.info("Start delayed rebalance overwrites in emergency rebalance.");
+    try {
+      // use the "real" live and enabled instances for calculation
+      ClusterModel clusterModel = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(
+          clusterData, resourceMap, enabledLiveInstances, currentResourceAssignment);
+      Map<String, ResourceAssignment> assignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
+      // keep only the resource entries requiring changes for minActiveReplica
+      assignment.keySet().retainAll(clusterModel.getAssignableReplicaMap().keySet());
+      DelayedRebalanceUtil.mergeAssignments(assignment, currentResourceAssignment);
+      return currentResourceAssignment;
+    } catch (HelixRebalanceException e) {
+      LOG.error("Failed to compute for delayed rebalance overwrites in cluster {}", clusterData.getClusterName(), e);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to compute for delayed rebalance overwrites in cluster {}", clusterData.getClusterName(), e);
+      throw new HelixRebalanceException("Failed to compute for delayed rebalance overwrites in cluster "
+          + clusterData.getClusterName(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, e);
+    } finally {
+      _rebalanceOverwriteLatency.endMeasuringLatency();
+    }
+  }
+
+  /**
+   * Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes
+   * and addressing partitions that fail to meet minActiveReplica.
+   * The scope of the computation here should be limited to handling urgency only and shouldn't be blocking.
+   * @param clusterData Cluster data cache
+   * @param resourceMap resource map
+   * @param activeNodes All active nodes (live nodes plus offline-yet-active nodes) while considering cluster's
+   *                    delayed rebalance config
+   * @param currentStateOutput Current state output from pipeline
+   * @param algorithm The rebalance algorithm
+   * @return The new resource assignment
+   */
   protected Map<String, ResourceAssignment> emergencyRebalance(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
@@ -430,6 +483,17 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
 
     // Step 3: persist result to metadata store
     persistBestPossibleAssignment(newAssignment);
+
+    // Step 4: handle delayed rebalance minActiveReplica
+    // Note this result is one step branching from the main calculation and SHOULD NOT be persisted -- it is temporary,
+    // and only applies during the delayed window of those offline-yet-active nodes; a definitive resolution will happen
+    // once the node comes back or remains offline after the delayed window.
+    Map<String, ResourceAssignment> assignmentWithDelayedRebalanceAdjust = newAssignment;
+    if (_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {
+      assignmentWithDelayedRebalanceAdjust =
+          handleDelayedRebalanceMinActiveReplica(clusterData, resourceMap, activeNodes, newAssignment, algorithm);
+    }
+
     _emergencyRebalanceLatency.endMeasuringLatency();
     LOG.info("Finish emergency rebalance");
 
@@ -438,9 +502,12 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       newAssignment = _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
           resourceMap.keySet());
       persistBestPossibleAssignment(newAssignment);
+      // delayed rebalance handling result is temporary, shouldn't be persisted
+      assignmentWithDelayedRebalanceAdjust =
+          handleDelayedRebalanceMinActiveReplica(clusterData, resourceMap, activeNodes, newAssignment, algorithm);
     }
 
-    return newAssignment;
+    return assignmentWithDelayedRebalanceAdjust;
   }
 
   // Generate the preference lists from the state mapping based on state priority.
@@ -563,65 +630,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     return !allMinActiveReplicaMet.get();
   }
 
-  /**
-   * Update the rebalanced ideal states according to the real active nodes.
-   * Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
-   * might include inactive nodes.
-   * This overwrite will adjust the final mapping, so as to ensure the result is completely valid.
-   * @param idealStateMap the calculated ideal states.
-   * @param clusterData the cluster data cache.
-   * @param resourceMap the rebalanaced resource map.
-   * @param baseline the baseline assignment.
-   * @param algorithm the rebalance algorithm.
-   */
-  protected void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
-      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
-      throws HelixRebalanceException {
-    _rebalanceOverwriteCounter.increment(1L);
-    _rebalanceOverwriteLatency.startMeasuringLatency();
-
-    ClusterModel clusterModel;
-    try {
-      // Note this calculation uses the baseline as the best possible assignment input here.
-      // This is for minimizing unnecessary partition movement.
-      clusterModel = ClusterModelProvider
-          .generateClusterModelFromExistingAssignment(clusterData, resourceMap, baseline);
-    } catch (Exception ex) {
-      throw new HelixRebalanceException(
-          "Failed to generate cluster model for delayed rebalance overwrite.",
-          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
-    }
-    Map<String, IdealState> activeIdealStates =
-        convertResourceAssignment(clusterData, WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm));
-    for (String resourceName : idealStateMap.keySet()) {
-      // The new calculated ideal state before overwrite
-      IdealState newIdealState = idealStateMap.get(resourceName);
-      if (!activeIdealStates.containsKey(resourceName)) {
-        throw new HelixRebalanceException(
-            "Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for "
-                + resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE);
-      }
-      // The ideal state that is calculated based on the real alive/enabled instances list
-      IdealState newActiveIdealState = activeIdealStates.get(resourceName);
-      // The current ideal state that exists in the IdealState znode
-      IdealState currentIdealState = clusterData.getIdealState(resourceName);
-      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
-      int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
-      int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
-          .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
-              currentIdealState), currentIdealState, numReplica);
-      Map<String, List<String>> finalPreferenceLists =
-          DelayedRebalanceUtil.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
-              newIdealState.getPreferenceLists(), enabledLiveInstances,
-              Math.min(minActiveReplica, numReplica));
-
-      newIdealState.setPreferenceLists(finalPreferenceLists);
-
-      _rebalanceOverwriteLatency.endMeasuringLatency();
-    }
-  }
-
   private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
       IdealState idealState) {
     if (resourceConfig != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index bc4054a52..868e0cf57 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -177,12 +177,8 @@ public class RebalanceUtil {
 
   public static void scheduleOnDemandPipeline(String clusterName, long delay,
       boolean shouldRefreshCache) {
-    if (clusterName == null) {
-      LOG.error("Failed to issue a pipeline run. ClusterName is null.");
-      return;
-    }
-    if (delay < 0L) {
-      LOG.error("Failed to issue a pipeline run. Delay is invalid.");
+    if (clusterName == null || delay < 0L) {
+      LOG.warn("Invalid input: [clusterName: {}, delay: {}], skip the pipeline issuing.", clusterName, delay);
       return;
     }
     GenericHelixController leaderController =
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 3344fe14c..a34c92298 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,6 +50,7 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -58,22 +60,19 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestWagedRebalancer extends AbstractTestClusterModel {
-  private Set<String> _instances;
   private MockRebalanceAlgorithm _algorithm;
   private MockAssignmentMetadataStore _metadataStore;
 
   @BeforeClass
   public void initialize() {
     super.initialize();
-    _instances = new HashSet<>();
-    _instances.add(_testInstanceId);
     _algorithm = new MockRebalanceAlgorithm();
 
     // Initialize a mock assignment metadata store
@@ -93,7 +92,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       is.setStateModelDefRef("MasterSlave");
       is.setReplicas("3");
       is.setRebalancerClassName(WagedRebalancer.class.getName());
-      _partitionNames.stream()
+      _partitionNames
           .forEach(partition -> is.setPreferenceList(partition, Collections.emptyList()));
       isMap.put(resource, is);
     }
@@ -131,10 +130,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Resource resource = new Resource(entry.getKey());
-          entry.getValue().getPartitionSet().stream()
-              .forEach(partition -> resource.addPartition(partition));
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
           return resource;
         }));
     // Mocking the change types for triggering a baseline rebalance.
@@ -169,10 +167,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Resource resource = new Resource(entry.getKey());
-          entry.getValue().getPartitionSet().stream()
-              .forEach(partition -> resource.addPartition(partition));
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
           return resource;
         }));
     // Mocking the change types for triggering a baseline rebalance.
@@ -197,10 +194,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Resource resource = new Resource(entry.getKey());
-          entry.getValue().getPartitionSet().stream()
-              .forEach(partition -> resource.addPartition(partition));
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
           return resource;
         }));
     // Mocking the change types for triggering a baseline rebalance.
@@ -285,10 +281,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Resource resource = new Resource(entry.getKey());
-          entry.getValue().getPartitionSet().stream()
-              .forEach(partition -> resource.addPartition(partition));
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
           return resource;
         }));
     // Mocking the change types for triggering a baseline rebalance.
@@ -347,10 +342,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         .setRebalancerClassName(CrushRebalanceStrategy.class.getName());
     // The input resource Map shall contain all the valid resources.
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Resource resource = new Resource(entry.getKey());
-          entry.getValue().getPartitionSet().stream()
-              .forEach(partition -> resource.addPartition(partition));
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
           return resource;
         }));
     rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
@@ -368,7 +362,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     clusterData.getIdealState(invalidResource).setStateModelDefRef("foobar");
     // The input resource Map shall contain all the valid resources.
     Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
-        Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+        Collectors.toMap(resourceName -> resourceName, Resource::new));
     try {
       rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
           clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
@@ -397,7 +391,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     // The input resource Map shall contain all the valid resources.
     Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
-        Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
+        Collectors.toMap(resourceName -> resourceName, Resource::new));
     try {
       rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
       Assert.fail("Rebalance shall fail.");
@@ -417,10 +411,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Resource resource = new Resource(entry.getKey());
-          entry.getValue().getPartitionSet().stream()
-              .forEach(partition -> resource.addPartition(partition));
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
           return resource;
         }));
     // Rebalance with normal configuration. So the assignment will be persisted in the metadata store.
@@ -450,7 +443,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Ensure failure has been recorded
     Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
-        CountMetric.class).getValue().longValue(), 1l);
+        CountMetric.class).getValue().longValue(), 1L);
   }
 
   @Test(dependsOnMethods = "testRebalance")
@@ -479,10 +472,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     clusterData.setClusterConfig(clusterConfig);
 
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Resource resource = new Resource(entry.getKey());
-          entry.getValue().getPartitionSet().stream()
-              .forEach(partition -> resource.addPartition(partition));
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
           return resource;
         }));
 
@@ -686,7 +678,12 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Populate best possible assignment
     rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
     verify(rebalancer, times(1)).requireRebalanceOverwrite(any(), any());
-    verify(rebalancer, times(0)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
+    Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(),
+        CountMetric.class).getValue().longValue(), 0L);
+    Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+        LatencyMetric.class).getLastEmittedMetricValue().longValue(), -1L);
 
     // Set minActiveReplica to 1 so that requireRebalanceOverwrite returns true
     for (String resource : _resourceNames) {
@@ -708,7 +705,94 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     clusterData.setClusterConfig(clusterConfig);
     rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
     verify(rebalancer, times(2)).requireRebalanceOverwrite(any(), any());
-    verify(rebalancer, times(1)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
+    Assert.assertEquals(rebalancer.getMetricCollector().getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(),
+        CountMetric.class).getValue().longValue(), 1L);
+    Assert.assertTrue(rebalancer.getMetricCollector()
+        .getMetric(WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+            LatencyMetric.class).getLastEmittedMetricValue() > 0L);
+  }
+
+  @Test(dependsOnMethods = "testRebalanceOverwriteTrigger")
+  public void testRebalanceOverwrite() throws HelixRebalanceException, IOException {
+    _metadataStore.reset();
+
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    // Enable delay rebalance
+    ClusterConfig clusterConfig = clusterData.getClusterConfig();
+    clusterConfig.setDelayRebalaceEnabled(true);
+    clusterConfig.setRebalanceDelayTime(1);
+    clusterData.setClusterConfig(clusterConfig);
+
+    String instance0 = _testInstanceId;
+    String instance1 = instance0  + "1";
+    String instance2 = instance0 + "2";
+    String offlineInstance = "offlineInstance";
+
+    // force create a fake offlineInstance that's in delay window
+    Set<String> instances = new HashSet<>(_instances);
+    instances.add(offlineInstance);
+    when(clusterData.getAllInstances()).thenReturn(instances);
+    when(clusterData.getEnabledInstances()).thenReturn(instances);
+    when(clusterData.getEnabledLiveInstances()).thenReturn(Set.of(instance0, instance1, instance2));
+    Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
+    instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
+    when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
+    when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+    Map<String, IdealState> isMap = new HashMap<>();
+    for (String resource : _resourceNames) {
+      IdealState idealState = clusterData.getIdealState(resource);
+      idealState.setMinActiveReplicas(2);
+      isMap.put(resource, idealState);
+    }
+    when(clusterData.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+    when(clusterData.getIdealStates()).thenReturn(isMap);
+
+    MockRebalanceAlgorithm algorithm = new MockRebalanceAlgorithm();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, algorithm, Optional.empty());
+
+    // Cluster config change will trigger baseline to be recalculated.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Map<String, Resource> resourceMap =
+        clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
+          return resource;
+        }));
+
+    Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
+        _resourceNames.get(0),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of(instance1, "MASTER", instance2, "SLAVE"),
+            _partitionNames.get(1), ImmutableMap.of(instance2, "MASTER", offlineInstance, "OFFLINE"), // Partition2-SLAVE
+            _partitionNames.get(2), ImmutableMap.of(instance1, "SLAVE", instance2, "MASTER"),
+            _partitionNames.get(3), ImmutableMap.of(instance1, "SLAVE", instance2, "SLAVE")),
+        _resourceNames.get(1),
+        ImmutableMap.of(
+            _partitionNames.get(0), ImmutableMap.of(instance1, "MASTER", instance2, "SLAVE"),
+            _partitionNames.get(1), ImmutableMap.of(instance1, "MASTER", instance2, "SLAVE"),
+            _partitionNames.get(2), ImmutableMap.of(instance1, "MASTER", instance2, "SLAVE"),
+            _partitionNames.get(3), ImmutableMap.of(offlineInstance, "OFFLINE", instance2, "SLAVE")) // Partition4-MASTER
+    );
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    input.forEach((resource, inputMap) ->
+        inputMap.forEach((partition, stateInstance) ->
+            stateInstance.forEach((tmpInstance, state) ->
+                currentStateOutput.setCurrentState(resource, new Partition(partition), tmpInstance, state))));
+    rebalancer.setPartialRebalanceAsyncMode(true);
+    Map<String, IdealState> newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, currentStateOutput);
+    Assert.assertEquals(newIdealStates.get(_resourceNames.get(0)).getPreferenceLists().size(), 4);
+    Assert.assertEquals(newIdealStates.get(_resourceNames.get(1)).getPreferenceLists().size(), 4);
+    Assert.assertEquals(newIdealStates.get(_resourceNames.get(0)).getPreferenceList(_partitionNames.get(1)).size(), 3);
+    Assert.assertEquals(newIdealStates.get(_resourceNames.get(0)).getPreferenceList(_partitionNames.get(3)).size(), 2);
+    Assert.assertEquals(newIdealStates.get(_resourceNames.get(1)).getPreferenceList(_partitionNames.get(3)).size(), 3);
+    Assert.assertEquals(newIdealStates.get(_resourceNames.get(1)).getPreferenceList(_partitionNames.get(0)).size(), 2);
   }
 
   @Test(dependsOnMethods = "testRebalance")
@@ -718,10 +802,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+        .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Resource resource = new Resource(entry.getKey());
-          entry.getValue().getPartitionSet().stream()
-              .forEach(partition -> resource.addPartition(partition));
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
           return resource;
         }));
     // Mocking the change types for triggering a baseline rebalance.
@@ -764,7 +847,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       IdealState is = newIdealStates.get(resourceName);
       ResourceAssignment assignment = expectedResult.get(resourceName);
       Assert.assertEquals(is.getPartitionSet(), new HashSet<>(assignment.getMappedPartitions()
-          .stream().map(partition -> partition.getPartitionName()).collect(Collectors.toSet())));
+          .stream().map(Partition::getPartitionName).collect(Collectors.toSet())));
       for (String partitionName : is.getPartitionSet()) {
         Assert.assertEquals(is.getInstanceStateMap(partitionName),
             assignment.getReplicaMap(new Partition(partitionName)));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 997232a5a..b9c4ce39d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -40,7 +40,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.mockito.Mockito;
 import org.testng.annotations.BeforeClass;
 
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
 public abstract class AbstractTestClusterModel {
@@ -52,6 +52,7 @@ public abstract class AbstractTestClusterModel {
   protected Map<String, List<String>> _disabledPartitionsMap;
   protected List<String> _testInstanceTags;
   protected String _testFaultZoneId;
+  protected Set<String> _instances;
 
   @BeforeClass
   public void initialize() {
@@ -75,6 +76,8 @@ public abstract class AbstractTestClusterModel {
     _testInstanceTags = new ArrayList<>();
     _testInstanceTags.add("TestTag");
     _testFaultZoneId = "testZone";
+    _instances = new HashSet<>();
+    _instances.add(_testInstanceId);
   }
 
   protected InstanceConfig createMockInstanceConfig(String instanceId) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index fc316c2a5..500cfb0b6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -45,23 +45,14 @@ import org.apache.helix.model.ResourceConfig;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.when;
 
 public class TestClusterModelProvider extends AbstractTestClusterModel {
-  Set<String> _instances;
   Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>();
 
-  @BeforeClass
-  public void initialize() {
-    super.initialize();
-    _instances = new HashSet<>();
-    _instances.add(_testInstanceId);
-  }
-
   @Override
   protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
     ResourceControllerDataProvider testCache = super.setupClusterDataCache();