You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:37 UTC

[helix] 34/37: Enable maintenance mode for the WAGED rebalancer.

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 1712a88724ee69d771e4da5bb234c56eb30898c8
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Sun Sep 29 15:50:25 2019 -0700

    Enable maintenance mode for the WAGED rebalancer.
    
    The maintenance mode rebalance logic remains the same as in the previous implementation.
    Add more tests for partition migration and for node swap, which requires maintenance mode.
---
 .../ConstraintBasedAlgorithmFactory.java           |  19 +-
 .../stages/BestPossibleStateCalcStage.java         |  20 +-
 .../TestPartitionMigrationBase.java                |  15 +-
 .../PartitionMigration/TestWagedExpandCluster.java |  65 +++++
 .../TestWagedRebalancerMigration.java              | 108 ++++++++
 .../rebalancer/WagedRebalancer/TestNodeSwap.java   | 291 +++++++++++++++++++++
 6 files changed, 500 insertions(+), 18 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 8568444..657fc82 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -22,32 +22,37 @@ package org.apache.helix.controller.rebalancer.waged.constraints;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
-import org.apache.helix.model.ClusterConfig;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
+import org.apache.helix.model.ClusterConfig;
 
 /**
  * The factory class to create an instance of {@link ConstraintBasedAlgorithm}
  */
 public class ConstraintBasedAlgorithmFactory {
+  // Evenness constraints tend to score within a smaller range.
+  // In order to let their scores cause enough difference in the final evaluation result, we need to
+  // enlarge the overall weight of the evenness constraints compared with the movement constraint.
+  // TODO: Tune or make the following factor configurable.
+  private static final int EVENNESS_PREFERENCE_NORMALIZE_FACTOR = 50;
 
   public static RebalanceAlgorithm getInstance(
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-    List<HardConstraint> hardConstraints =
-        ImmutableList.of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(),
+    List<HardConstraint> hardConstraints = ImmutableList
+        .of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(),
             new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(),
             new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint());
 
     int evennessPreference =
-        preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
+        preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1)
+            * EVENNESS_PREFERENCE_NORMALIZE_FACTOR;
     int movementPreference =
         preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
     float evennessRatio = (float) evennessPreference / (evennessPreference + movementPreference);
     float movementRatio = (float) movementPreference / (evennessPreference + movementPreference);
 
-    Map<SoftConstraint, Float> softConstraints = ImmutableMap.<SoftConstraint, Float> builder()
+    Map<SoftConstraint, Float> softConstraints = ImmutableMap.<SoftConstraint, Float>builder()
         .put(new PartitionMovementConstraint(), movementRatio)
         .put(new InstancePartitionsCountConstraint(), 0.3f * evennessRatio)
         .put(new ResourcePartitionAntiAffinityConstraint(), 0.1f * evennessRatio)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index cd7ab59..8c082f1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -235,6 +236,11 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput,
       HelixManager helixManager, Map<String, Resource> resourceMap, BestPossibleStateOutput output,
       List<String> failureResources) {
+    if (cache.isMaintenanceModeEnabled()) {
+      // The WAGED rebalancer won't be used while maintenance mode is enabled.
+      return Collections.emptyMap();
+    }
+
     // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer
     Map<String, Resource> wagedRebalancedResourceMap =
         resourceMap.entrySet().stream().filter(resourceEntry -> {
@@ -394,10 +400,9 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     }
   }
 
-  private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState,
-      String resourceName, boolean isMaintenanceModeEnabled) {
+  private Rebalancer<ResourceControllerDataProvider> getCustomizedRebalancer(
+      String rebalancerClassName, String resourceName) {
     Rebalancer<ResourceControllerDataProvider> customizedRebalancer = null;
-    String rebalancerClassName = idealState.getRebalancerClassName();
     if (rebalancerClassName != null) {
       if (logger.isDebugEnabled()) {
         LogUtil.logDebug(logger, _eventId,
@@ -411,13 +416,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
             "Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
       }
     }
+    return customizedRebalancer;
+  }
 
+  private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState,
+      String resourceName, boolean isMaintenanceModeEnabled) {
     Rebalancer<ResourceControllerDataProvider> rebalancer = null;
     switch (idealState.getRebalanceMode()) {
     case FULL_AUTO:
       if (isMaintenanceModeEnabled) {
         rebalancer = new MaintenanceRebalancer();
       } else {
+        Rebalancer<ResourceControllerDataProvider> customizedRebalancer =
+            getCustomizedRebalancer(idealState.getRebalancerClassName(), resourceName);
         if (customizedRebalancer != null) {
           rebalancer = customizedRebalancer;
         } else {
@@ -433,14 +444,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       break;
     case USER_DEFINED:
     case TASK:
-      rebalancer = customizedRebalancer;
+      rebalancer = getCustomizedRebalancer(idealState.getRebalancerClassName(), resourceName);
       break;
     default:
       LogUtil.logError(logger, _eventId,
           "Fail to find the rebalancer, invalid rebalance mode " + idealState.getRebalanceMode());
       break;
     }
-
     return rebalancer;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
index 7a559a5..61d72a2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
@@ -57,14 +57,14 @@ public class TestPartitionMigrationBase extends ZkTestBase {
   protected ClusterControllerManager _controller;
 
   List<MockParticipantManager> _participants = new ArrayList<>();
-  int _replica = 3;
-  int _minActiveReplica = _replica - 1;
+  protected int _replica = 3;
+  protected int _minActiveReplica = _replica - 1;
   ZkHelixClusterVerifier _clusterVerifier;
-  List<String> _testDBs = new ArrayList<>();
+  protected List<String> _testDBs = new ArrayList<>();
 
   MigrationStateVerifier _migrationVerifier;
   HelixManager _manager;
-  ConfigAccessor _configAccessor;
+  protected ConfigAccessor _configAccessor;
 
 
   @BeforeClass
@@ -84,8 +84,7 @@ public class TestPartitionMigrationBase extends ZkTestBase {
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    _clusterVerifier = getVerifier();
 
     enablePersistIntermediateAssignment(_gZkClient, CLUSTER_NAME, true);
 
@@ -95,6 +94,14 @@ public class TestPartitionMigrationBase extends ZkTestBase {
     _configAccessor = new ConfigAccessor(_gZkClient);
   }
 
+  /**
+   * Builds the verifier stored in {@code _clusterVerifier} by the {@code @BeforeClass} setup.
+   * Subclasses may override this to check the cluster with a different verifier.
+   */
+  protected ZkHelixClusterVerifier getVerifier() {
+    return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
   protected MockParticipantManager createAndStartParticipant(String instancename) {
     _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instancename);
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
new file mode 100644
index 0000000..d303e87
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
@@ -0,0 +1,65 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+
+public class TestWagedExpandCluster extends TestExpandCluster {
+  // TODO check the movements in between
+  @Override
+  protected ZkHelixClusterVerifier getVerifier() {
+    Set<String> dbNames = new HashSet<>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      dbNames.add("Test-DB-" + i++);
+    }
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
+        .setZkAddr(ZK_ADDR).build();
+  }
+
+  protected Map<String, IdealState> createTestDBs(long delayTime) {
+    Map<String, IdealState> idealStateMap = new HashMap<>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _minActiveReplica);
+      _testDBs.add(db);
+    }
+    for (String db : _testDBs) {
+      idealStateMap.put(db,
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db));
+    }
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setDelayRebalaceEnabled(true);
+    clusterConfig.setRebalanceDelayTime(delayTime);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    return idealStateMap;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
new file mode 100644
index 0000000..1a13496
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
@@ -0,0 +1,108 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that a resource using the CRUSH-based delayed rebalancer can be switched to the
+ * WAGED rebalancer after a cluster expansion without losing or over-provisioning replicas.
+ */
+public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
+  // _configAccessor is inherited from the base class, which initializes it in beforeClass().
+
+  @DataProvider(name = "stateModels")
+  public static Object[][] stateModels() {
+    return new Object[][] { { BuiltInStateModelDefinitions.MasterSlave.name(), true },
+        { BuiltInStateModelDefinitions.OnlineOffline.name(), true },
+        { BuiltInStateModelDefinitions.LeaderStandby.name(), true },
+        { BuiltInStateModelDefinitions.MasterSlave.name(), false },
+        { BuiltInStateModelDefinitions.OnlineOffline.name(), false },
+        { BuiltInStateModelDefinitions.LeaderStandby.name(), false },
+    };
+  }
+
+  // TODO check the movements in between
+  @Test(dataProvider = "stateModels")
+  public void testMigrateToWagedRebalancerWhileExpandCluster(String stateModel,
+      boolean delayEnabled) throws Exception {
+    String db = "Test-DB-" + stateModel;
+    if (delayEnabled) {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _replica - 1, 3000000, CrushRebalanceStrategy.class.getName());
+    } else {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _replica, 0, CrushRebalanceStrategy.class.getName());
+    }
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+    ClusterConfig config = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    config.setDelayRebalaceEnabled(delayEnabled);
+    config.setRebalanceDelayTime(3000000);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, config);
+
+    // add new instance to the cluster
+    int numNodes = _participants.size();
+    for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      MockParticipantManager participant = createAndStartParticipant(storageNodeName);
+      _participants.add(participant);
+      Thread.sleep(100);
+    }
+    Thread.sleep(2000);
+    ZkHelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
+            .setResources(Collections.singleton(db)).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling());
+
+    _migrationVerifier =
+        new MigrationStateVerifier(Collections.singletonMap(db, idealState), _manager);
+    _migrationVerifier.reset();
+    _migrationVerifier.start();
+    try {
+      IdealState currentIdealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      currentIdealState.setRebalancerClassName(WagedRebalancer.class.getName());
+      _gSetupTool.getClusterManagementTool()
+          .setResourceIdealState(CLUSTER_NAME, db, currentIdealState);
+      Thread.sleep(2000);
+      Assert.assertTrue(clusterVerifier.verifyByPolling());
+      Assert.assertFalse(_migrationVerifier.hasLessReplica());
+      Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+    } finally {
+      // Stop the verifier even if an assertion fails, so it cannot leak into the next run.
+      _migrationVerifier.stop();
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java
new file mode 100644
index 0000000..360e495
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java
@@ -0,0 +1,291 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestNodeSwap extends ZkTestBase {
+  final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+  protected HelixClusterVerifier _clusterVerifier;
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  Set<String> _allDBs = new HashSet<>();
+  int _replica = 3;
+
+  String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setTopology("/zone/instance");
+    clusterConfig.setFaultZoneType("zone");
+    clusterConfig.setDelayRebalaceEnabled(true);
+    // Set a long enough delay to ensure the delayed rebalance stays active
+    clusterConfig.setRebalanceDelayTime(3000000);
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    Set<String> nodes = new HashSet<>();
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      String zone = "zone-" + i % 3;
+      String domain = String.format("zone=%s,instance=%s", zone, storageNodeName);
+
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName);
+      instanceConfig.setDomain(domain);
+      _gSetupTool.getClusterManagementTool()
+          .setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig);
+      nodes.add(storageNodeName);
+    }
+
+    // start dummy participants
+    for (String node : nodes) {
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _replica - 1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(1000);
+
+    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(_allDBs).build();
+    Assert.assertTrue(_clusterVerifier.verify(5000));
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (MockParticipantManager p : _participants) {
+      p.syncStop();
+    }
+    deleteCluster(CLUSTER_NAME);
+  }
+
+  @Test
+  public void testNodeSwap() throws Exception {
+    Map<String, ExternalView> record = new HashMap<>();
+    for (String db : _allDBs) {
+      record.put(db,
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db));
+    }
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+
+    // 1. disable an old node
+    MockParticipantManager oldParticipant = _participants.get(0);
+    String oldParticipantName = oldParticipant.getInstanceName();
+    final InstanceConfig instanceConfig =
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, oldParticipantName);
+    instanceConfig.setInstanceEnabled(false);
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceConfig(CLUSTER_NAME, oldParticipantName, instanceConfig);
+    Assert.assertTrue(_clusterVerifier.verify(10000));
+
+    // 2. then enter maintenance mode and remove it from the topology
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, "NodeSwap", Collections.emptyMap());
+    oldParticipant.syncStop();
+    _participants.remove(oldParticipant);
+    Thread.sleep(2000);
+    _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig);
+
+    // 3. create a new participant in the same fault zone
+    String newParticipantName = "RandomParticipant_" + START_PORT;
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newParticipantName);
+    InstanceConfig newConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, newParticipantName);
+    String zone = instanceConfig.getDomainAsMap().get("zone");
+    String domain = String.format("zone=%s,instance=%s", zone, newParticipantName);
+    newConfig.setDomain(domain);
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceConfig(CLUSTER_NAME, newParticipantName, newConfig);
+
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newParticipantName);
+    participant.syncStart();
+    _participants.add(0, participant);
+
+    // 4. exit maintenance mode and rebalance
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "NodeSwapDone", Collections.emptyMap());
+
+    Thread.sleep(2000);
+    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(_allDBs).build();
+    Assert.assertTrue(_clusterVerifier.verify(5000));
+
+    // Since only one node was temporarily down, the same partitions should move to the new node.
+    for (String db : _allDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      ExternalView oldEv = record.get(db);
+      for (String partition : ev.getPartitionSet()) {
+        Map<String, String> stateMap = ev.getStateMap(partition);
+        Map<String, String> oldStateMap = oldEv.getStateMap(partition);
+        Assert.assertTrue(oldStateMap != null && stateMap != null);
+        Assert.assertEquals(stateMap.size(), _replica);
+        // Note the WAGED rebalancer won't ensure the same state, because moving the top states
+        // back to the replaced node might be unnecessary and cause overhead.
+        Set<String> instanceSet = new HashSet<>(stateMap.keySet());
+        if (instanceSet.remove(newParticipantName)) {
+          instanceSet.add(oldParticipantName);
+        }
+        Assert.assertEquals(oldStateMap.keySet(), instanceSet);
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = "testNodeSwap")
+  public void testFaultZoneSwap() throws Exception {
+    Map<String, ExternalView> record = new HashMap<>();
+    for (String db : _allDBs) {
+      record.put(db,
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db));
+    }
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+
+    // 1. disable a whole fault zone
+    Map<String, InstanceConfig> removedInstanceConfigMap = new HashMap<>();
+    for (MockParticipantManager participant : _participants) {
+      String instanceName = participant.getInstanceName();
+      InstanceConfig instanceConfig =
+          _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
+      if (instanceConfig.getDomainAsMap().get("zone").equals("zone-0")) {
+        instanceConfig.setInstanceEnabled(false);
+        _gSetupTool.getClusterManagementTool()
+            .setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
+        removedInstanceConfigMap.put(instanceName, instanceConfig);
+      }
+    }
+    Assert.assertTrue(_clusterVerifier.verify(10000));
+
+    // 2. then enter maintenance mode and remove all the zone-0 nodes from the topology
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, "NodeSwap", Collections.emptyMap());
+    Iterator<MockParticipantManager> iter = _participants.iterator();
+    while (iter.hasNext()) {
+      MockParticipantManager participant = iter.next();
+      String instanceName = participant.getInstanceName();
+      if (removedInstanceConfigMap.containsKey(instanceName)) {
+        participant.syncStop();
+        iter.remove();
+        Thread.sleep(1000);
+        _gSetupTool.getClusterManagementTool()
+            .dropInstance(CLUSTER_NAME, removedInstanceConfigMap.get(instanceName));
+      }
+    }
+
+    // 3. create one new zone-0 participant for each removed instance
+    Set<String> newInstanceNames = new HashSet<>();
+    for (int i = 0; i < removedInstanceConfigMap.size(); i++) {
+      String newParticipantName = "NewParticipant_" + (START_PORT + i);
+      newInstanceNames.add(newParticipantName);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newParticipantName);
+      InstanceConfig newConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, newParticipantName);
+      String domain = String.format("zone=zone-0,instance=%s", newParticipantName);
+      newConfig.setDomain(domain);
+      _gSetupTool.getClusterManagementTool()
+          .setInstanceConfig(CLUSTER_NAME, newParticipantName, newConfig);
+
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newParticipantName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // 4. exit maintenance mode and rebalance
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "NodeSwapDone", Collections.emptyMap());
+
+    Thread.sleep(2000);
+    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(_allDBs).build();
+    Assert.assertTrue(_clusterVerifier.verify(5000));
+
+    for (String db : _allDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      ExternalView oldEv = record.get(db);
+      for (String partition : ev.getPartitionSet()) {
+        Map<String, String> stateMap = ev.getStateMap(partition);
+        Map<String, String> oldStateMap = oldEv.getStateMap(partition);
+        Assert.assertTrue(oldStateMap != null && stateMap != null);
+        Assert.assertEquals(stateMap.size(), _replica);
+        Set<String> instanceSet = new HashSet<>(stateMap.keySet());
+        instanceSet.removeAll(oldStateMap.keySet());
+        // All the different instances in the new mapping are newly added instances
+        Assert.assertTrue(newInstanceNames.containsAll(instanceSet));
+        instanceSet = new HashSet<>(oldStateMap.keySet());
+        instanceSet.removeAll(stateMap.keySet());
+        // All the different instances in the old mapping are removed instances
+        Assert.assertTrue(removedInstanceConfigMap.keySet().containsAll(instanceSet));
+      }
+    }
+  }
+}