You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/01/23 19:30:57 UTC

[helix] branch wagedRebalancer updated: Add WAGED rebalancer reset method to clean up cached status. (#696)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 938ab9d  Add WAGED rebalancer reset method to clean up cached status. (#696)
938ab9d is described below

commit 938ab9dadea509a22b46d3db8a3504c11770fc38
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Jan 23 11:30:47 2020 -0800

    Add WAGED rebalancer reset method to clean up cached status. (#696)
    
    The reset method is for cleaning up any in-memory records within the WAGED rebalancer so we don't need to recreate one.
    
    Detailed change list:
    1. Add reset methods to all the stateful objects that are used in the WAGED rebalancer.
    2. Address some potential race conditions in the WAGED rebalancer components.
    3. Adjust the tests accordingly. Also add new tests to cover the component reset logic and the WAGED rebalancer reset logic.
---
 .../changedetector/ResourceChangeDetector.java     | 13 ++--
 .../rebalancer/waged/AssignmentMetadataStore.java  | 19 +++--
 .../rebalancer/waged/WagedRebalancer.java          | 68 +++++++++---------
 .../stages/BestPossibleStateCalcStage.java         | 12 ++--
 .../java/org/apache/helix/model/ClusterConfig.java |  2 +-
 .../BestPossibleExternalViewVerifier.java          |  3 +-
 .../changedetector/TestResourceChangeDetector.java | 33 +++++++++
 .../waged/MockAssignmentMetadataStore.java         | 19 ++---
 .../waged/TestAssignmentMetadataStore.java         | 37 +++++++---
 .../rebalancer/waged/TestWagedRebalancer.java      | 80 +++++++++++++++++-----
 .../waged/TestWagedRebalancerMetrics.java          | 11 +--
 .../waged/constraints/MockRebalanceAlgorithm.java  |  2 +-
 .../WagedRebalancer/TestWagedRebalance.java        |  7 +-
 13 files changed, 211 insertions(+), 95 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index 14fb750..27f4c50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -150,27 +150,32 @@ public class ResourceChangeDetector implements ChangeDetector {
     clearCachedComputation();
   }
 
+  public synchronized void resetSnapshots() {
+    _newSnapshot = new ResourceChangeSnapshot();
+    clearCachedComputation();
+  }
+
   @Override
-  public Collection<HelixConstants.ChangeType> getChangeTypes() {
+  public synchronized Collection<HelixConstants.ChangeType> getChangeTypes() {
     return Collections.unmodifiableSet(_newSnapshot.getChangedTypes());
   }
 
   @Override
-  public Collection<String> getChangesByType(HelixConstants.ChangeType changeType) {
+  public synchronized Collection<String> getChangesByType(HelixConstants.ChangeType changeType) {
     return _changedItems.computeIfAbsent(changeType,
         changedItems -> getChangedItems(determinePropertyMapByType(changeType, _oldSnapshot),
             determinePropertyMapByType(changeType, _newSnapshot)));
   }
 
   @Override
-  public Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType) {
+  public synchronized Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType) {
     return _addedItems.computeIfAbsent(changeType,
         changedItems -> getAddedItems(determinePropertyMapByType(changeType, _oldSnapshot),
             determinePropertyMapByType(changeType, _newSnapshot)));
   }
 
   @Override
-  public Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType) {
+  public synchronized Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType) {
     return _removedItems.computeIfAbsent(changeType,
         changedItems -> getRemovedItems(determinePropertyMapByType(changeType, _oldSnapshot),
             determinePropertyMapByType(changeType, _newSnapshot)));
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 6128280..afd0187 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -62,7 +62,7 @@ public class AssignmentMetadataStore {
     _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
   }
 
-  public Map<String, ResourceAssignment> getBaseline() {
+  public synchronized Map<String, ResourceAssignment> getBaseline() {
     // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
     if (_globalBaseline == null) {
       try {
@@ -77,7 +77,7 @@ public class AssignmentMetadataStore {
     return _globalBaseline;
   }
 
-  public Map<String, ResourceAssignment> getBestPossibleAssignment() {
+  public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() {
     // Return the in-memory baseline. If null, read from ZK. This is to minimize reads from ZK
     if (_bestPossibleAssignment == null) {
       try {
@@ -98,7 +98,7 @@ public class AssignmentMetadataStore {
    */
   // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
   // TODO: when it is skipped.
-  public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+  public synchronized boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
     // TODO: Make the write async?
     // If baseline hasn't changed, skip writing to metadata store
     if (compareAssignments(_globalBaseline, globalBaseline)) {
@@ -124,7 +124,7 @@ public class AssignmentMetadataStore {
    */
   // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
   // TODO: when it is skipped.
-  public boolean persistBestPossibleAssignment(
+  public synchronized boolean persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     // TODO: Make the write async?
     // If bestPossibleAssignment hasn't changed, skip writing to metadata store
@@ -146,6 +146,17 @@ public class AssignmentMetadataStore {
     return true;
   }
 
+  protected synchronized void reset() {
+    if (_bestPossibleAssignment != null) {
+      _bestPossibleAssignment.clear();
+      _bestPossibleAssignment = null;
+    }
+    if (_globalBaseline != null) {
+      _globalBaseline.clear();
+      _globalBaseline = null;
+    }
+  }
+
   protected void finalize() {
     // To ensure all resources are released.
     close();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 8fb22d8..f1c85db 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -83,6 +84,10 @@ public class WagedRebalancer {
       NOT_CONFIGURED_PREFERENCE = ImmutableMap
       .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
           ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);
+  // The default algorithm to use when there is no preference configured.
+  private static final RebalanceAlgorithm DEFAULT_REBALANCE_ALGORITHM =
+      ConstraintBasedAlgorithmFactory
+          .getInstance(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
 
   // To calculate the baseline asynchronously
   private final ExecutorService _baselineCalculateExecutor;
@@ -117,12 +122,11 @@ public class WagedRebalancer {
     return null;
   }
 
-  public WagedRebalancer(HelixManager helixManager,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference,
-      boolean isAsyncGlobalRebalanceEnabled) {
+  public WagedRebalancer(HelixManager helixManager) {
     this(helixManager == null ? null
             : constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
-                helixManager.getClusterName()), ConstraintBasedAlgorithmFactory.getInstance(preference),
+                helixManager.getClusterName()),
+        DEFAULT_REBALANCE_ALGORITHM,
         // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
         // Mapping calculator will translate the best possible assignment into the applicable state
         // mapping based on the current states.
@@ -130,28 +134,16 @@ public class WagedRebalancer {
         new DelayedAutoRebalancer(),
         // Helix Manager is required for the rebalancer scheduler
         helixManager,
-        // If HelixManager is null, we just pass in null for MetricCollector so that a
-        // non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
-        // constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
-        // in this case, WagedRebalancer will not read/write to metadata store and just use
-        // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
-        // verifying whether the cluster has converged.
-        helixManager == null ? null
+        // If HelixManager is null, we just pass in a non-functioning WagedRebalancerMetricCollector
+        // that will not be registered to MBean.
+        // This is to handle two cases: 1. HelixManager is null for non-testing cases. In this case,
+        // WagedRebalancer will not read/write to metadata store and just use CurrentState-based
+        // rebalancing. 2. Tests that require instrumenting the rebalancer for verifying whether the
+        // cluster has converged.
+        helixManager == null ? new WagedRebalancerMetricCollector()
             : new WagedRebalancerMetricCollector(helixManager.getClusterName()),
-        isAsyncGlobalRebalanceEnabled);
-    _preference = ImmutableMap.copyOf(preference);
-  }
-
-  /**
-   * This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
-   * the rebalancer will not schedule for a future delayed rebalance. With null MetricCollector, the
-   * rebalancer will not emit JMX metrics.
-   * @param assignmentMetadataStore
-   * @param algorithm
-   */
-  protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
-      RebalanceAlgorithm algorithm) {
-    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null, false);
+        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED);
+    _preference = ImmutableMap.copyOf(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
   }
 
   /**
@@ -159,11 +151,14 @@ public class WagedRebalancer {
    * not schedule for a future delayed rebalance.
    * @param assignmentMetadataStore
    * @param algorithm
-   * @param metricCollector
+   * @param metricCollectorOptional
    */
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
-      RebalanceAlgorithm algorithm, MetricCollector metricCollector) {
-    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector,
+      RebalanceAlgorithm algorithm, Optional<MetricCollector> metricCollectorOptional) {
+    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null,
+        // If metricCollector is not provided, instantiate a version that does not register metrics
+        // in order to allow rebalancer to proceed
+        metricCollectorOptional.orElse(new WagedRebalancerMetricCollector()),
         false);
   }
 
@@ -177,12 +172,13 @@ public class WagedRebalancer {
     _assignmentMetadataStore = assignmentMetadataStore;
     _rebalanceAlgorithm = algorithm;
     _mappingCalculator = mappingCalculator;
+    if (manager == null) {
+      LOG.warn("HelixManager is not provided. The rebalancer is not going to schedule for a future "
+          + "rebalance even when delayed rebalance is enabled.");
+    }
     _manager = manager;
 
-    // If metricCollector is null, instantiate a version that does not register metrics in order to
-    // allow rebalancer to proceed
-    _metricCollector =
-        metricCollector == null ? new WagedRebalancerMetricCollector() : metricCollector;
+    _metricCollector = metricCollector;
     _rebalanceFailureCount = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
         CountMetric.class);
@@ -232,6 +228,14 @@ public class WagedRebalancer {
     }
   }
 
+  // Clean up the internal cached rebalance status.
+  public void reset() {
+    if (_assignmentMetadataStore != null) {
+      _assignmentMetadataStore.reset();
+    }
+    _changeDetector.resetSnapshots();
+  }
+
   // Release all the resources.
   public void close() {
     if (_baselineCalculateExecutor != null) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index c2fdfb3..a73b0c5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -125,14 +125,12 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       boolean isAsyncGlobalRebalanceEnabled) {
     // Create WagedRebalancer instance if it hasn't been already initialized
     if (_wagedRebalancer == null) {
-      _wagedRebalancer =
-          new WagedRebalancer(helixManager, preferences, isAsyncGlobalRebalanceEnabled);
-    } else {
-      // Since the rebalance configuration can be updated at runtime, try to update the rebalancer
-      // before returning.
-      _wagedRebalancer.updateRebalancePreference(preferences);
-      _wagedRebalancer.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
+      _wagedRebalancer = new WagedRebalancer(helixManager);
     }
+    // Since the rebalance configuration can be updated at runtime, try to update the rebalancer
+    // before returning.
+    _wagedRebalancer.updateRebalancePreference(preferences);
+    _wagedRebalancer.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
     return _wagedRebalancer;
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index f2b6090..f88d2f5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -136,7 +136,7 @@ public class ClusterConfig extends HelixProperty {
           .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build();
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
-  private final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
 
   /**
    * Instantiate for a specific cluster
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 9f44a37..6d2c119 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
@@ -424,7 +425,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
         Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
       super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs, clusterName),
-          ConstraintBasedAlgorithmFactory.getInstance(preferences));
+          ConstraintBasedAlgorithmFactory.getInstance(preferences), Optional.empty());
     }
 
     @Override
diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
index 275e32c..bac9842 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
@@ -382,6 +382,39 @@ public class TestResourceChangeDetector extends ZkTestBase {
             .getRemovalsByType(ChangeType.IDEAL_STATE).size(), 0);
   }
 
+  @Test(dependsOnMethods = "testIgnoreControllerGeneratedFields")
+  public void testResetSnapshots() {
+    // Initialize a new detector with the existing data
+    ResourceChangeDetector changeDetector = new ResourceChangeDetector();
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.refresh(_dataAccessor);
+    changeDetector.updateSnapshots(_dataProvider);
+    Assert.assertEquals(
+        changeDetector.getAdditionsByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getChangesByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getRemovalsByType(ChangeType.IDEAL_STATE).size(), 2);
+
+    // Update the detector with old data, since nothing changed, the result will be empty.
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.refresh(_dataAccessor);
+    changeDetector.updateSnapshots(_dataProvider);
+    Assert.assertEquals(
+        changeDetector.getAdditionsByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getChangesByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getRemovalsByType(ChangeType.IDEAL_STATE).size(), 0);
+
+    // Reset the snapshots
+    changeDetector.resetSnapshots();
+    // After reset, all the data in the data provider will be treated as new changes
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.refresh(_dataAccessor);
+    changeDetector.updateSnapshots(_dataProvider);
+    Assert.assertEquals(
+        changeDetector.getAdditionsByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getChangesByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getRemovalsByType(ChangeType.IDEAL_STATE).size(), 2);
+  }
+
   /**
    * Check that the given change types appear in detector's change types.
    * @param types
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index 72c72a8..6e7f896 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -19,8 +19,9 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Map;
+
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.model.ResourceAssignment;
 import org.mockito.Mockito;
@@ -30,38 +31,30 @@ import org.mockito.Mockito;
  * This mock datastore persist assignments in memory only.
  */
 public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
-  private Map<String, ResourceAssignment> _persistGlobalBaseline = new HashMap<>();
-  private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new HashMap<>();
-
   MockAssignmentMetadataStore() {
     super(Mockito.mock(BucketDataAccessor.class), "");
   }
 
   public Map<String, ResourceAssignment> getBaseline() {
-    return _persistGlobalBaseline;
+    return _globalBaseline == null ? Collections.emptyMap() : _globalBaseline;
   }
 
   public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
-    _persistGlobalBaseline = globalBaseline;
+    _globalBaseline = globalBaseline;
     return true;
   }
 
   public Map<String, ResourceAssignment> getBestPossibleAssignment() {
-    return _persistBestPossibleAssignment;
+    return _bestPossibleAssignment == null ? Collections.emptyMap() : _bestPossibleAssignment;
   }
 
   public boolean persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
-    _persistBestPossibleAssignment = bestPossibleAssignment;
+    _bestPossibleAssignment = bestPossibleAssignment;
     return true;
   }
 
   public void close() {
     // do nothing
   }
-
-  public void clearMetadataStore() {
-    _persistBestPossibleAssignment.clear();
-    _persistGlobalBaseline.clear();
-  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 59326e7..3237420 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -119,14 +119,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     String baselineKey = "BASELINE";
     String bestPossibleKey = "BEST_POSSIBLE";
 
-    // Generate a dummy assignment
-    Map<String, ResourceAssignment> dummyAssignment = new HashMap<>();
-    ResourceAssignment assignment = new ResourceAssignment(TEST_DB);
-    Partition partition = new Partition(TEST_DB);
-    Map<String, String> replicaMap = new HashMap<>();
-    replicaMap.put(TEST_DB, TEST_DB);
-    assignment.addReplicaMap(partition, replicaMap);
-    dummyAssignment.put(TEST_DB, new ResourceAssignment(TEST_DB));
+    Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
 
     // Call persist functions
     _store.persistBaseline(dummyAssignment);
@@ -149,6 +142,34 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     Assert.assertEquals(bestPossibleVersions.size(), 1);
   }
 
+  @Test
+  public void testAssignmentCache() {
+    Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
+    // Call persist functions
+    _store.persistBaseline(dummyAssignment);
+    _store.persistBestPossibleAssignment(dummyAssignment);
+
+    Assert.assertEquals(_store._bestPossibleAssignment, dummyAssignment);
+    Assert.assertEquals(_store._globalBaseline, dummyAssignment);
+
+    _store.reset();
+
+    Assert.assertEquals(_store._bestPossibleAssignment, null);
+    Assert.assertEquals(_store._globalBaseline, null);
+  }
+
+  private Map<String, ResourceAssignment> getDummyAssignment() {
+    // Generate a dummy assignment
+    Map<String, ResourceAssignment> dummyAssignment = new HashMap<>();
+    ResourceAssignment assignment = new ResourceAssignment(TEST_DB);
+    Partition partition = new Partition(TEST_DB);
+    Map<String, String> replicaMap = new HashMap<>();
+    replicaMap.put(TEST_DB, TEST_DB);
+    assignment.addReplicaMap(partition, replicaMap);
+    dummyAssignment.put(TEST_DB, new ResourceAssignment(TEST_DB));
+    return dummyAssignment;
+  }
+
   /**
    * Returns a list of existing version numbers only.
    * @param metadataType
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 2b172e8..8efa66b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -115,8 +116,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   @Test
   public void testRebalance() throws IOException, HelixRebalanceException {
-    _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    _metadataStore.reset();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, Optional.empty());
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -141,8 +142,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   @Test(dependsOnMethods = "testRebalance")
   public void testPartialRebalance() throws IOException, HelixRebalanceException {
-    _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    _metadataStore.reset();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, Optional.empty());
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -159,7 +160,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
     // Test with partial resources listed in the resourceMap input.
     // Remove the first resource from the input. Note it still exists in the cluster data cache.
-    _metadataStore.clearMetadataStore();
+    _metadataStore.reset();
     resourceMap.remove(_resourceNames.get(0));
     Map<String, IdealState> newIdealStates =
         rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
@@ -169,8 +170,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
   @Test(dependsOnMethods = "testRebalance")
   public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
-    _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    _metadataStore.reset();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, Optional.empty());
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -232,8 +233,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   @Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
   public void testNonCompatibleConfiguration()
       throws IOException, HelixRebalanceException {
-    _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    _metadataStore.reset();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, Optional.empty());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     String nonCompatibleResourceName = _resourceNames.get(0);
@@ -253,8 +254,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   // TODO test with invalid capacity configuration which will fail the cluster model constructing.
   @Test(dependsOnMethods = "testRebalance")
   public void testInvalidClusterStatus() throws IOException, HelixRebalanceException {
-    _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    _metadataStore.reset();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, Optional.empty());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     String invalidResource = _resourceNames.get(0);
@@ -286,7 +287,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
     when(metadataStore.getBaseline())
         .thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
-    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+    WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm, Optional.empty());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     // The input resource Map shall contain all the valid resources.
@@ -306,8 +307,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
   @Test(dependsOnMethods = "testRebalance")
   public void testAlgorithmException()
       throws IOException, HelixRebalanceException {
-    _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    _metadataStore.reset();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, Optional.empty());
 
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
@@ -325,7 +326,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
     when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
         HelixRebalanceException.Type.FAILED_TO_CALCULATE));
-    rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm);
+    rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm, Optional.empty());
 
     // Calculation will fail
     try {
@@ -355,8 +356,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
     // Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm
     // won't propagate any existing assignment from the cluster model.
-    _metadataStore.clearMetadataStore();
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+    _metadataStore.reset();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, Optional.empty());
 
     // 1. rebalance with baseline calculation done
     // Generate the input for the rebalancer.
@@ -460,6 +461,51 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
   }
 
+  @Test(dependsOnMethods = "testRebalance")
+  public void testReset() throws IOException, HelixRebalanceException {
+    _metadataStore.reset();
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, Optional.empty());
+    // Generate the input for the rebalancer.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().stream()
+              .forEach(partition -> resource.addPartition(partition));
+          return resource;
+        }));
+    // Mocking the change types for triggering a baseline rebalance.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Map<String, IdealState> newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    Map<String, ResourceAssignment> algorithmResult = _algorithm.getRebalanceResult();
+    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+
+    // Clean up algorithm result for the next test step
+    algorithmResult.clear();
+    // Try to trigger a new rebalance. Since nothing has been changed, there will be no rebalance.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Assert.assertEquals(
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput()),
+        newIdealStates);
+    algorithmResult = _algorithm.getRebalanceResult();
+    Assert.assertEquals(algorithmResult, Collections.emptyMap());
+
+    // Reset the rebalancer and do the same operation. Without any cached info, the rebalancer
+    // will finish a complete rebalance.
+    rebalancer.reset();
+    algorithmResult.clear();
+    // Trigger a new rebalance. Nothing has been changed, but a full result is expected again.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    newIdealStates =
+        rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    algorithmResult = _algorithm.getRebalanceResult();
+    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+  }
+
   private void validateRebalanceResult(Map<String, Resource> resourceMap,
       Map<String, IdealState> newIdealStates, Map<String, ResourceAssignment> expectedResult) {
     Assert.assertEquals(newIdealStates.keySet(), resourceMap.keySet());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index 92132de..a38753a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -73,9 +74,10 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
   @Test
   public void testMetricValuePropagation()
       throws JMException, HelixRebalanceException, IOException {
-    _metadataStore.clearMetadataStore();
+    _metadataStore.reset();
     _metricCollector = new WagedRebalancerMetricCollector(TEST_STRING);
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, _metricCollector);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, _algorithm, Optional.of(_metricCollector));
 
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -97,9 +99,10 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
   @Test
   public void testWagedRebalanceMetrics()
       throws Exception {
-    _metadataStore.clearMetadataStore();
+    _metadataStore.reset();
     MetricCollector metricCollector = new WagedRebalancerMetricCollector(TEST_STRING);
-    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, metricCollector);
+    WagedRebalancer rebalancer =
+        new WagedRebalancer(_metadataStore, _algorithm, Optional.of(metricCollector));
     // Generate the input for the rebalancer.
     ResourceControllerDataProvider clusterData = setupClusterDataCache();
     Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
index 759c685..e3b9523 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
@@ -79,6 +79,6 @@ public class MockRebalanceAlgorithm implements RebalanceAlgorithm {
   }
 
   public Map<String, ResourceAssignment> getRebalanceResult() {
-    return _resultHistory;
+    return new HashMap<>(_resultHistory);
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 11214ce..5af3d61 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -416,8 +416,8 @@ public class TestWagedRebalance extends ZkTestBase {
       throws InterruptedException {
     ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
-    clusterConfig.setGlobalRebalancePreference(ImmutableMap.of(
-        ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 0, ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 10));
+    clusterConfig.setGlobalRebalancePreference(ImmutableMap
+        .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 0, ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 10));
     configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
 
     int i = 0;
@@ -431,7 +431,8 @@ public class TestWagedRebalance extends ZkTestBase {
     validate(_replica);
 
     String newNodeName = "newNode-" + TestHelper.getTestMethodName() + "_" + START_PORT;
-    MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName);
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName);
     try {
       _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newNodeName);
       participant.syncStart();