You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/06 04:54:28 UTC

[helix] branch master updated: Refactor WagedRebalancer and add comments (#2431)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e93568d6 Refactor WagedRebalancer and add comments (#2431)
0e93568d6 is described below

commit 0e93568d63d1d90b13d0e5fe43a9be5c84551025
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Thu Apr 6 00:54:17 2023 -0400

    Refactor WagedRebalancer and add comments (#2431)
    
    Refactor WagedRebalancer and add comments
    
    Create standalone classes for partial and global rebalance to make the
    class more modular and easier to manage.
---
 .../rebalancer/util/WagedRebalanceUtil.java        |  53 +++
 .../rebalancer/waged/AssignmentManager.java        | 106 ++++++
 .../rebalancer/waged/GlobalRebalanceRunner.java    | 216 ++++++++++++
 .../rebalancer/waged/PartialRebalanceRunner.java   | 208 ++++++++++++
 .../rebalancer/waged/WagedRebalancer.java          | 376 ++-------------------
 .../rebalancer/waged/TestWagedRebalancer.java      |   5 -
 6 files changed, 610 insertions(+), 354 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
new file mode 100644
index 000000000..62a5fb515
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
@@ -0,0 +1,53 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.ResourceAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static helpers shared by the WAGED rebalancer components.
+ */
+public class WagedRebalanceUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WagedRebalanceUtil.class);
+
+  // Static-only holder: instantiating it would serve no purpose.
+  private WagedRebalanceUtil() {
+  }
+
+  /**
+   * Run the given algorithm against the cluster model and return the resulting assignment.
+   * The algorithm's simple class name and the elapsed wall-clock time are logged at INFO level.
+   * @param clusterModel the cluster model that contains all the cluster status for the purpose of
+   *                     rebalancing.
+   * @param algorithm the rebalance algorithm used to calculate the assignment.
+   * @return the new optimal assignment for the resources.
+   * @throws HelixRebalanceException if the algorithm fails to calculate an assignment.
+   */
+  public static Map<String, ResourceAssignment> calculateAssignment(ClusterModel clusterModel,
+      RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+    long startTime = System.currentTimeMillis();
+    // Resolve the name once; it is used by both the start and the finish log lines.
+    String algorithmName = algorithm.getClass().getSimpleName();
+    LOG.info("Start calculating for an assignment with algorithm {}", algorithmName);
+    OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
+    Map<String, ResourceAssignment> newAssignment =
+        optimalAssignment.getOptimalResourceAssignment();
+    LOG.info("Finish calculating an assignment with algorithm {}. Took: {} ms.",
+        algorithmName, System.currentTimeMillis() - startTime);
+    return newAssignment;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
new file mode 100644
index 000000000..475e8aad1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
@@ -0,0 +1,106 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+
+
+/**
+ * A manager class for fetching assignment from metadata store.
+ */
+class AssignmentManager {
+  private final LatencyMetric _stateReadLatency;
+
+  public AssignmentManager(LatencyMetric stateReadLatency) {
+    _stateReadLatency = stateReadLatency;
+  }
+
+  /**
+   * Read the baseline from the metadata store, limited to the requested resources.
+   * @param assignmentMetadataStore the store to read from; may be null, in which case nothing
+   *                                is read from it.
+   * @param currentStateOutput source of the fallback assignment for resources that have no
+   *                           baseline record.
+   * @param resources names of the resources the caller is interested in.
+   * @return The current baseline assignment. If record does not exist in the
+   *         assignmentMetadataStore, return the current state assignment.
+   * @throws HelixRebalanceException if reading from the store throws.
+   */
+  public Map<String, ResourceAssignment> getBaselineAssignment(AssignmentMetadataStore assignmentMetadataStore,
+      CurrentStateOutput currentStateOutput, Set<String> resources) throws HelixRebalanceException {
+    Map<String, ResourceAssignment> storedBaseline;
+    if (assignmentMetadataStore == null) {
+      storedBaseline = new HashMap<>();
+    } else {
+      try {
+        _stateReadLatency.startMeasuringLatency();
+        storedBaseline = new HashMap<>(assignmentMetadataStore.getBaseline());
+        _stateReadLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to get the current baseline assignment because of unexpected error.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    }
+    return restrictAndBackfill(storedBaseline, currentStateOutput, resources);
+  }
+
+  /**
+   * Read the best possible assignment from the metadata store, limited to the requested
+   * resources.
+   * @param assignmentMetadataStore the store to read from; may be null, in which case nothing
+   *                                is read from it.
+   * @param currentStateOutput source of the fallback assignment for resources that have no
+   *                           best possible record.
+   * @param resources names of the resources the caller is interested in.
+   * @return The current best possible assignment. If record does not exist in the
+   *         assignmentMetadataStore, return the current state assignment.
+   * @throws HelixRebalanceException if reading from the store throws.
+   */
+  public Map<String, ResourceAssignment> getBestPossibleAssignment(
+      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
+      Set<String> resources) throws HelixRebalanceException {
+    Map<String, ResourceAssignment> storedBestPossible;
+    if (assignmentMetadataStore == null) {
+      storedBestPossible = new HashMap<>();
+    } else {
+      try {
+        _stateReadLatency.startMeasuringLatency();
+        storedBestPossible = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment());
+        _stateReadLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to get the current best possible assignment because of unexpected error.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    }
+    return restrictAndBackfill(storedBestPossible, currentStateOutput, resources);
+  }
+
+  /**
+   * Drop every entry that is not in {@code resources}, then fill in the resources that are still
+   * missing with the assignment obtained from the current state output.
+   * The passed-in map is modified in place and returned.
+   */
+  private static Map<String, ResourceAssignment> restrictAndBackfill(
+      Map<String, ResourceAssignment> storedAssignment, CurrentStateOutput currentStateOutput,
+      Set<String> resources) {
+    storedAssignment.keySet().retainAll(resources);
+
+    // Whatever is requested but not present in the stored record falls back to current state.
+    Set<String> uncoveredResources = new HashSet<>(resources);
+    uncoveredResources.removeAll(storedAssignment.keySet());
+    storedAssignment.putAll(currentStateOutput.getAssignment(uncoveredResources));
+
+    return storedAssignment;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
new file mode 100644
index 000000000..6130e5c52
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
@@ -0,0 +1,216 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.changedetector.ResourceChangeDetector;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Global Rebalance does the baseline recalculation when certain changes happen.
+ * The Global Baseline calculation does not consider any temporary status, such as participants' offline/disabled.
+ * Baseline is used as an anchor for {@link PartialRebalanceRunner}. Its computation takes previous baseline as an input.
+ * The Baseline is NOT directly propagated to the final output. It is consumed by the {@link PartialRebalanceRunner}
+ * as an important parameter.
+ */
+class GlobalRebalanceRunner implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(GlobalRebalanceRunner.class);
+
+  // When any of the following change happens, the rebalancer needs to do a global rebalance which
+  // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
+  private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
+      ImmutableSet
+          .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
+              HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+  // To calculate the baseline asynchronously
+  private final ExecutorService _baselineCalculateExecutor;
+  private final ResourceChangeDetector _changeDetector;
+  private final AssignmentManager _assignmentManager;
+  private final AssignmentMetadataStore _assignmentMetadataStore;
+  private final LatencyMetric _writeLatency;
+  private final CountMetric _baselineCalcCounter;
+  private final LatencyMetric _baselineCalcLatency;
+  private final CountMetric _rebalanceFailureCount;
+
+  private boolean _asyncGlobalRebalanceEnabled;
+
+  /**
+   * @param assignmentManager used to read the current baseline.
+   * @param assignmentMetadataStore store the new baseline is persisted to; may be null, in which
+   *                                case nothing is persisted.
+   * @param metricCollector source of the baseline calculation counter and latency metrics.
+   * @param writeLatency latency metric measured around the baseline persist call.
+   * @param rebalanceFailureCount counter incremented when an asynchronous calculation fails.
+   * @param isAsyncGlobalRebalanceEnabled true if callers should not wait for the calculation.
+   */
+  public GlobalRebalanceRunner(AssignmentManager assignmentManager,
+      AssignmentMetadataStore assignmentMetadataStore,
+      MetricCollector metricCollector,
+      LatencyMetric writeLatency,
+      CountMetric rebalanceFailureCount,
+      boolean isAsyncGlobalRebalanceEnabled) {
+    _baselineCalculateExecutor = Executors.newSingleThreadExecutor();
+    _assignmentManager = assignmentManager;
+    _assignmentMetadataStore = assignmentMetadataStore;
+    _changeDetector = new ResourceChangeDetector(true);
+    _writeLatency = writeLatency;
+    _baselineCalcCounter = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
+        CountMetric.class);
+    _baselineCalcLatency = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(),
+        LatencyMetric.class);
+    _rebalanceFailureCount = rebalanceFailureCount;
+    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+  }
+
+  /**
+   * Global rebalance calculates for a new baseline assignment.
+   * The new baseline assignment will be persisted and leveraged by the partial rebalance.
+   * The calculation is only submitted when the change detector reports at least one change type
+   * that requires a global rebalance. In synchronous mode this method blocks until the submitted
+   * calculation finishes.
+   * @param clusterData the cluster data used to refresh the change detector snapshots and to
+   *                    build the cluster model.
+   * @param resourceMap the resources to calculate the baseline for.
+   * @param currentStateOutput fallback assignment source for resources without a baseline.
+   * @param algorithm the rebalance algorithm used for the calculation.
+   * @throws HelixRebalanceException in synchronous mode, if the calculation fails, cannot be
+   *                                 executed, or the waiting thread is interrupted.
+   */
+  public void globalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+    _changeDetector.updateSnapshots(clusterData);
+    // Get all the changed items' information. Filter for the items that have content changed.
+    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
+
+    if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
+      final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
+      // Calculate the Baseline assignment for global rebalance.
+      Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
+        try {
+          // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
+          // be triggered again after baseline is finished.
+          // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
+          doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance,
+              clusterChanges);
+        } catch (HelixRebalanceException e) {
+          if (_asyncGlobalRebalanceEnabled) {
+            _rebalanceFailureCount.increment(1L);
+          }
+          LOG.error("Failed to calculate baseline assignment!", e);
+          return false;
+        }
+        return true;
+      });
+      if (waitForGlobalRebalance) {
+        try {
+          if (!result.get()) {
+            throw new HelixRebalanceException("Failed to calculate for the new Baseline.",
+                HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+          }
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag so code further up the stack can still observe it.
+          Thread.currentThread().interrupt();
+          throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+        } catch (ExecutionException e) {
+          throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Calculate and update the Baseline assignment
+   * @param clusterData the cluster data used to build the cluster model.
+   * @param resourceMap the resources to calculate the baseline for.
+   * @param algorithm the rebalance algorithm used for the calculation.
+   * @param currentStateOutput fallback assignment source for resources without a baseline.
+   * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance
+   *                                   so the new Baseline could be applied to cluster.
+   * @param clusterChanges the detected changes, passed on to the cluster model generation.
+   * @throws HelixRebalanceException if reading the baseline, generating the cluster model,
+   *                                 calculating, or persisting the new baseline fails.
+   */
+  private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws HelixRebalanceException {
+    LOG.info("Start calculating the new baseline.");
+    _baselineCalcCounter.increment(1L);
+    _baselineCalcLatency.startMeasuringLatency();
+
+    // Build the cluster model for rebalance calculation.
+    // Note, for a Baseline calculation,
+    // 1. Ignore node status (disable/offline).
+    // 2. Use the previous Baseline as the only parameter about the previous assignment.
+    Map<String, ResourceAssignment> currentBaseline =
+        _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+    ClusterModel clusterModel;
+    try {
+      clusterModel =
+          ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, clusterData.getAllInstances(),
+              clusterChanges, currentBaseline);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+
+    Map<String, ResourceAssignment> newBaseline = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
+    boolean isBaselineChanged =
+        _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
+    // Write the new baseline to metadata store
+    if (isBaselineChanged) {
+      try {
+        _writeLatency.startMeasuringLatency();
+        _assignmentMetadataStore.persistBaseline(newBaseline);
+        _writeLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else if (_assignmentMetadataStore == null) {
+      LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
+    } else {
+      // The store exists but reported no difference, so there is nothing to write.
+      LOG.debug("The baseline assignment is unchanged. Skip persisting the baseline assignment.");
+    }
+    _baselineCalcLatency.endMeasuringLatency();
+    // Only claim persistence when a write actually happened.
+    if (isBaselineChanged) {
+      LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
+    } else {
+      LOG.info("Global baseline calculation completed. Nothing was persisted into metadata store.");
+    }
+
+    if (isBaselineChanged && shouldTriggerMainPipeline) {
+      LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
+    }
+  }
+
+  /**
+   * @param isAsyncGlobalRebalanceEnabled true if subsequent global rebalance calls should not
+   *                                      wait for the baseline calculation to finish.
+   */
+  public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) {
+    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+  }
+
+  /**
+   * @return the change detector owned by this runner.
+   */
+  public ResourceChangeDetector getChangeDetector() {
+    return _changeDetector;
+  }
+
+  /**
+   * Reset the snapshots held by the change detector.
+   */
+  public void resetChangeDetector() {
+    _changeDetector.resetSnapshots();
+  }
+
+  /**
+   * Shut down the baseline calculation executor immediately.
+   */
+  @Override
+  public void close() {
+    // The executor is a final field assigned in the constructor, so no null check is needed.
+    _baselineCalculateExecutor.shutdownNow();
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
new file mode 100644
index 000000000..74982f6e3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
@@ -0,0 +1,208 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Compute the best possible assignment based on the Baseline and the previous Best Possible assignment.
+ * The coordinator compares the previous Best Possible assignment with the current cluster state so as to derive a
+ * minimal rebalance scope. In short, the rebalance scope only contains the following two types of partitions.
+ * 1. The partition's current assignment becomes invalid.
+ * 2. The Baseline contains some new partition assignments that do not exist in the current assignment.
+ */
+class PartialRebalanceRunner implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PartialRebalanceRunner.class);
+
+  // To calculate the best possible assignment asynchronously
+  private final ExecutorService _bestPossibleCalculateExecutor;
+  private final AssignmentManager _assignmentManager;
+  private final AssignmentMetadataStore _assignmentMetadataStore;
+  private final BaselineDivergenceGauge _baselineDivergenceGauge;
+  private final CountMetric _rebalanceFailureCount;
+  private final CountMetric _partialRebalanceCounter;
+  private final LatencyMetric _partialRebalanceLatency;
+
+  private boolean _asyncPartialRebalanceEnabled;
+  // Handle of the most recently submitted calculation; null until the first submission.
+  private Future<Boolean> _asyncPartialRebalanceResult;
+
+  /**
+   * @param assignmentManager used to read the current baseline and best possible assignment.
+   * @param assignmentMetadataStore store the new best possible assignment is handed to; may be
+   *                                null, in which case nothing is persisted.
+   * @param metricCollector source of the partial rebalance counter, latency and baseline
+   *                        divergence metrics.
+   * @param rebalanceFailureCount counter incremented when an asynchronous calculation fails.
+   * @param isAsyncPartialRebalanceEnabled true if callers should not wait for the calculation.
+   */
+  public PartialRebalanceRunner(AssignmentManager assignmentManager,
+      AssignmentMetadataStore assignmentMetadataStore,
+      MetricCollector metricCollector,
+      CountMetric rebalanceFailureCount,
+      boolean isAsyncPartialRebalanceEnabled) {
+    _assignmentManager = assignmentManager;
+    _assignmentMetadataStore = assignmentMetadataStore;
+    _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
+    _rebalanceFailureCount = rebalanceFailureCount;
+    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+
+    _partialRebalanceCounter = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
+        CountMetric.class);
+    _partialRebalanceLatency = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
+            .name(),
+        LatencyMetric.class);
+    _baselineDivergenceGauge = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
+        BaselineDivergenceGauge.class);
+  }
+
+  /**
+   * Submit a best possible assignment calculation to the executor.
+   * In asynchronous mode the call returns right after submission, and it is a no-op while a
+   * previously submitted calculation is still running. In synchronous mode the call blocks until
+   * the calculation finishes.
+   * @param clusterData the cluster data used to build the cluster model.
+   * @param resourceMap the resources to calculate the assignment for.
+   * @param activeNodes the nodes passed to the cluster model generation.
+   * @param currentStateOutput fallback assignment source for resources without stored records.
+   * @param algorithm the rebalance algorithm used for the calculation.
+   * @throws HelixRebalanceException in synchronous mode, if the calculation fails, cannot be
+   *                                 executed, or the waiting thread is interrupted.
+   */
+  public void partialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
+      throws HelixRebalanceException {
+    // If partial rebalance is async and the previous result is not completed yet,
+    // do not start another partial rebalance.
+    if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
+        && !_asyncPartialRebalanceResult.isDone()) {
+      return;
+    }
+
+    _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
+      try {
+        doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
+            currentStateOutput);
+      } catch (HelixRebalanceException e) {
+        if (_asyncPartialRebalanceEnabled) {
+          _rebalanceFailureCount.increment(1L);
+        }
+        LOG.error("Failed to calculate best possible assignment!", e);
+        return false;
+      }
+      return true;
+    });
+    if (!_asyncPartialRebalanceEnabled) {
+      try {
+        if (!_asyncPartialRebalanceResult.get()) {
+          throw new HelixRebalanceException("Failed to calculate for the new best possible.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so code further up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        throw new HelixRebalanceException("Failed to execute new best possible calculation.",
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+      } catch (ExecutionException e) {
+        throw new HelixRebalanceException("Failed to execute new best possible calculation.",
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+      }
+    }
+  }
+
+  /**
+   * Calculate and update the Best Possible assignment
+   * If the metadata store reports that the result differs from what it holds, the result is
+   * handed to the store's best possible cache update together with the version read at the start
+   * of this calculation plus one.
+   * If that update reports success, trigger the pipeline so that main thread logic can run again.
+   * @throws HelixRebalanceException if reading the stored assignments, generating the cluster
+   *                                 model, or calculating the new assignment fails.
+   */
+  private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
+      throws HelixRebalanceException {
+    LOG.info("Start calculating the new best possible assignment.");
+    _partialRebalanceCounter.increment(1L);
+    _partialRebalanceLatency.startMeasuringLatency();
+
+    int newBestPossibleAssignmentVersion = -1;
+    if (_assignmentMetadataStore != null) {
+      newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
+    } else {
+      LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
+    }
+
+    // Read the baseline from metadata store
+    Map<String, ResourceAssignment> currentBaseline =
+        _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+
+    // Read the best possible assignment from metadata store
+    Map<String, ResourceAssignment> currentBestPossibleAssignment =
+        _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+            resourceMap.keySet());
+    ClusterModel clusterModel;
+    try {
+      clusterModel = ClusterModelProvider
+          .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
+              currentBaseline, currentBestPossibleAssignment);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+    Map<String, ResourceAssignment> newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
+
+    // Asynchronously report baseline divergence metric before persisting to metadata store,
+    // just in case if persisting fails, we still have the metric.
+    // To avoid changes of the new assignment and make it safe when being used to measure baseline
+    // divergence, use a deep copy of the new assignment.
+    Map<String, ResourceAssignment> newAssignmentCopy = new HashMap<>();
+    for (Map.Entry<String, ResourceAssignment> entry : newAssignment.entrySet()) {
+      newAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
+    }
+
+    _baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
+        currentBaseline, newAssignmentCopy);
+
+    boolean bestPossibleUpdateSuccessful = false;
+    if (_assignmentMetadataStore == null) {
+      LOG.debug("Assignment Metadata Store is null. Skip persisting the best possible assignment.");
+    } else if (_assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
+      bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
+          newBestPossibleAssignmentVersion);
+    } else {
+      // The store exists but reported no difference, so there is nothing to update.
+      LOG.debug("The best possible assignment is unchanged. Skip persisting the best possible assignment.");
+    }
+    _partialRebalanceLatency.endMeasuringLatency();
+    LOG.info("Finish calculating the new best possible assignment.");
+
+    if (bestPossibleUpdateSuccessful) {
+      LOG.info("Schedule a new rebalance after the new best possible calculation has finished.");
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
+    }
+  }
+
+  /**
+   * @param isAsyncPartialRebalanceEnabled true if subsequent partial rebalance calls should not
+   *                                       wait for the calculation to finish.
+   */
+  public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) {
+    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+  }
+
+  /**
+   * @return true if partial rebalance calls currently do not wait for the calculation to finish.
+   */
+  public boolean isAsyncPartialRebalanceEnabled() {
+    return _asyncPartialRebalanceEnabled;
+  }
+
+  /**
+   * Shut down the best possible calculation executor immediately.
+   */
+  @Override
+  public void close() {
+    if (_bestPossibleCalculateExecutor != null) {
+      _bestPossibleCalculateExecutor.shutdownNow();
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 3fffef2fb..b43ce4b68 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -28,16 +28,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -46,11 +40,11 @@ import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.StatefulRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
-import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -60,10 +54,8 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.metrics.MetricCollector;
 import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
-import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.monitoring.metrics.model.LatencyMetric;
-import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,12 +69,6 @@ import org.slf4j.LoggerFactory;
 public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDataProvider> {
   private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
 
-  // When any of the following change happens, the rebalancer needs to do a global rebalance which
-  // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
-  private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
-      ImmutableSet
-          .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
-              HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
   // To identify if the preference has been configured or not.
   private static final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer>
       NOT_CONFIGURED_PREFERENCE = ImmutableMap
@@ -97,37 +83,25 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       .unmodifiableList(Arrays.asList(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS,
           HelixRebalanceException.Type.UNKNOWN_FAILURE));
 
-  // To calculate the baseline asynchronously
-  private final ExecutorService _baselineCalculateExecutor;
-  private final ExecutorService _bestPossibleCalculateExecutor;
-  private final ResourceChangeDetector _changeDetector;
   private final HelixManager _manager;
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
   private final AssignmentMetadataStore _assignmentMetadataStore;
 
   private final MetricCollector _metricCollector;
   private final CountMetric _rebalanceFailureCount;
-  private final CountMetric _baselineCalcCounter;
-  private final LatencyMetric _baselineCalcLatency;
   private final LatencyMetric _writeLatency;
-  private final CountMetric _partialRebalanceCounter;
-  private final LatencyMetric _partialRebalanceLatency;
   private final CountMetric _emergencyRebalanceCounter;
   private final LatencyMetric _emergencyRebalanceLatency;
   private final CountMetric _rebalanceOverwriteCounter;
   private final LatencyMetric _rebalanceOverwriteLatency;
-  private final LatencyMetric _stateReadLatency;
-  private final BaselineDivergenceGauge _baselineDivergenceGauge;
-
-  private boolean _asyncGlobalRebalanceEnabled;
-  private boolean _asyncPartialRebalanceEnabled;
-  private Future<Boolean> _asyncPartialRebalanceResult;
+  private final AssignmentManager _assignmentManager;
+  private final PartialRebalanceRunner _partialRebalanceRunner;
+  private final GlobalRebalanceRunner _globalRebalanceRunner;
 
   // Note, the rebalance algorithm field is mutable so it should not be directly referred except for
   // the public method computeNewIdealStates.
   private RebalanceAlgorithm _rebalanceAlgorithm;
-  private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
-      NOT_CONFIGURED_PREFERENCE;
+  private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference = NOT_CONFIGURED_PREFERENCE;
 
   private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs,
       String clusterName) {
@@ -199,20 +173,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     _rebalanceFailureCount = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
         CountMetric.class);
-    _baselineCalcCounter = _metricCollector.getMetric(
-        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
-        CountMetric.class);
-    _baselineCalcLatency = _metricCollector.getMetric(
-        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
-            .name(),
-        LatencyMetric.class);
-    _partialRebalanceCounter = _metricCollector.getMetric(
-        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
-        CountMetric.class);
-    _partialRebalanceLatency = _metricCollector.getMetric(
-        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
-            .name(),
-        LatencyMetric.class);
     _emergencyRebalanceCounter = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(), CountMetric.class);
     _emergencyRebalanceLatency = _metricCollector.getMetric(
@@ -226,29 +186,24 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     _writeLatency = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
         LatencyMetric.class);
-    _stateReadLatency = _metricCollector.getMetric(
+    _assignmentManager = new AssignmentManager(_metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
-        LatencyMetric.class);
-    _baselineDivergenceGauge = _metricCollector.getMetric(
-        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
-        BaselineDivergenceGauge.class);
+        LatencyMetric.class));
 
-    _changeDetector = new ResourceChangeDetector(true);
-
-    _baselineCalculateExecutor = Executors.newSingleThreadExecutor();
-    _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
-    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
-    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+    _partialRebalanceRunner = new PartialRebalanceRunner(_assignmentManager, assignmentMetadataStore, metricCollector,
+        _rebalanceFailureCount, isAsyncPartialRebalanceEnabled);
+    _globalRebalanceRunner = new GlobalRebalanceRunner(_assignmentManager, assignmentMetadataStore, metricCollector,
+        _writeLatency, _rebalanceFailureCount, isAsyncGlobalRebalanceEnabled);
   }
 
   // Update the global rebalance mode to be asynchronous or synchronous
   public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) {
-    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+    _globalRebalanceRunner.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
   }
 
   // Update the partial rebalance mode to be asynchronous or synchronous
   public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) {
-    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+    _partialRebalanceRunner.setPartialRebalanceAsyncMode(isAsyncPartialRebalanceEnabled);
   }
 
   // Update the rebalancer preference if the new options are different from the current preference.
@@ -267,18 +222,14 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     if (_assignmentMetadataStore != null) {
       _assignmentMetadataStore.reset();
     }
-    _changeDetector.resetSnapshots();
+    _globalRebalanceRunner.resetChangeDetector();
   }
 
   // TODO the rebalancer should reject any other computing request after being closed.
   @Override
   public void close() {
-    if (_baselineCalculateExecutor != null) {
-      _baselineCalculateExecutor.shutdownNow();
-    }
-    if (_bestPossibleCalculateExecutor != null) {
-      _bestPossibleCalculateExecutor.shutdownNow();
-    }
+    _partialRebalanceRunner.close();
+    _globalRebalanceRunner.close();
     if (_assignmentMetadataStore != null) {
       _assignmentMetadataStore.close();
     }
@@ -315,7 +266,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
         // Note that don't return an assignment based on the current state if there is no previously
         // calculated result in this fallback logic.
         Map<String, ResourceAssignment> assignmentRecord =
-            getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(),
+            _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(),
                 resourceMap.keySet());
         newIdealStates = convertResourceAssignment(clusterData, assignmentRecord);
       }
@@ -368,7 +319,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) && requireRebalanceOverwrite(clusterData,
         newBestPossibleAssignment)) {
       applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
-          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm);
+          _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm);
     }
     // Replace the assignment if user-defined preference list is configured.
     // Note the user-defined list is intentionally applied to the final mapping after calculation.
@@ -388,7 +339,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
     // Perform global rebalance for a new baseline assignment
-    globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
+    _globalRebalanceRunner.globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
     // Perform emergency rebalance for a new best possible assignment
     Map<String, ResourceAssignment> newAssignment =
         emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
@@ -434,209 +385,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     return FAILURE_TYPES_TO_PROPAGATE;
   }
 
-  /**
-   * Global rebalance calculates for a new baseline assignment.
-   * The new baseline assignment will be persisted and leveraged by the partial rebalance.
-   * @param clusterData
-   * @param resourceMap
-   * @param currentStateOutput
-   * @param algorithm
-   * @throws HelixRebalanceException
-   */
-  private void globalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
-    _changeDetector.updateSnapshots(clusterData);
-    // Get all the changed items' information. Filter for the items that have content changed.
-    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
-
-    if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
-      final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
-      // Calculate the Baseline assignment for global rebalance.
-      Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
-        try {
-          // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
-          // be triggered again after baseline is finished.
-          // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
-          doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance,
-              clusterChanges);
-        } catch (HelixRebalanceException e) {
-          if (_asyncGlobalRebalanceEnabled) {
-            _rebalanceFailureCount.increment(1L);
-          }
-          LOG.error("Failed to calculate baseline assignment!", e);
-          return false;
-        }
-        return true;
-      });
-      if (waitForGlobalRebalance) {
-        try {
-          if (!result.get()) {
-            throw new HelixRebalanceException("Failed to calculate for the new Baseline.",
-                HelixRebalanceException.Type.FAILED_TO_CALCULATE);
-          }
-        } catch (InterruptedException | ExecutionException e) {
-          throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
-              HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Calculate and update the Baseline assignment
-   * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance
-   *                                   so the new Baseline could be applied to cluster.
-   */
-  private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws HelixRebalanceException {
-    LOG.info("Start calculating the new baseline.");
-    _baselineCalcCounter.increment(1L);
-    _baselineCalcLatency.startMeasuringLatency();
-
-    // Build the cluster model for rebalance calculation.
-    // Note, for a Baseline calculation,
-    // 1. Ignore node status (disable/offline).
-    // 2. Use the previous Baseline as the only parameter about the previous assignment.
-    Map<String, ResourceAssignment> currentBaseline =
-        getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
-    ClusterModel clusterModel;
-    try {
-      clusterModel =
-          ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, clusterData.getAllInstances(),
-              clusterChanges, currentBaseline);
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
-          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
-    }
-
-    Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
-    boolean isBaselineChanged =
-        _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
-    // Write the new baseline to metadata store
-    if (isBaselineChanged) {
-      try {
-        _writeLatency.startMeasuringLatency();
-        _assignmentMetadataStore.persistBaseline(newBaseline);
-        _writeLatency.endMeasuringLatency();
-      } catch (Exception ex) {
-        throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
-            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-      }
-    } else {
-      LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
-    }
-    _baselineCalcLatency.endMeasuringLatency();
-    LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
-
-    if (isBaselineChanged && shouldTriggerMainPipeline) {
-      LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
-      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
-    }
-  }
-
-  private void partialRebalance(
-      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
-      RebalanceAlgorithm algorithm)
-      throws HelixRebalanceException {
-    // If partial rebalance is async and the previous result is not completed yet,
-    // do not start another partial rebalance.
-    if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
-        && !_asyncPartialRebalanceResult.isDone()) {
-      return;
-    }
-
-    _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
-      try {
-        doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
-            currentStateOutput);
-      } catch (HelixRebalanceException e) {
-        if (_asyncPartialRebalanceEnabled) {
-          _rebalanceFailureCount.increment(1L);
-        }
-        LOG.error("Failed to calculate best possible assignment!", e);
-        return false;
-      }
-      return true;
-    });
-    if (!_asyncPartialRebalanceEnabled) {
-      try {
-        if (!_asyncPartialRebalanceResult.get()) {
-          throw new HelixRebalanceException("Failed to calculate for the new best possible.",
-              HelixRebalanceException.Type.FAILED_TO_CALCULATE);
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new HelixRebalanceException("Failed to execute new best possible calculation.",
-            HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
-      }
-    }
-  }
-
-  /**
-   * Calculate and update the Best Possible assignment
-   */
-  private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      Set<String> activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
-      throws HelixRebalanceException {
-    LOG.info("Start calculating the new best possible assignment.");
-    _partialRebalanceCounter.increment(1L);
-    _partialRebalanceLatency.startMeasuringLatency();
-
-    int newBestPossibleAssignmentVersion = -1;
-    if (_assignmentMetadataStore != null) {
-      newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
-    } else {
-      LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
-    }
-
-    // Read the baseline from metadata store
-    Map<String, ResourceAssignment> currentBaseline =
-        getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
-
-    // Read the best possible assignment from metadata store
-    Map<String, ResourceAssignment> currentBestPossibleAssignment =
-        getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
-            resourceMap.keySet());
-    ClusterModel clusterModel;
-    try {
-      clusterModel = ClusterModelProvider
-          .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
-              currentBaseline, currentBestPossibleAssignment);
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
-          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
-    }
-    Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterModel, algorithm);
-
-    // Asynchronously report baseline divergence metric before persisting to metadata store,
-    // just in case if persisting fails, we still have the metric.
-    // To avoid changes of the new assignment and make it safe when being used to measure baseline
-    // divergence, use a deep copy of the new assignment.
-    Map<String, ResourceAssignment> newAssignmentCopy = new HashMap<>();
-    for (Map.Entry<String, ResourceAssignment> entry : newAssignment.entrySet()) {
-      newAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
-    }
-
-    _baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
-        currentBaseline, newAssignmentCopy);
-
-    boolean bestPossibleUpdateSuccessful = false;
-    if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
-      bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
-          newBestPossibleAssignmentVersion);
-    } else {
-      LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
-    }
-    _partialRebalanceLatency.endMeasuringLatency();
-    LOG.info("Finish calculating the new best possible assignment.");
-
-    if (bestPossibleUpdateSuccessful) {
-      LOG.info("Schedule a new rebalance after the new best possible calculation has finished.");
-      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
-    }
-  }
-
   protected Map<String, ResourceAssignment> emergencyRebalance(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
@@ -647,7 +395,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     _emergencyRebalanceLatency.startMeasuringLatency();
 
     Map<String, ResourceAssignment> currentBestPossibleAssignment =
-        getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+        _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
             resourceMap.keySet());
 
     // Step 1: Check for permanent node down
@@ -677,7 +425,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
         throw new HelixRebalanceException("Failed to generate cluster model for emergency rebalance.",
             HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
       }
-      newAssignment = calculateAssignment(clusterModel, algorithm);
+      newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
     } else {
       newAssignment = currentBestPossibleAssignment;
     }
@@ -687,9 +435,9 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     _emergencyRebalanceLatency.endMeasuringLatency();
     LOG.info("Finish emergency rebalance");
 
-    partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
-    if (!_asyncPartialRebalanceEnabled) {
-      newAssignment = getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+    _partialRebalanceRunner.partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+    if (!_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {
+      newAssignment = _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
           resourceMap.keySet());
       persistBestPossibleAssignment(newAssignment);
     }
@@ -697,24 +445,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     return newAssignment;
   }
 
-  /**
-   * @param clusterModel the cluster model that contains all the cluster status for the purpose of
-   *                     rebalancing.
-   * @return the new optimal assignment for the resources.
-   */
-  private Map<String, ResourceAssignment> calculateAssignment(ClusterModel clusterModel,
-      RebalanceAlgorithm algorithm) throws HelixRebalanceException {
-    long startTime = System.currentTimeMillis();
-    LOG.info("Start calculating for an assignment with algorithm {}",
-        algorithm.getClass().getSimpleName());
-    OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
-    Map<String, ResourceAssignment> newAssignment =
-        optimalAssignment.getOptimalResourceAssignment();
-    LOG.info("Finish calculating an assignment with algorithm {}. Took: {} ms.",
-        algorithm.getClass().getSimpleName(), System.currentTimeMillis() - startTime);
-    return newAssignment;
-  }
-
   // Generate the preference lists from the state mapping based on state priority.
   private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
       Map<String, Integer> statePriorityMap) {
@@ -751,39 +481,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     }
   }
 
-  /**
-   * @param assignmentMetadataStore
-   * @param currentStateOutput
-   * @param resources
-   * @return The current baseline assignment. If record does not exist in the
-   *         assignmentMetadataStore, return the current state assignment.
-   * @throws HelixRebalanceException
-   */
-  private Map<String, ResourceAssignment> getBaselineAssignment(
-      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
-      Set<String> resources) throws HelixRebalanceException {
-    Map<String, ResourceAssignment> currentBaseline = new HashMap<>();
-    if (assignmentMetadataStore != null) {
-      try {
-        _stateReadLatency.startMeasuringLatency();
-        currentBaseline = new HashMap<>(assignmentMetadataStore.getBaseline());
-        _stateReadLatency.endMeasuringLatency();
-      } catch (Exception ex) {
-        throw new HelixRebalanceException(
-            "Failed to get the current baseline assignment because of unexpected error.",
-            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-      }
-    }
-    currentBaseline.keySet().retainAll(resources);
-
-    // For resources without baseline, fall back to current state  assignments
-    Set<String> missingResources = new HashSet<>(resources);
-    missingResources.removeAll(currentBaseline.keySet());
-    currentBaseline.putAll(currentStateOutput.getAssignment(missingResources));
-
-    return currentBaseline;
-  }
-
   /**
    * @param assignmentMetadataStore
    * @param currentStateOutput
@@ -795,26 +492,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
   protected Map<String, ResourceAssignment> getBestPossibleAssignment(
       AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
       Set<String> resources) throws HelixRebalanceException {
-    Map<String, ResourceAssignment> currentBestAssignment = new HashMap<>();
-    if (assignmentMetadataStore != null) {
-      try {
-        _stateReadLatency.startMeasuringLatency();
-        currentBestAssignment = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment());
-        _stateReadLatency.endMeasuringLatency();
-      } catch (Exception ex) {
-        throw new HelixRebalanceException(
-            "Failed to get the current best possible assignment because of unexpected error.",
-            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-      }
-    }
-    currentBestAssignment.keySet().retainAll(resources);
-
-    // For resources without best possible states, fall back to current state  assignments
-    Set<String> missingResources = new HashSet<>(resources);
-    missingResources.removeAll(currentBestAssignment.keySet());
-    currentBestAssignment.putAll(currentStateOutput.getAssignment(missingResources));
-
-    return currentBestAssignment;
+    return _assignmentManager.getBestPossibleAssignment(assignmentMetadataStore, currentStateOutput, resources);
   }
 
   private void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment)
@@ -917,7 +595,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
           HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
     }
     Map<String, IdealState> activeIdealStates =
-        convertResourceAssignment(clusterData, calculateAssignment(clusterModel, algorithm));
+        convertResourceAssignment(clusterData, WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm));
     for (String resourceName : idealStateMap.keySet()) {
       // The new calculated ideal state before overwrite
       IdealState newIdealState = idealStateMap.get(resourceName);
@@ -968,7 +646,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
   }
 
   protected ResourceChangeDetector getChangeDetector() {
-    return _changeDetector;
+    return _globalRebalanceRunner.getChangeDetector();
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index d011c5c2e..3344fe14c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,12 +20,9 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.io.IOException;
-import java.sql.Array;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -35,7 +32,6 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
@@ -55,7 +51,6 @@ import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;