You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/08/23 17:34:26 UTC

[helix] branch ApplicationClusterManager updated: Add support for node evacuation. (#2604)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/ApplicationClusterManager by this push:
     new 6cf94f539 Add support for node evacuation. (#2604)
6cf94f539 is described below

commit 6cf94f5392a9adc96264b4e827cf8aea3a61eea9
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Aug 23 10:34:21 2023 -0700

    Add support for node evacuation. (#2604)
---
 .../rebalancer/util/DelayedRebalanceUtil.java      |  27 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   2 -
 .../org/apache/helix/model/InstanceConfig.java     |  11 +-
 .../rebalancer/TestInstanceOperation.java          | 421 +++++++++++++++++++++
 4 files changed, 444 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 92556bb40..e7ff99765 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -82,37 +83,37 @@ public class DelayedRebalanceUtil {
   }
 
   /**
-   * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster
-   * delay rebalance configurations.
+   * @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes)
+   * while considering cluster delay rebalance configurations.
    */
   public static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
       Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
       Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
     if (!isDelayRebalanceEnabled(clusterConfig)) {
-      return new HashSet<>(liveEnabledNodes);
+      return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
     }
     return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
         instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig);
   }
 
   /**
-   * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster
-   * and the resource delay rebalance configurations.
+   * @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes)
+   * while considering cluster and the resource delay rebalance configurations.
    */
   public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealState,
       Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
       Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
     if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
-      return new HashSet<>(liveEnabledNodes);
+      return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
     }
     return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
         instanceConfigMap, delay, clusterConfig);
   }
 
   private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
-      Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
-      Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
-    Set<String> activeNodes = new HashSet<>(liveEnabledNodes);
+      Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap,
+      long delay, ClusterConfig clusterConfig) {
+    Set<String> activeNodes = filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
     Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
     offlineOrDisabledInstances.removeAll(liveEnabledNodes);
     long currentTime = System.currentTimeMillis();
@@ -128,6 +129,14 @@ public class DelayedRebalanceUtil {
     return activeNodes;
   }
 
+  /**
+   * Returns the subset of {@code nodes} that is not marked with the EVACUATE instance operation.
+   * A node without an entry in {@code instanceConfigMap} cannot be marked as evacuating and is
+   * therefore kept instead of triggering a NullPointerException.
+   *
+   * @param instanceConfigMap instance name to its {@link InstanceConfig}
+   * @param nodes candidate instance names
+   * @return a new set holding the instance names that are not evacuating
+   */
+  private static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
+      Set<String> nodes) {
+    String evacuate = InstanceConstants.InstanceOperation.EVACUATE.name();
+    return nodes.stream().filter(instance -> {
+      InstanceConfig config = instanceConfigMap.get(instance);
+      // Constant-first equals also tolerates a null operation value.
+      return config == null || !evacuate.equals(config.getInstanceOperation());
+    }).collect(Collectors.toSet());
+  }
+
   /**
    * Return the time when an offline or disabled instance should be treated as inactive. Return -1
    * if it is inactive now or forced to be rebalanced by an on-demand rebalance.
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 917cb25de..15b38fcbc 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -396,8 +396,6 @@ public class ZKHelixAdmin implements HelixAdmin {
         }
 
         InstanceConfig config = new InstanceConfig(currentData);
-        // TODO: add sanity check in  config.setInstanceOperation and throw exception when needed.
-        // TODO: Also instance enabled in instance config
         config.setInstanceOperation(instanceOperation);
         return config.getRecord();
       }
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index fdcb74517..193019d0e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -345,11 +345,11 @@ public class InstanceConfig extends HelixProperty {
 
   public void setInstanceOperation(InstanceConstants.InstanceOperation operation) {
     if (operation != InstanceConstants.InstanceOperation.DISABLE
-        && operation != InstanceConstants.InstanceOperation.ENABLE ){
-       if( !getInstanceEnabled()) {
-         throw new HelixException(
-             "setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed");
-       }
+        && operation != InstanceConstants.InstanceOperation.ENABLE) {
+      if (!getInstanceEnabled()) {
+        throw new HelixException(
+            "setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed");
+      }
     } else {
       setInstanceEnabledHelper(operation == InstanceConstants.InstanceOperation.ENABLE);
     }
@@ -381,7 +381,6 @@ public class InstanceConfig extends HelixProperty {
 
   /**
    * Check if this instance is enabled for a given partition
-   *
    * @param partition the partition name to check
    * @return true if the instance is enabled for the partition, false otherwise
    */
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
new file mode 100644
index 000000000..2276ab1c3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -0,0 +1,421 @@
+package org.apache.helix.integration.rebalancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixRollbackException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestInstanceOperation extends ZkTestBase {
+  protected final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private int REPLICA = 3;
+  protected ClusterControllerManager _controller;
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  List<String> _participantNames = new ArrayList<>();
+  private Set<String> _allDBs = new HashSet<>();
+  private ZkHelixClusterVerifier _clusterVerifier;
+  private ConfigAccessor _configAccessor;
+  private long _stateModelDelay = 30L;
+  protected AssignmentMetadataStore _assignmentMetadataStore;
+  HelixDataAccessor _dataAccessor;
+
+  /**
+   * Creates the test cluster, starts NUM_NODE participants and one controller, builds the
+   * cluster verifier, enables state transition cancellation, and finally creates the initial
+   * resources and the WAGED capacity configuration.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    int nodeIndex = 0;
+    while (nodeIndex < NUM_NODE) {
+      addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + nodeIndex));
+      nodeIndex++;
+    }
+
+    // Bring up the controller.
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, CONTROLLER_PREFIX + "_0");
+    _controller.syncStart();
+
+    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setDeactivatedNodeAwareness(true)
+        .setResources(_allDBs)
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+
+    // Allow in-flight state transitions to be cancelled.
+    ClusterConfig config = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    config.stateTransitionCancelEnabled(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, config);
+
+    createTestDBs(200);
+
+    setUpWagedBaseline();
+  }
+
+  /**
+   * Marks one instance as EVACUATE and verifies that, once the cluster converges, the instance
+   * no longer appears in any resource's external view while all other participants still do.
+   */
+  @Test
+  public void testEvacuate() throws Exception {
+    // Before evacuation every participant should show up in the EV of every resource.
+    Map<String, ExternalView> evByResource = getEV();
+    for (String db : _allDBs) {
+      Set<String> hosts = getParticipantsInEv(evByResource.get(db));
+      Assert.assertTrue(hosts.containsAll(_participantNames));
+    }
+
+    // Mark the first participant for evacuation.
+    String instanceToEvacuate = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // After convergence the EV should list every instance except the evacuated one.
+    evByResource = getEV();
+    List<String> remainingInstances = new ArrayList<>(_participantNames);
+    remainingInstances.removeIf(instanceToEvacuate::equals);
+    for (String db : _allDBs) {
+      ExternalView ev = evByResource.get(db);
+      validateAssignmentInEv(ev);
+      Set<String> hosts = getParticipantsInEv(ev);
+      Assert.assertFalse(hosts.contains(instanceToEvacuate));
+      Assert.assertTrue(hosts.containsAll(remainingInstances));
+    }
+  }
+
+  /**
+   * Sets the previously evacuated instance back to ENABLE and verifies that, after the cluster
+   * converges, every participant appears again in the external view of every resource.
+   */
+  @Test(dependsOnMethods = "testEvacuate")
+  public void testRevertEvacuation() throws Exception {
+    // Undo the evacuation of the first participant.
+    String revertedInstance = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, revertedInstance, InstanceConstants.InstanceOperation.ENABLE);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // Every resource should be hosted by all participants again.
+    Map<String, ExternalView> evByResource = getEV();
+    for (String db : _allDBs) {
+      ExternalView ev = evByResource.get(db);
+      Assert.assertTrue(getParticipantsInEv(ev).containsAll(_participantNames));
+      validateAssignmentInEv(ev);
+    }
+  }
+
+  /**
+   * Starts an evacuation while the upward state transitions are delayed, cancels it by setting
+   * the instance operation back to ENABLE, and checks that every partition keeps at least
+   * REPLICA active replicas before the cancellation, right after it, and after convergence.
+   */
+  @Test(dependsOnMethods = "testRevertEvacuation")
+  public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
+    // add a MasterSlave resource that uses delayed rebalance with the CrushEd strategy
+    createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
+        REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName());
+    _allDBs.add("TEST_DB3_DELAYED_CRUSHED");
+    // add a MasterSlave resource that uses the WAGED rebalancer
+    createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave",
+        PARTITIONS, REPLICA, REPLICA - 1);
+    _allDBs.add("TEST_DB4_DELAYED_WAGED");
+    // wait for assignment to finish
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // set bootstrap ST delay to a large number; the negative sign makes StDelayMSStateModel
+    // delay the OFFLINE->SLAVE and SLAVE->MASTER transitions
+    _stateModelDelay = -300000L;
+    // evacuate an instance
+    String instanceToEvacuate = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+    // Messages should be pending at all instances besides the evacuate one
+    // NOTE(review): the boolean returned by TestHelper.verify is discarded, and the predicate
+    // waits for an EMPTY message list although the comment above expects pending messages.
+    // Confirm the intended condition and assert on the result.
+    for (String participant : _participantNames) {
+      if (participant.equals(instanceToEvacuate)) {
+        continue;
+      }
+      TestHelper.verify(
+          () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
+    }
+
+    // sleep a bit (|delay| / 100 milliseconds) so ST messages can start executing
+    Thread.sleep(Math.abs(_stateModelDelay / 100));
+    // before we cancel, check current EV
+    Map<String, ExternalView> assignment = getEV();
+    for (String resource : _allDBs) {
+      // check every partition has >= 3 active replicas
+      validateAssignmentInEv(assignment.get(resource));
+    }
+
+    // cancel the evacuation by setting instance operation back to `ENABLE`
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
+
+    assignment = getEV();
+    for (String resource : _allDBs) {
+      // check every partition has >= 3 active replicas, even before cluster converge
+      validateAssignmentInEv(assignment.get(resource));
+    }
+
+    // check cluster converge. We have longer delay for ST than verifier timeout. It will only converge if we cancel ST.
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // EV should contain all participants, check resources one by one
+    assignment = getEV();
+    for (String resource : _allDBs) {
+      Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+      // check every partition has >= 3 active replicas again
+      validateAssignmentInEv(assignment.get(resource));
+    }
+  }
+
+  /**
+   * Starts an evacuation while the downward state transitions are delayed, cancels it by
+   * setting the instance operation back to ENABLE, and checks that every partition keeps at
+   * least REPLICA active replicas right after the cancellation and after convergence.
+   */
+  @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish")
+  public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
+
+    // set DROP ST delay to a large number; a positive value makes StDelayMSStateModel delay
+    // the MASTER->SLAVE, SLAVE->OFFLINE and OFFLINE->DROPPED transitions
+    _stateModelDelay = 300000L;
+
+    // evacuate an instance
+    String instanceToEvacuate = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+
+    // message should be pending at the to evacuate participant
+    // NOTE(review): the boolean returned by TestHelper.verify is discarded, and the predicate
+    // waits for an EMPTY message list although the comment above expects a pending message.
+    // Confirm the intended condition and assert on the result.
+    TestHelper.verify(
+        () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
+
+    // cancel the evacuation by setting instance operation back to `ENABLE`
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
+    // check every partition has >= 3 active replicas, even before cluster converge
+    Map<String, ExternalView> assignment = getEV();
+    for (String resource : _allDBs) {
+      validateAssignmentInEv(assignment.get(resource));
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // EV should contain all participants, check resources one by one
+    assignment = getEV();
+    for (String resource : _allDBs) {
+      Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+      // check every partition has >= 3 active replicas
+      validateAssignmentInEv(assignment.get(resource));
+    }
+  }
+
+  /**
+   * Puts the cluster into maintenance mode, adds a new participant, marks an instance as
+   * EVACUATE, and then leaves maintenance mode. After leaving, the evacuated instance must be
+   * absent from every resource's external view while all other participants are present.
+   * ("EMM" presumably stands for "enable maintenance mode" - confirm.)
+   */
+  @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish")
+  public void testMarkEvacuationAfterEMM() throws Exception {
+    _stateModelDelay = 1000L;
+    Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME));
+    _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null,
+        null);
+    // the new participant is appended at index NUM_NODE of _participantNames
+    addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE));
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    // while in maintenance mode the new participant should not show up in any EV
+    Map<String, ExternalView> assignment = getEV();
+    for (String resource : _allDBs) {
+      Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE)));
+    }
+
+    // set evacuate operation
+    String instanceToEvacuate = _participants.get(0).getInstanceName();
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+
+    // there should be no evacuation happening
+    // NOTE(review): `assignment` was fetched before EVACUATE was set, so this loop inspects a
+    // stale snapshot; consider re-reading the EV (after the verifier below) before asserting.
+    for (String resource : _allDBs) {
+      Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate));
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // exit MM
+    _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null,
+        null);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    // after leaving maintenance mode the EV should list every instance but the evacuated one
+    assignment = getEV();
+    List<String> currentActiveInstances =
+        _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList());
+    for (String resource : _allDBs) {
+      validateAssignmentInEv(assignment.get(resource));
+      Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
+      Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
+      Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
+    }
+
+  }
+
+  /**
+   * Registers {@code participantName} in the cluster and starts a mock participant for it that
+   * serves the "MasterSlave" state model through {@link StDelayMSStateModelFactory}.
+   *
+   * @param participantName instance name of the participant to add
+   */
+  private void addParticipant(String participantName) {
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName);
+
+    MockParticipantManager mockParticipant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
+    // Plug in the delayed state model before the participant is started.
+    mockParticipant.getStateMachineEngine()
+        .registerStateModelFactory("MasterSlave", new StDelayMSStateModelFactory());
+    mockParticipant.syncStart();
+
+    _participants.add(mockParticipant);
+    _participantNames.add(participantName);
+  }
+
+  /**
+   * Creates the two initial LeaderStandby resources (one CrushEd with delayed rebalance, one
+   * WAGED), records them in {@code _allDBs} and waits for the cluster to converge.
+   *
+   * @param delayTime value forwarded to {@code createResourceWithDelayedRebalance} for the
+   *                  CrushEd resource (previously ignored in favor of a hard-coded 200)
+   * @throws InterruptedException declared by the original signature
+   */
+  private void createTestDBs(long delayTime) throws InterruptedException {
+    createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB1_CRUSHED",
+        BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, delayTime,
+        CrushEdRebalanceStrategy.class.getName());
+    _allDBs.add("TEST_DB1_CRUSHED");
+    createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB2_WAGED", BuiltInStateModelDefinitions.LeaderStandby.name(),
+        PARTITIONS, REPLICA, REPLICA - 1);
+    _allDBs.add("TEST_DB2_WAGED");
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
+
+  /**
+   * Reads the current external view of every resource in {@code _allDBs}.
+   *
+   * @return a new map from resource name to the external view returned by the admin tool
+   */
+  private Map<String, ExternalView> getEV() {
+    Map<String, ExternalView> evByResource = new HashMap<>();
+    _allDBs.forEach(resource -> evByResource.put(resource,
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, resource)));
+    return evByResource;
+  }
+
+  /**
+   * Collects every instance name that appears in the state map of any partition of {@code ev}.
+   *
+   * @param ev external view of one resource
+   * @return a new set with the names of all instances listed in the external view
+   */
+  private Set<String> getParticipantsInEv(ExternalView ev) {
+    Set<String> hosts = new HashSet<>();
+    for (String partition : ev.getPartitionSet()) {
+      hosts.addAll(ev.getStateMap(partition).keySet());
+    }
+    return hosts;
+  }
+
+  /**
+   * Asserts that every partition in {@code ev} has at least REPLICA replicas in an active state
+   * (MASTER, LEADER, SLAVE, FOLLOWER or STANDBY).
+   *
+   * @param ev external view of one resource
+   */
+  private void validateAssignmentInEv(ExternalView ev) {
+    for (String partition : ev.getPartitionSet()) {
+      int activeReplicas = 0;
+      for (String state : ev.getStateMap(partition).values()) {
+        switch (state) {
+          case "MASTER":
+          case "LEADER":
+          case "SLAVE":
+          case "FOLLOWER":
+          case "STANDBY":
+            activeReplicas++;
+            break;
+          default:
+            break;
+        }
+      }
+      Assert.assertTrue(activeReplicas >= REPLICA);
+    }
+  }
+
+  /**
+   * Creates an {@link AssignmentMetadataStore} that resets itself before every read, and writes
+   * the capacity key, the default instance capacity and the default partition weight into the
+   * cluster config.
+   */
+  private void setUpWagedBaseline() {
+    _assignmentMetadataStore = new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) {
+      public Map<String, ResourceAssignment> getBaseline() {
+        // Reset first so the baseline is always read from ZK rather than from the cache.
+        super.reset();
+        return super.getBaseline();
+      }
+
+      public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() {
+        // Reset first so the assignment is always read from ZK rather than from the cache.
+        super.reset();
+        return super.getBestPossibleAssignment();
+      }
+    };
+
+    // One capacity dimension: default instance capacity 100, default partition weight 1.
+    final String capacityKey = "TestCapacityKey";
+    ClusterConfig config = _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
+    config.setInstanceCapacityKeys(Collections.singletonList(capacityKey));
+    config.setDefaultInstanceCapacityMap(Collections.singletonMap(capacityKey, 100));
+    config.setDefaultPartitionWeightMap(Collections.singletonMap(capacityKey, 1));
+    _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), config);
+  }
+
+  /**
+   * Factory that hands out a fresh {@link StDelayMSStateModel} for every partition. The model's
+   * downward transitions are delayed when _stateModelDelay > 0 and its upward transitions when
+   * _stateModelDelay < 0.
+   */
+  public class StDelayMSStateModelFactory extends StateModelFactory<StDelayMSStateModel> {
+
+    @Override
+    public StDelayMSStateModel createNewStateModel(String resourceName, String partitionKey) {
+      return new StDelayMSStateModel();
+    }
+  }
+
+  /**
+   * MasterSlave state model whose transitions are delayed according to the enclosing test's
+   * {@code _stateModelDelay}: a negative value delays the upward transitions (OFFLINE->SLAVE,
+   * SLAVE->MASTER) and a positive value delays the downward ones (MASTER->SLAVE,
+   * SLAVE->OFFLINE, OFFLINE->DROPPED). A delay ends early when the transition is cancelled.
+   */
+  @StateModelInfo(initialState = "OFFLINE", states = {"MASTER", "SLAVE", "ERROR"})
+  public class StDelayMSStateModel extends StateModel {
+
+    public StDelayMSStateModel() {
+      _cancelled = false;
+    }
+
+    /**
+     * Sleeps in 5 second slices until at least {@code sleepTime} milliseconds have passed or
+     * the transition is cancelled.
+     *
+     * @param sleepTime total delay in milliseconds; a non-positive value does not sleep
+     * @throws InterruptedException if the sleeping thread is interrupted
+     * @throws HelixRollbackException if the transition has been cancelled
+     */
+    private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException {
+      final long sliceMs = 5000L;
+      long remaining = sleepTime;
+      while (remaining > 0 && !isCancelled()) {
+        Thread.sleep(sliceMs);
+        // Subtract the slice; "=-" would assign -5000 and end the loop after a single slice.
+        remaining -= sliceMs;
+      }
+      if (isCancelled()) {
+        // Clear the flag before signalling the rollback (presumably so the next transition on
+        // this model does not start out as cancelled).
+        _cancelled = false;
+        throw new HelixRollbackException("EX");
+      }
+    }
+
+    /** Upward transition: delayed only when {@code _stateModelDelay} is negative. */
+    @Transition(to = "SLAVE", from = "OFFLINE")
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException {
+      if (_stateModelDelay < 0) {
+        sleepWhileNotCanceled(Math.abs(_stateModelDelay));
+      }
+    }
+
+    /** Upward transition: delayed only when {@code _stateModelDelay} is negative. */
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException {
+      if (_stateModelDelay < 0) {
+        sleepWhileNotCanceled(Math.abs(_stateModelDelay));
+      }
+    }
+
+    /** Downward transition: delayed only when {@code _stateModelDelay} is positive. */
+    @Transition(to = "SLAVE", from = "MASTER")
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException {
+      if (_stateModelDelay > 0) {
+        sleepWhileNotCanceled(_stateModelDelay);
+      }
+    }
+
+    /** Downward transition: delayed only when {@code _stateModelDelay} is positive. */
+    @Transition(to = "OFFLINE", from = "SLAVE")
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException {
+      if (_stateModelDelay > 0) {
+        sleepWhileNotCanceled(_stateModelDelay);
+      }
+    }
+
+    /** Downward transition: delayed only when {@code _stateModelDelay} is positive. */
+    @Transition(to = "DROPPED", from = "OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws InterruptedException {
+      if (_stateModelDelay > 0) {
+        sleepWhileNotCanceled(_stateModelDelay);
+      }
+    }
+  }
+}