Posted to commits@helix.apache.org by ne...@apache.org on 2022/12/02 00:33:40 UTC

[helix] branch nealsun/waged-pipeline-redesign updated: WAGED pipeline redesign

This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch nealsun/waged-pipeline-redesign
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/nealsun/waged-pipeline-redesign by this push:
     new 0936a2af2 WAGED pipeline redesign
0936a2af2 is described below

commit 0936a2af2a625cccd469a468ed2fc53c02f1b802
Author: Neal Sun <ne...@linkedin.com>
AuthorDate: Thu Dec 1 16:33:34 2022 -0800

    WAGED pipeline redesign
    
    Added a new emergency rebalance stage to the WAGED pipeline and altered existing rebalance behaviors to improve performance.
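    
    The in-memory best possible assignment is now guarded by a version counter:
    the main (emergency) pipeline always writes and bumps the version, while the
    asynchronous partial-rebalance thread captures a target version before its
    long calculation and only applies its result if nothing newer has landed in
    the meantime. A minimal sketch of that pattern, with simplified names and
    types and the ZK persistence omitted (not the exact committed code):
    
        import java.util.HashMap;
        import java.util.Map;
    
        class VersionedAssignmentCache {
          private final Map<String, String> cache = new HashMap<>();
          private int version = 0;
    
          // Main pipeline write: always applied, bumps the version.
          synchronized void persist(Map<String, String> assignment) {
            cache.clear();
            cache.putAll(assignment);
            version++;
          }
    
          // Async write: targetVersion was read as (version + 1) before the
          // calculation started; apply only if it is still ahead.
          synchronized boolean tryUpdate(Map<String, String> assignment, int targetVersion) {
            if (targetVersion > version) {
              cache.clear();
              cache.putAll(assignment);
              version = targetVersion;
              return true;
            }
            return false;
          }
        }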
---
 .../rebalancer/waged/AssignmentMetadataStore.java  | 116 +++++----
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  |  27 +-
 .../rebalancer/waged/WagedRebalancer.java          | 283 ++++++++++++++++-----
 .../waged/model/ClusterModelProvider.java          |  70 ++++-
 .../java/org/apache/helix/model/ClusterConfig.java |   1 +
 .../waged/MockAssignmentMetadataStore.java         |  19 +-
 .../waged/TestAssignmentMetadataStore.java         |  24 --
 .../rebalancer/waged/TestWagedRebalancer.java      |  14 +-
 8 files changed, 389 insertions(+), 165 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index dd502893e..bc4120cb4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -50,6 +50,7 @@ public class AssignmentMetadataStore {
   private String _bestPossiblePath;
   protected Map<String, ResourceAssignment> _globalBaseline;
   protected Map<String, ResourceAssignment> _bestPossibleAssignment;
+  protected int _bestPossibleVersion = 0;
 
   AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
     this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
@@ -92,60 +93,78 @@ public class AssignmentMetadataStore {
   }
 
   /**
-   * @return true if a new baseline was persisted.
+   * @param newAssignment
+   * @param path the path of the assignment record
+   * @param key  the key of the assignment in the record
    * @throws HelixException if the method failed to persist the baseline.
    */
-  public synchronized boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    return persistAssignment(globalBaseline, getBaseline(), _baselinePath, BASELINE_KEY);
+  private void persistAssignmentToMetadataStore(Map<String, ResourceAssignment> newAssignment, String path, String key)
+      throws HelixException {
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
+    try {
+      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
+    } catch (IOException e) {
+      throw new HelixException(String.format("Failed to persist %s assignment to path %s", key, path), e);
+    }
   }
 
   /**
-   * @return true if a new best possible assignment was persisted.
-   * @throws HelixException if the method failed to persist the baseline.
+   * Persist a new baseline assignment to metadata store first, then to memory
+   * @param globalBaseline
    */
-  public synchronized boolean persistBestPossibleAssignment(
-      Map<String, ResourceAssignment> bestPossibleAssignment) {
-    return persistAssignment(bestPossibleAssignment, getBestPossibleAssignment(), _bestPossiblePath,
-        BEST_POSSIBLE_KEY);
+  public synchronized void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+    // write to metadata store
+    persistAssignmentToMetadataStore(globalBaseline, _baselinePath, BASELINE_KEY);
+    // write to memory
+    getBaseline().clear();
+    getBaseline().putAll(globalBaseline);
   }
 
-  public synchronized void clearAssignmentMetadata() {
-    persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath, BASELINE_KEY);
-    persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(), _bestPossiblePath,
-        BEST_POSSIBLE_KEY);
+  /**
+   * Persist a new best possible assignment to metadata store first, then to memory.
+   * Increment best possible version by 1 - this is a high priority in-memory write.
+   * @param bestPossibleAssignment
+   */
+  public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // write to metadata store
+    persistAssignmentToMetadataStore(bestPossibleAssignment, _bestPossiblePath, BEST_POSSIBLE_KEY);
+    // write to memory
+    getBestPossibleAssignment().clear();
+    getBestPossibleAssignment().putAll(bestPossibleAssignment);
+    _bestPossibleVersion++;
   }
 
   /**
-   * @param newAssignment
-   * @param cachedAssignment
-   * @param path the path of the assignment record
-   * @param key  the key of the assignment in the record
-   * @return true if a new assignment was persisted.
+   * Attempts to persist Best Possible Assignment in memory from an asynchronous thread.
+   * Persist only happens when the provided version is not stale - this is a low priority in-memory write.
+   * @param bestPossibleAssignment - new assignment to be persisted
+   * @param newVersion - attempted new version to write. This version is obtained earlier from getBestPossibleVersion()
+   * @return true if the attempt succeeded, false otherwise.
    */
-  // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
-  // TODO: when it is skipped.
-  private boolean persistAssignment(Map<String, ResourceAssignment> newAssignment,
-      Map<String, ResourceAssignment> cachedAssignment, String path,
-      String key) {
-    // TODO: Make the write async?
-    // If the assignment hasn't changed, skip writing to metadata store
-    if (compareAssignments(cachedAssignment, newAssignment)) {
-      return false;
-    }
-    // Persist to ZK
-    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
-    try {
-      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
-    } catch (IOException e) {
-      // TODO: Improve failure handling
-      throw new HelixException(
-          String.format("Failed to persist %s assignment to path %s", key, path), e);
+  public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+      Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+    // Check if the version is stale by this point
+    if (newVersion > _bestPossibleVersion) {
+      getBestPossibleAssignment().clear();
+      getBestPossibleAssignment().putAll(bestPossibleAssignment);
+      _bestPossibleVersion = newVersion;
+      return true;
     }
 
-    // Update the in-memory reference
-    cachedAssignment.clear();
-    cachedAssignment.putAll(newAssignment);
-    return true;
+    return false;
+  }
+
+  public int getBestPossibleVersion() {
+    return _bestPossibleVersion;
+  }
+
+  public synchronized void clearAssignmentMetadata() {
+    persistAssignmentToMetadataStore(Collections.emptyMap(), _baselinePath, BASELINE_KEY);
+    persistAssignmentToMetadataStore(Collections.emptyMap(), _bestPossiblePath, BEST_POSSIBLE_KEY);
+    getBaseline().clear();
+    getBestPossibleAssignment().clear();
   }
 
   protected synchronized void reset() {
@@ -200,16 +219,11 @@ public class AssignmentMetadataStore {
     return assignmentMap;
   }
 
-  /**
-   * Returns whether two assignments are same.
-   * @param oldAssignment
-   * @param newAssignment
-   * @return true if they are the same. False otherwise or oldAssignment is null
-   */
-  protected boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
-      Map<String, ResourceAssignment> newAssignment) {
-    // If oldAssignment is null, that means that we haven't read from/written to
-    // the metadata store yet. In that case, we return false so that we write to metadata store.
-    return oldAssignment != null && oldAssignment.equals(newAssignment);
+  protected boolean isBaselineChanged(Map<String, ResourceAssignment> newBaseline) {
+    return !getBaseline().equals(newBaseline);
+  }
+
+  protected boolean isBestPossibleChanged(Map<String, ResourceAssignment> newBestPossible) {
+    return !getBestPossibleAssignment().equals(newBestPossible);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index e94148e99..711871381 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -67,26 +67,29 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer {
     }
 
     @Override
-    public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-      // If baseline hasn't changed, skip updating the metadata store
-      if (compareAssignments(_globalBaseline, globalBaseline)) {
-        return false;
-      }
+    public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
       // Update the in-memory reference only
       _globalBaseline = globalBaseline;
-      return true;
     }
 
     @Override
-    public boolean persistBestPossibleAssignment(
+    public void persistBestPossibleAssignment(
         Map<String, ResourceAssignment> bestPossibleAssignment) {
-      // If bestPossibleAssignment hasn't changed, skip updating the metadata store
-      if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
-        return false;
-      }
       // Update the in-memory reference only
       _bestPossibleAssignment = bestPossibleAssignment;
-      return true;
+      _bestPossibleVersion++;
+    }
+    @Override
+    public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+        Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+      // Check if the version is stale by this point
+      if (newVersion > _bestPossibleVersion) {
+        _bestPossibleAssignment = bestPossibleAssignment;
+        _bestPossibleVersion = newVersion;
+        return true;
+      }
+
+      return false;
     }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index a99ad26ab..e7c09ab65 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
@@ -98,6 +99,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
 
   // To calculate the baseline asynchronously
   private final ExecutorService _baselineCalculateExecutor;
+  private final ExecutorService _bestPossibleCalculateExecutor;
   private final ResourceChangeDetector _changeDetector;
   private final HelixManager _manager;
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
@@ -114,6 +116,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
   private final BaselineDivergenceGauge _baselineDivergenceGauge;
 
   private boolean _asyncGlobalRebalanceEnabled;
+  private boolean _asyncPartialRebalanceEnabled;
+  private Future<Boolean> _asyncPartialRebalanceResult;
 
   // Note, the rebalance algorithm field is mutable so it should not be directly referred except for
   // the public method computeNewIdealStates.
@@ -149,7 +153,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
         // cluster has converged.
         helixManager == null ? new WagedRebalancerMetricCollector()
             : new WagedRebalancerMetricCollector(helixManager.getClusterName()),
-        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED);
+        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED,
+        ClusterConfig.DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED);
     _preference = ImmutableMap.copyOf(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
   }
 
@@ -166,12 +171,13 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
         // If metricCollector is not provided, instantiate a version that does not register metrics
         // in order to allow rebalancer to proceed
         metricCollectorOptional.orElse(new WagedRebalancerMetricCollector()),
-        false);
+        false, false);
   }
 
   private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
-      MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled) {
+      MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled,
+      boolean isAsyncPartialRebalanceEnabled) {
     if (assignmentMetadataStore == null) {
       LOG.warn("Assignment Metadata Store is not configured properly."
           + " The rebalancer will not access the assignment store during the rebalance.");
@@ -216,7 +222,9 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     _changeDetector = new ResourceChangeDetector(true);
 
     _baselineCalculateExecutor = Executors.newSingleThreadExecutor();
+    _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
     _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
   }
 
   // Update the global rebalance mode to be asynchronous or synchronous
@@ -224,6 +232,11 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
   }
 
+  // Update the partial rebalance mode to be asynchronous or synchronous
+  public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) {
+    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+  }
+
   // Update the rebalancer preference if the new options are different from the current preference.
   public synchronized void updateRebalancePreference(
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
@@ -249,6 +262,9 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     if (_baselineCalculateExecutor != null) {
       _baselineCalculateExecutor.shutdownNow();
     }
+    if (_bestPossibleCalculateExecutor != null) {
+      _bestPossibleCalculateExecutor.shutdownNow();
+    }
     if (_assignmentMetadataStore != null) {
       _assignmentMetadataStore.close();
     }
@@ -329,22 +345,24 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
     delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
 
-    Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData,
-        computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput,
-            algorithm));
+    Map<String, ResourceAssignment> newBestPossibleAssignment =
+        computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+    Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData, newBestPossibleAssignment);
 
     // The additional rebalance overwrite is required since the calculated mapping may contain
     // some delayed rebalanced assignments.
-    if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
+    if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) && requireRebalanceOverwrite(clusterData,
+        newBestPossibleAssignment)) {
       applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
-          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()),
-          algorithm);
+          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm);
     }
     // Replace the assignment if user-defined preference list is configured.
     // Note the user-defined list is intentionally applied to the final mapping after calculation.
     // This is to avoid persisting it into the assignment store, which impacts the long term
     // assignment evenness and partition movements.
-    newIdealStates.forEach((key, value) -> applyUserDefinedPreferenceList(clusterData.getResourceConfig(key), value));
+    newIdealStates.forEach(
+        (resourceName, idealState) -> applyUserDefinedPreferenceList(clusterData.getResourceConfig(resourceName),
+            idealState));
 
     return newIdealStates;
   }
@@ -357,9 +375,10 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       throws HelixRebalanceException {
     // Perform global rebalance for a new baseline assignment
     globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
-    // Perform partial rebalance for a new best possible assignment
+    // Perform emergency rebalance for a new best possible assignment
     Map<String, ResourceAssignment> newAssignment =
-        partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+        emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+
     return newAssignment;
   }
 
@@ -410,43 +429,22 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
    * @param algorithm
    * @throws HelixRebalanceException
    */
-  private void globalRebalance(ResourceControllerDataProvider clusterData,
-      Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput,
-      RebalanceAlgorithm algorithm)
-      throws HelixRebalanceException {
+  private void globalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
     _changeDetector.updateSnapshots(clusterData);
     // Get all the changed items' information. Filter for the items that have content changed.
-    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
-        _changeDetector.getAllChanges();
-
-    if (clusterChanges.keySet().stream()
-        .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
-      // Build the cluster model for rebalance calculation.
-      // Note, for a Baseline calculation,
-      // 1. Ignore node status (disable/offline).
-      // 2. Use the previous Baseline as the only parameter about the previous assignment.
-      Map<String, ResourceAssignment> currentBaseline =
-          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
-      ClusterModel clusterModel;
-      try {
-        clusterModel = ClusterModelProvider
-            .generateClusterModelForBaseline(clusterData, resourceMap,
-                clusterData.getAllInstances(), clusterChanges, currentBaseline);
-      } catch (Exception ex) {
-        throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
-            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
-      }
+    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
 
+    if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
       final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
-      final String clusterName = clusterData.getClusterName();
       // Calculate the Baseline assignment for global rebalance.
       Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
         try {
-          // Note that we should schedule a new partial rebalance for a future rebalance pipeline if
-          // the planned partial rebalance in the current rebalance pipeline won't wait for the new
-          // baseline being calculated.
-          // So set shouldSchedulePartialRebalance to be !waitForGlobalRebalance
-          calculateAndUpdateBaseline(clusterModel, algorithm, !waitForGlobalRebalance, clusterName);
+          // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
+          // be triggered again after baseline is finished.
+          // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
+          doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance,
+              clusterChanges);
         } catch (HelixRebalanceException e) {
           LOG.error("Failed to calculate baseline assignment!", e);
           return false;
@@ -469,27 +467,40 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
 
   /**
    * Calculate and update the Baseline assignment
-   * @param clusterModel
-   * @param algorithm
-   * @param shouldSchedulePartialRebalance True if the call should trigger a following partial rebalance
+   * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance
    *                                   so the new Baseline could be applied to cluster.
-   * @param clusterName
-   * @throws HelixRebalanceException
    */
-  private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm,
-      boolean shouldSchedulePartialRebalance, String clusterName)
-      throws HelixRebalanceException {
+  private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws HelixRebalanceException {
     LOG.info("Start calculating the new baseline.");
     _baselineCalcCounter.increment(1L);
     _baselineCalcLatency.startMeasuringLatency();
 
-    boolean isBaselineChanged = false;
+    // Build the cluster model for rebalance calculation.
+    // Note, for a Baseline calculation,
+    // 1. Ignore node status (disable/offline).
+    // 2. Use the previous Baseline as the only parameter about the previous assignment.
+    Map<String, ResourceAssignment> currentBaseline =
+        getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+    ClusterModel clusterModel;
+    try {
+      clusterModel =
+          ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, clusterData.getAllInstances(),
+              clusterChanges, currentBaseline);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+
     Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
+    boolean isBaselineChanged =
+        _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
     // Write the new baseline to metadata store
-    if (_assignmentMetadataStore != null) {
+    if (isBaselineChanged) {
       try {
         _writeLatency.startMeasuringLatency();
-        isBaselineChanged = _assignmentMetadataStore.persistBaseline(newBaseline);
+        _assignmentMetadataStore.persistBaseline(newBaseline);
         _writeLatency.endMeasuringLatency();
       } catch (Exception ex) {
         throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
@@ -501,21 +512,64 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     _baselineCalcLatency.endMeasuringLatency();
     LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
 
-    if (isBaselineChanged && shouldSchedulePartialRebalance) {
+    if (isBaselineChanged && shouldTriggerMainPipeline) {
       LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
-      RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
     }
   }
 
-  private Map<String, ResourceAssignment> partialRebalance(
+  private void partialRebalance(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
       RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    // If partial rebalance is async and the previous result is not completed yet,
+    // do not start another partial rebalance.
+    if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
+        && !_asyncPartialRebalanceResult.isDone()) {
+      return;
+    }
+
+    _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
+      try {
+        doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
+            currentStateOutput);
+      } catch (HelixRebalanceException e) {
+        LOG.error("Failed to calculate best possible assignment!", e);
+        return false;
+      }
+      return true;
+    });
+    if (!_asyncPartialRebalanceEnabled) {
+      try {
+        if (!_asyncPartialRebalanceResult.get()) {
+          throw new HelixRebalanceException("Failed to calculate for the new best possible.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        throw new HelixRebalanceException("Failed to execute new best possible calculation.",
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+      }
+    }
+  }
+
+  /**
+   * Calculate and update the Best Possible assignment
+   */
+  private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
+      throws HelixRebalanceException {
     LOG.info("Start calculating the new best possible assignment.");
     _partialRebalanceCounter.increment(1L);
     _partialRebalanceLatency.startMeasuringLatency();
-    // TODO: Consider combining the metrics for both baseline/best possible?
+
+    int newBestPossibleAssignmentVersion = -1;
+    if (_assignmentMetadataStore != null) {
+      newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
+    } else {
+      LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
+    }
+
     // Read the baseline from metadata store
     Map<String, ResourceAssignment> currentBaseline =
         getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
@@ -547,20 +601,73 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     _baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
         currentBaseline, newAssignmentCopy);
 
-    if (_assignmentMetadataStore != null) {
-      try {
-        _writeLatency.startMeasuringLatency();
-        _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
-        _writeLatency.endMeasuringLatency();
-      } catch (Exception ex) {
-        throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
-            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-      }
+    boolean bestPossibleUpdateSuccessful = false;
+    if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
+      bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
+          newBestPossibleAssignmentVersion);
     } else {
       LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
     }
     _partialRebalanceLatency.endMeasuringLatency();
     LOG.info("Finish calculating the new best possible assignment.");
+
+    if (bestPossibleUpdateSuccessful) {
+      LOG.info("Schedule a new rebalance after the new best possible calculation has finished.");
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
+    }
+  }
+
+  private Map<String, ResourceAssignment> emergencyRebalance(
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
+      RebalanceAlgorithm algorithm)
+      throws HelixRebalanceException {
+    Map<String, ResourceAssignment> currentBestPossibleAssignment =
+        getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+            resourceMap.keySet());
+
+    // Step 1: Check for permanent node down
+    AtomicBoolean allNodesActive = new AtomicBoolean(true);
+    currentBestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
+      resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
+        for (String instance : resourceAssignment.getReplicaMap(partition).keySet()) {
+          if (!activeNodes.contains(instance)) {
+            allNodesActive.set(false);
+            break;
+          }
+        }
+      });
+    }));
+
+
+    // Step 2: if there are permanently downed nodes, calculate a new best possible assignment
+    Map<String, ResourceAssignment> newAssignment;
+    if (!allNodesActive.get()) {
+      ClusterModel clusterModel;
+      try {
+        clusterModel =
+            ClusterModelProvider.generateClusterModelForEmergencyRebalance(clusterData, resourceMap, activeNodes,
+                currentBestPossibleAssignment);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to generate cluster model for emergency rebalance.",
+            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+      }
+      newAssignment = calculateAssignment(clusterModel, algorithm);
+    } else {
+      newAssignment = currentBestPossibleAssignment;
+    }
+
+    // Step 3: persist result to metadata store
+    persistBestPossibleAssignment(newAssignment);
+    LOG.info("Finish emergency rebalance");
+
+    partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+    if (!_asyncPartialRebalanceEnabled) {
+      newAssignment = getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+          resourceMap.keySet());
+      persistBestPossibleAssignment(newAssignment);
+    }
+
     return newAssignment;
   }
 
@@ -684,6 +791,22 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     return currentBestAssignment;
   }
 
+  private void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment)
+      throws HelixRebalanceException {
+    if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)) {
+      try {
+        _writeLatency.startMeasuringLatency();
+        _assignmentMetadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+        _writeLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else {
+      LOG.debug("Assignment Metadata Store is null. Skip persisting the best possible assignment.");
+    }
+  }
+
   /**
    * Schedule rebalance according to the delayed rebalance logic.
    * @param clusterData the current cluster data cache
@@ -710,6 +833,34 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     }
   }
 
+  private boolean requireRebalanceOverwrite(ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    AtomicBoolean allMinActiveReplicaMet = new AtomicBoolean(true);
+    bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
+      String resourceName = resourceAssignment.getResourceName();
+      IdealState currentIdealState = clusterData.getIdealState(resourceName);
+      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+      int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+      int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+          .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+              currentIdealState), currentIdealState, numReplica);
+      resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
+        int enabledLivePlacementCounter = 0;
+        for (String instance : resourceAssignment.getReplicaMap(partition).keySet()) {
+          if (enabledLiveInstances.contains(instance)) {
+            enabledLivePlacementCounter++;
+          }
+        }
+
+        if (enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica)) {
+          allMinActiveReplicaMet.set(false);
+        }
+      });
+    }));
+
+    return !allMinActiveReplicaMet.get();
+  }
+
   /**
    * Update the rebalanced ideal states according to the real active nodes.
    * Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 0fc1f2b05..ce82c3207 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -51,7 +51,31 @@ public class ClusterModelProvider {
     PARTIAL,
     // Set the rebalance scope to cover all replicas that need relocation based on the cluster
     // changes.
-    GLOBAL_BASELINE
+    GLOBAL_BASELINE,
+    // Set the rebalance scope to cover only replicas that are assigned to downed instances.
+    EMERGENCY
+  }
+
+  /**
+   * Generate a new Cluster Model object according to the current cluster status for emergency
+   * rebalance. The rebalance scope is configured for recovering replicas that are on permanently
+   * downed nodes
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
+   *                               resources that are not in this list will be removed from the
+   *                               final assignment.
+   * @param activeInstances        The active instances that will be used in the calculation.
+   *                               Note this list can be different from the real active node list
+   *                               according to the rebalancer logic.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
+   *                               previous rebalance.
+   * @return the new cluster model
+   */
+  public static ClusterModel generateClusterModelForEmergencyRebalance(ResourceControllerDataProvider dataProvider,
+      Map<String, Resource> resourceMap, Set<String> activeInstances,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    return generateClusterModel(dataProvider, resourceMap, activeInstances, Collections.emptyMap(),
+        Collections.emptyMap(), bestPossibleAssignment, RebalanceScopeType.EMERGENCY);
   }
 
   /**
@@ -165,6 +189,10 @@ public class ClusterModelProvider {
             findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap, activeInstances,
                 idealAssignment, currentAssignment, allocatedReplicas);
         break;
+      case EMERGENCY:
+        toBeAssignedReplicas = findToBeAssignedReplicasOnDownInstances(replicaMap, activeInstances,
+            currentAssignment, allocatedReplicas);
+        break;
       default:
         throw new HelixException("Unknown rebalance scope type: " + scopeType);
     }
@@ -391,6 +419,46 @@ public class ClusterModelProvider {
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Find replicas that were assigned to non-active nodes in the current assignment.
+   *
+   * @param replicaMap             A map contains all the replicas grouped by resource name.
+   * @param activeInstances        All the instances that are live and enabled according to the delay rebalance configuration.
+   * @param currentAssignment      The current assignment that was generated in the previous rebalance.
+   * @param allocatedReplicas      A map of <Instance -> replicas> to return the allocated replicas grouped by the target instance name.
+   * @return The replicas that need to be reassigned.
+   */
+  private static Set<AssignableReplica> findToBeAssignedReplicasOnDownInstances(
+      Map<String, Set<AssignableReplica>> replicaMap, Set<String> activeInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    // For any replica that is assigned to a non-active (down) instance, add it.
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+    for (String resourceName : replicaMap.keySet()) {
+      Map<String, Map<String, Set<String>>> stateInstanceMap = getStateInstanceMap(currentAssignment.get(resourceName));
+
+      for (AssignableReplica replica : replicaMap.get(resourceName)) {
+        String partitionName = replica.getPartitionName();
+        String replicaState = replica.getReplicaState();
+        Set<String> currentAllocations =
+            stateInstanceMap.getOrDefault(partitionName, Collections.emptyMap())
+                .getOrDefault(replicaState, Collections.emptySet());
+        if (!currentAllocations.isEmpty()) {
+          String allocatedInstance = currentAllocations.iterator().next();
+          if (activeInstances.contains(allocatedInstance)) {
+            allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new HashSet<>()).add(replica);
+          }
+          else {
+            toBeAssignedReplicas.add(replica);
+          }
+          currentAllocations.remove(allocatedInstance);
+        }
+      }
+    }
+
+    return toBeAssignedReplicas;
+  }
+
   /**
    * Filter to remove all invalid allocations that are not on the active instances.
    * @param assignment
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 829724142..c4f9f914f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -184,6 +184,7 @@ public class ClusterConfig extends HelixProperty {
   private final static int MAX_REBALANCE_PREFERENCE = 1000;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  public final static boolean DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED = true;
   private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
   private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET = -1;
   private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index a2220a5d4..fb59b8146 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -39,19 +39,30 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
     return _globalBaseline == null ? Collections.emptyMap() : _globalBaseline;
   }
 
-  public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
     _globalBaseline = globalBaseline;
-    return true;
   }
 
   public Map<String, ResourceAssignment> getBestPossibleAssignment() {
     return _bestPossibleAssignment == null ? Collections.emptyMap() : _bestPossibleAssignment;
   }
 
-  public boolean persistBestPossibleAssignment(
+  public void persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     _bestPossibleAssignment = bestPossibleAssignment;
-    return true;
+    _bestPossibleVersion++;
+  }
+
+  public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+      Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+    // Check if the version is stale by this point
+    if (newVersion > _bestPossibleVersion) {
+      _bestPossibleAssignment = bestPossibleAssignment;
+      _bestPossibleVersion = newVersion;
+      return true;
+    }
+
+    return false;
   }
 
   public void close() {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 84bc82957..7c5740423 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -96,31 +96,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     Assert.assertTrue(_store.getBestPossibleAssignment().isEmpty());
   }
 
-  /**
-   * Test that if the old assignment and new assignment are the same,
-   */
   @Test(dependsOnMethods = "testReadEmptyBaseline")
-  public void testAvoidingRedundantWrite() {
-    Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
-
-    // Call persist functions
-    _store.persistBaseline(dummyAssignment);
-    _store.persistBestPossibleAssignment(dummyAssignment);
-
-    // Check that only one version exists
-    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
-    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
-
-    // Call persist functions again
-    _store.persistBaseline(dummyAssignment);
-    _store.persistBestPossibleAssignment(dummyAssignment);
-
-    // Check that only one version exists still
-    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
-    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
-  }
-
-  @Test(dependsOnMethods = "testAvoidingRedundantWrite")
   public void testAssignmentCache() {
     Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
     // Call persist functions
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index cfcbe298e..5d4047d5b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -255,8 +255,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     Map<String, ResourceAssignment> testResourceAssignmentMap = new HashMap<>();
     ZNRecord mappingNode = new ZNRecord(_resourceNames.get(0));
     HashMap<String, String> mapping = new HashMap<>();
-    mapping.put(_partitionNames.get(0), "MASTER");
-    mappingNode.setMapField(_testInstanceId, mapping);
+    mapping.put(_testInstanceId, "MASTER");
+    mappingNode.setMapField(_partitionNames.get(0), mapping);
     testResourceAssignmentMap.put(_resourceNames.get(0), new ResourceAssignment(mappingNode));
 
     _metadataStore.reset();
@@ -373,9 +373,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
           clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
-      Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
+      Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       Assert.assertEquals(ex.getMessage(),
-          "Failed to generate cluster model for partial rebalance. Failure Type: INVALID_CLUSTER_STATUS");
+          "Failed to calculate for the new best possible. Failure Type: FAILED_TO_CALCULATE");
     }
 
     // The rebalance will be done with empty mapping result since there is no previously calculated
@@ -389,7 +389,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   public void testInvalidRebalancerStatus() throws IOException {
     // Mock a metadata store that will fail on all the calls.
     AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
-    when(metadataStore.getBaseline())
+    when(metadataStore.getBestPossibleAssignment())
         .thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
     WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm, Optional.empty());
 
@@ -404,7 +404,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       Assert.assertEquals(ex.getFailureType(),
           HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
       Assert.assertEquals(ex.getMessage(),
-          "Failed to get the current baseline assignment because of unexpected error. Failure Type: INVALID_REBALANCER_STATUS");
+          "Failed to get the current best possible assignment because of unexpected error. Failure Type: INVALID_REBALANCER_STATUS");
     }
   }
 
@@ -439,7 +439,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
-      Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE");
+      Assert.assertEquals(ex.getMessage(), "Failed to calculate for the new best possible. Failure Type: FAILED_TO_CALCULATE");
     }
     // But if call with the public method computeNewIdealStates(), the rebalance will return with
     // the previous rebalance result.