You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/12/21 00:42:10 UTC

[helix] branch wagedRebalancer updated: Asynchronously calculating the Baseline (#632)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 302e1e7  Asynchronously calculating the Baseline (#632)
302e1e7 is described below

commit 302e1e7ef6df047a8b08a66282a9ba810bc96690
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Fri Dec 20 16:42:04 2019 -0800

    Asynchronously calculating the Baseline (#632)
    
    * Enable the Baseline calculation to be done asynchronously.
    
    This will greatly speed up the rebalance. Basically, the WAGED rebalancer will first run a partial rebalance to recover the invalid replica allocations (for example, the ones that are on a disabled instance). Then it calculates the new baseline by global rebalancing.
---
 .../rebalancer/waged/AssignmentMetadataStore.java  |  24 ++-
 .../rebalancer/waged/WagedRebalancer.java          | 109 ++++++++++---
 .../stages/BestPossibleStateCalcStage.java         |  17 +-
 .../BestPossibleExternalViewVerifier.java          |  14 +-
 .../waged/MockAssignmentMetadataStore.java         |   6 +-
 .../rebalancer/TestMixedModeAutoRebalance.java     | 181 +++++++++++----------
 .../TestMixedModeWagedRebalance.java               |  10 +-
 .../TestWagedExpandCluster.java                    |   3 +-
 8 files changed, 229 insertions(+), 135 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 843d1b6..6128280 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -92,11 +92,17 @@ public class AssignmentMetadataStore {
     return _bestPossibleAssignment;
   }
 
-  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+  /**
+   * @return true if a new baseline was persisted.
+   * @throws HelixException if the method failed to persist the baseline.
+   */
+  // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
+  // TODO: when it is skipped.
+  public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
     // TODO: Make the write async?
     // If baseline hasn't changed, skip writing to metadata store
     if (compareAssignments(_globalBaseline, globalBaseline)) {
-      return;
+      return false;
     }
     // Persist to ZK
     HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, globalBaseline);
@@ -109,14 +115,21 @@ public class AssignmentMetadataStore {
 
     // Update the in-memory reference
     _globalBaseline = globalBaseline;
+    return true;
   }
 
-  public void persistBestPossibleAssignment(
+  /**
+   * @return true if a new best possible assignment was persisted.
+   * @throws HelixException if the method failed to persist the best possible assignment.
+   */
+  // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
+  // TODO: when it is skipped.
+  public boolean persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     // TODO: Make the write async?
     // If bestPossibleAssignment hasn't changed, skip writing to metadata store
     if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
-      return;
+      return false;
     }
     // Persist to ZK
     HelixProperty combinedAssignments =
@@ -130,6 +143,7 @@ public class AssignmentMetadataStore {
 
     // Update the in-memory reference
     _bestPossibleAssignment = bestPossibleAssignment;
+    return true;
   }
 
   protected void finalize() {
@@ -179,7 +193,7 @@ public class AssignmentMetadataStore {
    * @param newAssignment
    * @return true if they are the same. False otherwise or oldAssignment is null
    */
-  private boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
+  protected boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
       Map<String, ResourceAssignment> newAssignment) {
     // If oldAssignment is null, that means that we haven't read from/written to
     // the metadata store yet. In that case, we return false so that we write to metadata store.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index b05287e..8fb22d8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -26,6 +26,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
@@ -54,6 +58,7 @@ import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +84,8 @@ public class WagedRebalancer {
       .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
           ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);
 
+  // To calculate the baseline asynchronously
+  private final ExecutorService _baselineCalculateExecutor;
   private final ResourceChangeDetector _changeDetector;
   private final HelixManager _manager;
   private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
@@ -86,14 +93,16 @@ public class WagedRebalancer {
 
   private final MetricCollector _metricCollector;
   private final CountMetric _rebalanceFailureCount;
-  private final CountMetric _globalBaselineCalcCounter;
-  private final LatencyMetric _globalBaselineCalcLatency;
+  private final CountMetric _baselineCalcCounter;
+  private final LatencyMetric _baselineCalcLatency;
   private final LatencyMetric _writeLatency;
   private final CountMetric _partialRebalanceCounter;
   private final LatencyMetric _partialRebalanceLatency;
   private final LatencyMetric _stateReadLatency;
   private final BaselineDivergenceGauge _baselineDivergenceGauge;
 
+  private boolean _asyncGlobalRebalanceEnabled;
+
   // Note, the rebalance algorithm field is mutable so it should not be directly referred except for
   // the public method computeNewIdealStates.
   private RebalanceAlgorithm _rebalanceAlgorithm;
@@ -109,7 +118,8 @@ public class WagedRebalancer {
   }
 
   public WagedRebalancer(HelixManager helixManager,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference) {
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference,
+      boolean isAsyncGlobalRebalanceEnabled) {
     this(helixManager == null ? null
             : constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
                 helixManager.getClusterName()), ConstraintBasedAlgorithmFactory.getInstance(preference),
@@ -127,7 +137,8 @@ public class WagedRebalancer {
         // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
         // verifying whether the cluster has converged.
         helixManager == null ? null
-            : new WagedRebalancerMetricCollector(helixManager.getClusterName()));
+            : new WagedRebalancerMetricCollector(helixManager.getClusterName()),
+        isAsyncGlobalRebalanceEnabled);
     _preference = ImmutableMap.copyOf(preference);
   }
 
@@ -140,7 +151,7 @@ public class WagedRebalancer {
    */
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm) {
-    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null);
+    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null, false);
   }
 
   /**
@@ -152,12 +163,13 @@ public class WagedRebalancer {
    */
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MetricCollector metricCollector) {
-    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector);
+    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, metricCollector,
+        false);
   }
 
   private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
-      MetricCollector metricCollector) {
+      MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled) {
     if (assignmentMetadataStore == null) {
       LOG.warn("Assignment Metadata Store is not configured properly."
           + " The rebalancer will not access the assignment store during the rebalance.");
@@ -174,10 +186,10 @@ public class WagedRebalancer {
     _rebalanceFailureCount = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
         CountMetric.class);
-    _globalBaselineCalcCounter = _metricCollector.getMetric(
+    _baselineCalcCounter = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
         CountMetric.class);
-    _globalBaselineCalcLatency = _metricCollector.getMetric(
+    _baselineCalcLatency = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
             .name(),
         LatencyMetric.class);
@@ -199,23 +211,32 @@ public class WagedRebalancer {
         BaselineDivergenceGauge.class);
 
     _changeDetector = new ResourceChangeDetector(true);
+
+    _baselineCalculateExecutor = Executors.newSingleThreadExecutor();
+    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+  }
+
+  // Update the global rebalance mode to be asynchronous or synchronous
+  public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) {
+    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
   }
 
-  // Update the rebalancer preference configuration if the new preference is different from the
-  // current preference configuration.
-  public void updatePreference(
+  // Update the rebalancer preference if the new options are different from the current preference.
+  public synchronized void updateRebalancePreference(
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
-    if (_preference.equals(NOT_CONFIGURED_PREFERENCE) || _preference.equals(newPreference)) {
-      // 1. if the preference was not configured during constructing, no need to update.
-      // 2. if the preference equals to the new preference, no need to update.
-      return;
+    // 1. if the preference was not configured during constructing, no need to update.
+    // 2. if the preference equals to the new preference, no need to update.
+    if (!_preference.equals(NOT_CONFIGURED_PREFERENCE) && !_preference.equals(newPreference)) {
+      _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
+      _preference = ImmutableMap.copyOf(newPreference);
     }
-    _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
-    _preference = ImmutableMap.copyOf(newPreference);
   }
 
   // Release all the resources.
   public void close() {
+    if (_baselineCalculateExecutor != null) {
+      _baselineCalculateExecutor.shutdownNow();
+    }
     if (_assignmentMetadataStore != null) {
       _assignmentMetadataStore.close();
     }
@@ -295,7 +316,7 @@ public class WagedRebalancer {
     return newIdealStates;
   }
 
-  // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
+  // Coordinate global rebalance and partial rebalance according to the cluster changes.
   private Map<String, IdealState> computeBestPossibleStates(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
       final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
@@ -413,7 +434,33 @@ public class WagedRebalancer {
             HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
       }
 
-      calculateAndUpdateBaseline(clusterModel, algorithm);
+      final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
+      final String clusterName = clusterData.getClusterName();
+      // Calculate the Baseline assignment for global rebalance.
+      Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
+        try {
+          // Note that we should schedule a new partial rebalance for a future rebalance pipeline if
+          // the planned partial rebalance in the current rebalance pipeline won't wait for the new
+          // baseline being calculated.
+          // So set shouldSchedulePartialRebalance to be !waitForGlobalRebalance
+          calculateAndUpdateBaseline(clusterModel, algorithm, !waitForGlobalRebalance, clusterName);
+        } catch (HelixRebalanceException e) {
+          LOG.error("Failed to calculate baseline assignment!", e);
+          return false;
+        }
+        return true;
+      });
+      if (waitForGlobalRebalance) {
+        try {
+          if (!result.get()) {
+            throw new HelixRebalanceException("Failed to calculate for the new Baseline.",
+                HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+        }
+      }
     }
   }
 
@@ -421,20 +468,25 @@ public class WagedRebalancer {
    * Calculate and update the Baseline assignment
    * @param clusterModel
    * @param algorithm
+   * @param shouldSchedulePartialRebalance True if the call should trigger a following partial rebalance
+   *                                   so the new Baseline could be applied to cluster.
+   * @param clusterName
    * @throws HelixRebalanceException
    */
-  private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm)
+  private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm,
+      boolean shouldSchedulePartialRebalance, String clusterName)
       throws HelixRebalanceException {
     LOG.info("Start calculating the new baseline.");
-    _globalBaselineCalcCounter.increment(1L);
-    _globalBaselineCalcLatency.startMeasuringLatency();
+    _baselineCalcCounter.increment(1L);
+    _baselineCalcLatency.startMeasuringLatency();
 
+    boolean isBaselineChanged = false;
     Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
     // Write the new baseline to metadata store
     if (_assignmentMetadataStore != null) {
       try {
         _writeLatency.startMeasuringLatency();
-        _assignmentMetadataStore.persistBaseline(newBaseline);
+        isBaselineChanged = _assignmentMetadataStore.persistBaseline(newBaseline);
         _writeLatency.endMeasuringLatency();
       } catch (Exception ex) {
         throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
@@ -443,8 +495,13 @@ public class WagedRebalancer {
     } else {
       LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
     }
-    _globalBaselineCalcLatency.endMeasuringLatency();
-    LOG.info("Finish calculating the new baseline.");
+    _baselineCalcLatency.endMeasuringLatency();
+    LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
+
+    if (isBaselineChanged && shouldSchedulePartialRebalance) {
+      LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
+      RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
+    }
   }
 
   private Map<String, ResourceAssignment> partialRebalance(
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index fa580b7..c2fdfb3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -121,14 +121,17 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
   }
 
   private WagedRebalancer getWagedRebalancer(HelixManager helixManager,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
+      boolean isAsyncGlobalRebalanceEnabled) {
     // Create WagedRebalancer instance if it hasn't been already initialized
     if (_wagedRebalancer == null) {
-      _wagedRebalancer = new WagedRebalancer(helixManager, preferences);
+      _wagedRebalancer =
+          new WagedRebalancer(helixManager, preferences, isAsyncGlobalRebalanceEnabled);
     } else {
-      // Since the preference can be updated at runtime, try to update the algorithm preference
-      // before returning the rebalancer.
-      _wagedRebalancer.updatePreference(preferences);
+      // Since the rebalance configuration can be updated at runtime, try to update the rebalancer
+      // before returning.
+      _wagedRebalancer.updateRebalancePreference(preferences);
+      _wagedRebalancer.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
     }
     return _wagedRebalancer;
   }
@@ -281,8 +284,10 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
     Map<String, IdealState> newIdealStates = new HashMap<>();
 
+    ClusterConfig clusterConfig = cache.getClusterConfig();
     WagedRebalancer wagedRebalancer =
-        getWagedRebalancer(helixManager, cache.getClusterConfig().getGlobalRebalancePreference());
+        getWagedRebalancer(helixManager, clusterConfig.getGlobalRebalancePreference(),
+            clusterConfig.isGlobalRebalanceAsyncModeEnabled());
     try {
       newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
           currentStateOutput));
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 74554e9..9f44a37 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -443,16 +443,26 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     }
 
     @Override
-    public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+    public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+      // If baseline hasn't changed, skip writing to metadata store
+      if (compareAssignments(_globalBaseline, globalBaseline)) {
+        return false;
+      }
       // Update the in-memory reference only
       _globalBaseline = globalBaseline;
+      return true;
     }
 
     @Override
-    public void persistBestPossibleAssignment(
+    public boolean persistBestPossibleAssignment(
         Map<String, ResourceAssignment> bestPossibleAssignment) {
+      // If bestPossibleAssignment hasn't changed, skip writing to metadata store
+      if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
+        return false;
+      }
       // Update the in-memory reference only
       _bestPossibleAssignment = bestPossibleAssignment;
+      return true;
     }
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index 7d05416..72c72a8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -41,17 +41,19 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
     return _persistGlobalBaseline;
   }
 
-  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+  public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
     _persistGlobalBaseline = globalBaseline;
+    return true;
   }
 
   public Map<String, ResourceAssignment> getBestPossibleAssignment() {
     return _persistBestPossibleAssignment;
   }
 
-  public void persistBestPossibleAssignment(
+  public boolean persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     _persistBestPossibleAssignment = bestPossibleAssignment;
+    return true;
   }
 
   public void close() {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index f4c875f..db51fd5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -48,7 +48,6 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -60,7 +59,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
 
   private final String CLASS_NAME = getShortClassName();
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
-  protected static final String DB_NAME = "Test-DB";
 
   private ClusterControllerManager _controller;
   private List<MockParticipantManager> _participants = new ArrayList<>();
@@ -111,13 +109,13 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     };
   }
 
-  protected void createResource(String stateModel, int numPartition, int replica,
+  protected void createResource(String dbName, String stateModel, int numPartition, int replica,
       boolean delayEnabled, String rebalanceStrategy) {
     if (delayEnabled) {
-      createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+      createResourceWithDelayedRebalance(CLUSTER_NAME, dbName, stateModel, numPartition, replica,
           replica - 1, 200, rebalanceStrategy);
     } else {
-      createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+      createResourceWithDelayedRebalance(CLUSTER_NAME, dbName, stateModel, numPartition, replica,
           replica, 0, rebalanceStrategy);
     }
   }
@@ -125,98 +123,111 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
   @Test(dataProvider = "stateModels")
   public void testUserDefinedPreferenceListsInFullAuto(String stateModel, boolean delayEnabled,
       String rebalanceStrateyName) throws Exception {
-    createResource(stateModel, _PARTITIONS, _replica, delayEnabled,
+    String dbName = "Test-DB-" + stateModel;
+    createResource(dbName, stateModel, _PARTITIONS, _replica, delayEnabled,
         rebalanceStrateyName);
-    IdealState idealState =
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB_NAME);
-    Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
-    List<String> userDefinedPartitions = new ArrayList<>();
-    for (String partition : userDefinedPreferenceLists.keySet()) {
-      List<String> preferenceList = new ArrayList<>();
-      for (int k = _replica; k >= 0; k--) {
-        String instance = _participants.get(k).getInstanceName();
-        preferenceList.add(instance);
+    try {
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+      Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
+      List<String> userDefinedPartitions = new ArrayList<>();
+      for (String partition : userDefinedPreferenceLists.keySet()) {
+        List<String> preferenceList = new ArrayList<>();
+        for (int k = _replica; k >= 0; k--) {
+          String instance = _participants.get(k).getInstanceName();
+          preferenceList.add(instance);
+        }
+        userDefinedPreferenceLists.put(partition, preferenceList);
+        userDefinedPartitions.add(partition);
       }
-      userDefinedPreferenceLists.put(partition, preferenceList);
-      userDefinedPartitions.add(partition);
-    }
-
-    ResourceConfig resourceConfig =
-        new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
-    _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
 
-    // TODO remove this sleep after fix https://github.com/apache/helix/issues/526
-    Thread.sleep(500);
+      ResourceConfig resourceConfig =
+          new ResourceConfig.Builder(dbName).setPreferenceLists(userDefinedPreferenceLists).build();
+      _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
 
-    Assert.assertTrue(_clusterVerifier.verify(3000));
-    verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions);
+      // TODO remove this sleep after fix https://github.com/apache/helix/issues/526
+      Thread.sleep(500);
 
-    while (userDefinedPartitions.size() > 0) {
-      IdealState originIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
-          DB_NAME);
-      Set<String> nonUserDefinedPartitions = new HashSet<>(originIS.getPartitionSet());
-      nonUserDefinedPartitions.removeAll(userDefinedPartitions);
-
-      removePartitionFromUserDefinedList(DB_NAME, userDefinedPartitions);
-      // TODO: Remove wait once we enable the BestPossibleExternalViewVerifier for the WAGED rebalancer.
-      Thread.sleep(1000);
       Assert.assertTrue(_clusterVerifier.verify(3000));
-      verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions);
-      verifyNonUserDefinedAssignment(DB_NAME, originIS, nonUserDefinedPartitions);
+      verifyUserDefinedPreferenceLists(dbName, userDefinedPreferenceLists,
+          userDefinedPartitions);
+
+      while (userDefinedPartitions.size() > 0) {
+        IdealState originIS =
+            _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+        Set<String> nonUserDefinedPartitions = new HashSet<>(originIS.getPartitionSet());
+        nonUserDefinedPartitions.removeAll(userDefinedPartitions);
+
+        removePartitionFromUserDefinedList(dbName, userDefinedPartitions);
+        // TODO: Remove wait once we enable the BestPossibleExternalViewVerifier for the WAGED rebalancer.
+        Thread.sleep(1000);
+        Assert.assertTrue(_clusterVerifier.verify(3000));
+        verifyUserDefinedPreferenceLists(dbName, userDefinedPreferenceLists,
+            userDefinedPartitions);
+        verifyNonUserDefinedAssignment(dbName, originIS, nonUserDefinedPartitions);
+      }
+    } finally {
+      _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, dbName);
+      _clusterVerifier.verify(5000);
     }
   }
 
   @Test
   public void testUserDefinedPreferenceListsInFullAutoWithErrors() throws Exception {
-    createResource(BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica,
-        false, CrushRebalanceStrategy.class.getName());
-
-    IdealState idealState =
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB_NAME);
-    Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
-
-    List<String> newNodes = new ArrayList<>();
-    for (int i = NUM_NODE; i < NUM_NODE + _replica; i++) {
-      String instance = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
-
-      // start dummy participants
-      MockParticipantManager participant =
-          new TestMockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance);
-      participant.syncStart();
-      _participants.add(participant);
-      newNodes.add(instance);
-    }
-
-    List<String> userDefinedPartitions = new ArrayList<>();
-    for (String partition : userDefinedPreferenceLists.keySet()) {
-      userDefinedPreferenceLists.put(partition, newNodes);
-      userDefinedPartitions.add(partition);
-    }
+    String dbName = "Test-DB-withErrors";
+    createResource(dbName, BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica, false,
+        CrushRebalanceStrategy.class.getName());
+    try {
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+      Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
+
+      List<String> newNodes = new ArrayList<>();
+      for (int i = NUM_NODE; i < NUM_NODE + _replica; i++) {
+        String instance = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+        _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+
+        // start dummy participants
+        MockParticipantManager participant =
+            new TestMockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance);
+        participant.syncStart();
+        _participants.add(participant);
+        newNodes.add(instance);
+      }
 
-    ResourceConfig resourceConfig =
-        new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
-    _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
+      List<String> userDefinedPartitions = new ArrayList<>();
+      for (String partition : userDefinedPreferenceLists.keySet()) {
+        userDefinedPreferenceLists.put(partition, newNodes);
+        userDefinedPartitions.add(partition);
+      }
 
-    TestHelper.verify(() -> {
-      ExternalView ev =
-          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DB_NAME);
-      if (ev != null) {
-        for (String partition : ev.getPartitionSet()) {
-          Map<String, String> stateMap = ev.getStateMap(partition);
-          if (stateMap.values().contains("ERROR")) {
-            return true;
+      ResourceConfig resourceConfig =
+          new ResourceConfig.Builder(dbName).setPreferenceLists(userDefinedPreferenceLists).build();
+      _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
+
+      TestHelper.verify(() -> {
+        ExternalView ev =
+            _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+        if (ev != null) {
+          for (String partition : ev.getPartitionSet()) {
+            Map<String, String> stateMap = ev.getStateMap(partition);
+            if (stateMap.values().contains("ERROR")) {
+              return true;
+            }
           }
         }
-      }
-      return false;
-    }, 2000);
-
-    ExternalView ev =
-        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DB_NAME);
-    IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
-        DB_NAME);
-    validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+        return false;
+      }, 2000);
+
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+      IdealState is =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+      validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+    } finally {
+      _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, dbName);
+      _clusterVerifier.verify(5000);
+    }
   }
 
   private void verifyUserDefinedPreferenceLists(String db,
@@ -260,12 +271,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
   }
 
-  @AfterMethod
-  public void afterMethod() {
-    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, DB_NAME);
-    _clusterVerifier.verify(5000);
-  }
-
   @AfterClass
   public void afterClass() throws Exception {
     /**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
index eb9e0f8..c482e8f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
@@ -39,20 +39,20 @@ public class TestMixedModeWagedRebalance extends TestMixedModeAutoRebalance {
     };
   }
 
-  protected void createResource(String stateModel, int numPartition,
-      int replica, boolean delayEnabled, String rebalanceStrategy) {
+  protected void createResource(String dbName, String stateModel, int numPartition, int replica,
+      boolean delayEnabled, String rebalanceStrategy) {
     if (delayEnabled) {
       setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 200);
-      createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+      createResourceWithWagedRebalance(CLUSTER_NAME, dbName, stateModel, numPartition, replica,
           replica - 1);
     } else {
-      createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica, replica);
+      createResourceWithWagedRebalance(CLUSTER_NAME, dbName, stateModel, numPartition, replica,
+          replica);
     }
   }
 
   @AfterMethod
   public void afterMethod() {
-    super.afterMethod();
     setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
similarity index 92%
rename from helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
rename to helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
index 37e76ee..156c855 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
@@ -1,4 +1,4 @@
-package org.apache.helix.integration.rebalancer.PartitionMigration;
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -22,6 +22,7 @@ package org.apache.helix.integration.rebalancer.PartitionMigration;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.helix.integration.rebalancer.PartitionMigration.TestExpandCluster;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;