You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/27 00:52:47 UTC

helix git commit: Add integration tests for Helix's partition migration strategy during cluster expansion and IdealState rebalance strategy changes.

Repository: helix
Updated Branches:
  refs/heads/master 7753b602c -> 94ac4253b


Add integration tests for Helix's partition migration strategy during cluster expansion and IdealState rebalance strategy changes.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/94ac4253
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/94ac4253
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/94ac4253

Branch: refs/heads/master
Commit: 94ac4253bb6a35f8ad895200ba6b050a576d4198
Parents: 7753b60
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Apr 16 18:32:58 2018 -0700
Committer: Junkai Xue <jx...@jxue-mn2.linkedin.biz>
Committed: Tue Jun 26 17:52:19 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/DummyProcessThread.java    |   5 +-
 .../manager/MockParticipantManager.java         |  20 +-
 .../PartitionMigration/TestExpandCluster.java   | 124 ++++++++++
 .../TestFullAutoMigration.java                  | 150 ++++++++++++
 .../TestPartitionMigrationBase.java             | 241 +++++++++++++++++++
 .../rebalancer/TestMixedModeAutoRebalance.java  |  13 -
 .../handling/TestResourceThreadpoolSize.java    |   2 +-
 .../helix/mock/participant/DummyProcess.java    |  14 +-
 8 files changed, 540 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
index 995b123..84bcae4 100644
--- a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
+++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
@@ -19,9 +19,10 @@ package org.apache.helix;
  * under the License.
  */
 
+import org.apache.helix.mock.participant.DummyProcess;
 import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
 import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.mock.participant.DummyProcess.DummyStateModelFactory;
+import org.apache.helix.mock.participant.DummyProcess.DummyMasterSlaveStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ public class DummyProcessThread implements Runnable {
   @Override
   public void run() {
     try {
-      DummyStateModelFactory stateModelFactory = new DummyStateModelFactory(0);
+      DummyMasterSlaveStateModelFactory stateModelFactory = new DummyMasterSlaveStateModelFactory(0);
       StateMachineEngine stateMach = _manager.getStateMachineEngine();
       stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index cc168e9..55316ac 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -33,7 +33,6 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,14 +43,33 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   protected CountDownLatch _stopCountDown = new CountDownLatch(1);
   protected CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
 
-  protected MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
-  protected DummyLeaderStandbyStateModelFactory _lsModelFactory =
-      new DummyLeaderStandbyStateModelFactory(10);
-  protected DummyOnlineOfflineStateModelFactory _ofModelFactory =
-      new DummyOnlineOfflineStateModelFactory(10);
+  /** Transition delay used by the dummy LeaderStandby/OnlineOffline models unless overridden. */
+  protected static final int DEFAULT_TRANS_DELAY = 10;
+
+  protected int _transDelay = DEFAULT_TRANS_DELAY;
+
+  protected MockMSModelFactory _msModelFactory;
+  protected DummyLeaderStandbyStateModelFactory _lsModelFactory;
+  protected DummyOnlineOfflineStateModelFactory _ofModelFactory;
 
+  /**
+   * Creates a participant whose dummy state models use {@link #DEFAULT_TRANS_DELAY}.
+   */
   public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
+    this(zkAddr, clusterName, instanceName, DEFAULT_TRANS_DELAY);
+  }
+
+  /**
+   * Creates a participant whose dummy LeaderStandby and OnlineOffline state model factories are
+   * built with {@code transDelay}; the MasterSlave factory is not constructed with this value.
+   */
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
+      int transDelay) {
     super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
+    _transDelay = transDelay;
+    _msModelFactory = new MockMSModelFactory(null);
+    _lsModelFactory = new DummyLeaderStandbyStateModelFactory(_transDelay);
+    _ofModelFactory = new DummyOnlineOfflineStateModelFactory(_transDelay);
   }
 
   public void setTransition(MockTransition transition) {

http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
new file mode 100644
index 0000000..799d750
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -0,0 +1,136 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Verifies that no partition drops below its replica count (or exceeds it by more than the
+ * verifier's tolerance) while the cluster is expanded by starting new instances, expanded by
+ * enabling pre-added disabled instances, and shrunk by stopping and dropping the original ones.
+ */
+public class TestExpandCluster extends TestPartitionMigrationBase {
+
+  Map<String, IdealState> _resourceMap;
+
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _resourceMap = createTestDBs(1000000);
+    _migrationVerifier = new MigrationStateVerifier(_resourceMap, _manager);
+  }
+
+  @Test
+  public void testClusterExpansion() throws Exception {
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    _migrationVerifier.start();
+    try {
+      // expand cluster by adding instance one by one
+      int numNodes = _participants.size();
+      for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+        String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+        MockParticipantManager participant = createAndStartParticipant(storageNodeName);
+        _participants.add(participant);
+        Thread.sleep(50);
+      }
+
+      Assert.assertTrue(_clusterVerifier.verify());
+      Assert.assertFalse(_migrationVerifier.hasLessReplica());
+      Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+    } finally {
+      // unregister the listeners even on failure so they cannot leak into dependent tests
+      _migrationVerifier.stop();
+    }
+  }
+
+
+  @Test (dependsOnMethods = {"testClusterExpansion"})
+  public void testClusterExpansionByEnableInstance() throws Exception {
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    _migrationVerifier.reset();
+    _migrationVerifier.start();
+    try {
+      int numNodes = _participants.size();
+      // add new instances with all disabled
+      for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+        String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+        InstanceConfig config = InstanceConfig.toInstanceConfig(storageNodeName);
+        config.setInstanceEnabled(false);
+
+        _setupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
+
+        // start dummy participants
+        // NOTE(review): unlike createAndStartParticipant(), these participants use the default
+        // transition delay and no DelayedTransitionBase; confirm this difference is intended.
+        MockParticipantManager participant =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+        participant.syncStart();
+        _participants.add(participant);
+      }
+
+      // enable new instance one by one
+      for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+        String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+        _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, storageNodeName, true);
+        Thread.sleep(100);
+      }
+
+      Assert.assertTrue(_clusterVerifier.verify());
+      Assert.assertFalse(_migrationVerifier.hasLessReplica());
+      Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+    } finally {
+      _migrationVerifier.stop();
+    }
+  }
+
+  @Test(dependsOnMethods = {"testClusterExpansion", "testClusterExpansionByEnableInstance"})
+  public void testClusterShrink() throws Exception {
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    _migrationVerifier.reset();
+    _migrationVerifier.start();
+    try {
+      // stop and drop the original NUM_NODE instances one by one
+      for (int i = 0; i < NUM_NODE; i++) {
+        MockParticipantManager participant = _participants.get(i);
+        participant.syncStop();
+        _setupTool.dropInstanceFromCluster(CLUSTER_NAME, participant.getInstanceName());
+      }
+      // forget the stopped participants so afterClass() does not stop them a second time
+      _participants.subList(0, NUM_NODE).clear();
+
+      Assert.assertTrue(_clusterVerifier.verify());
+      Assert.assertFalse(_migrationVerifier.hasLessReplica());
+      Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+    } finally {
+      _migrationVerifier.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java
new file mode 100644
index 0000000..e93445d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java
@@ -0,0 +1,167 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+/**
+ * Pins a resource's partitions with user-defined preference lists (stored in its
+ * ResourceConfig), expands the cluster, then removes those lists one partition at a time and
+ * verifies that no partition drops below its replica count during the migration.
+ */
+public class TestFullAutoMigration extends TestPartitionMigrationBase {
+  ConfigAccessor _configAccessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+
+  /** Each built-in state model, once with delayed rebalance enabled and once without. */
+  @DataProvider(name = "stateModels")
+  public static Object [][] stateModels() {
+    return new Object[][] { { BuiltInStateModelDefinitions.MasterSlave.name(), true},
+        {BuiltInStateModelDefinitions.OnlineOffline.name(), true},
+        {BuiltInStateModelDefinitions.LeaderStandby.name(), true},
+        {BuiltInStateModelDefinitions.MasterSlave.name(), false},
+        {BuiltInStateModelDefinitions.OnlineOffline.name(), false},
+        {BuiltInStateModelDefinitions.LeaderStandby.name(), false},
+    };
+  }
+
+  @Test(dataProvider = "stateModels")
+  public void testMigrateToFullAutoWhileExpandCluster(
+      String stateModel, boolean delayEnabled) throws Exception {
+    // Every state model appears twice in the data provider (delay on and off), so the name
+    // must include delayEnabled; otherwise the second run would re-create an existing resource.
+    String db = "Test-DB-" + stateModel + (delayEnabled ? "-delayed" : "-nodelay");
+    if (delayEnabled) {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _replica - 1, 200000, CrushRebalanceStrategy.class.getName());
+    } else {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _replica, 0, CrushRebalanceStrategy.class.getName());
+    }
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+    // pin every partition to participants _replica..1 through a user-defined preference list
+    Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
+    List<String> userDefinedPartitions = new ArrayList<>();
+    for (String partition : userDefinedPreferenceLists.keySet()) {
+      List<String> preferenceList = new ArrayList<>();
+      for (int k = _replica; k > 0; k--) {
+        String instance = _participants.get(k).getInstanceName();
+        preferenceList.add(instance);
+      }
+      userDefinedPreferenceLists.put(partition, preferenceList);
+      userDefinedPartitions.add(partition);
+    }
+
+    ResourceConfig resourceConfig =
+        new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
+    _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+
+    // add new instance to the cluster
+    int numNodes = _participants.size();
+    for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      MockParticipantManager participant = createAndStartParticipant(storageNodeName);
+      _participants.add(participant);
+      Thread.sleep(50);
+    }
+
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    _migrationVerifier =
+        new MigrationStateVerifier(Collections.singletonMap(db, idealState), _manager);
+
+    _migrationVerifier.reset();
+    _migrationVerifier.start();
+    try {
+      // drop partitions from the user-defined preference lists one at a time
+      while (userDefinedPartitions.size() > 0) {
+        removePartitionFromUserDefinedList(db, userDefinedPartitions);
+        Thread.sleep(50);
+      }
+
+      Assert.assertTrue(_clusterVerifier.verify());
+      Assert.assertFalse(_migrationVerifier.hasLessReplica());
+      Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+    } finally {
+      // unregister the listeners even on failure so they do not outlive this data-provider row
+      _migrationVerifier.stop();
+    }
+  }
+
+  /**
+   * Removes the first partition of {@code userDefinedPartitions} from the resource's
+   * user-defined preference lists, writes the ResourceConfig back, and drops that partition
+   * from {@code userDefinedPartitions}.
+   */
+  private void removePartitionFromUserDefinedList(String db, List<String> userDefinedPartitions) {
+    ResourceConfig resourceConfig = _configAccessor.getResourceConfig(CLUSTER_NAME, db);
+    Map<String, List<String>> lists = resourceConfig.getPreferenceLists();
+    lists.remove(userDefinedPartitions.get(0));
+    resourceConfig.setPreferenceLists(lists);
+    userDefinedPartitions.remove(0);
+    _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+  }
+
+  // create test DBs, wait 800 ms, verify the cluster converged, and return their ideal states
+  @Override
+  protected Map<String, IdealState> createTestDBs(long delayTime) throws InterruptedException {
+    Map<String, IdealState> idealStateMap = new HashMap<>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, _minActiveReplica,
+          delayTime);
+      _testDBs.add(db);
+    }
+    Thread.sleep(800);
+    Assert.assertTrue(_clusterVerifier.verify());
+    for (String db : _testDBs) {
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      idealStateMap.put(db, is);
+    }
+    return idealStateMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
new file mode 100644
index 0000000..cbd1c24
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
@@ -0,0 +1,282 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.IdealStateChangeListener;
+import org.apache.helix.integration.DelayedTransitionBase;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+/**
+ * Base fixture for the partition-migration integration tests.
+ *
+ * <p>{@link #beforeClass()} creates a fresh cluster with {@link #NUM_NODE} participants whose
+ * state transitions are artificially delayed, starts a controller, enables persisting the
+ * intermediate assignment and connects an administrator {@link HelixManager}. Subclasses use
+ * {@link MigrationStateVerifier} to catch partitions whose replica count falls below, or rises
+ * too far above, the expected value while partitions are moved.
+ */
+public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
+  /** Number of participants started by {@link #beforeClass()}. */
+  protected static final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 50;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  int _replica = 3;
+  int _minActiveReplica = _replica - 1;
+  HelixClusterVerifier _clusterVerifier;
+  List<String> _testDBs = new ArrayList<>();
+
+  MigrationStateVerifier _migrationVerifier;
+  HelixManager _manager;
+
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      MockParticipantManager participant = createAndStartParticipant(storageNodeName);
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+
+    enablePersistIntermediateAssignment(_gZkClient, CLUSTER_NAME, true);
+
+    _manager =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+  }
+
+  /**
+   * Registers {@code instancename} in the cluster and starts a participant for it whose state
+   * transitions are delayed (delay 50 for the dummy state models, plus DelayedTransitionBase(50)).
+   */
+  protected MockParticipantManager createAndStartParticipant(String instancename) {
+    _setupTool.addInstanceToCluster(CLUSTER_NAME, instancename);
+
+    // start dummy participants
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instancename, 50);
+    participant.setTransition(new DelayedTransitionBase(50));
+    participant.syncStart();
+    return participant;
+  }
+
+  /** State models used by {@link #createTestDBs(long)}, one resource per model. */
+  protected String[] TestStateModels = {
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  /**
+   * Creates one resource per entry of {@link #TestStateModels} (named Test-DB-0, Test-DB-1, ...)
+   * with the given rebalance delay, and returns each resource's IdealState keyed by name. Does
+   * not wait for the resources to converge.
+   */
+  protected Map<String, IdealState> createTestDBs(long delayTime) throws InterruptedException {
+    Map<String, IdealState> idealStateMap = new HashMap<>();
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, _minActiveReplica,
+          delayTime);
+      _testDBs.add(db);
+    }
+    for (String db : _testDBs) {
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      idealStateMap.put(db, is);
+    }
+    return idealStateMap;
+  }
+
+  /**
+   * Tracks IdealState and ExternalView changes and records whether any partition was seen with
+   * fewer replicas than expected, or with more than {@link #EXTRA_REPLICA} extra replicas.
+   *
+   * <p>Listener callbacks may be delivered on a Helix callback thread while the test thread
+   * reads the results, so the shared flags are volatile. The class is static because it needs
+   * no state from the enclosing test instance.
+   */
+  static class MigrationStateVerifier
+      implements IdealStateChangeListener, ExternalViewChangeListener {
+    static final int EXTRA_REPLICA = 10;
+
+    volatile boolean _hasMoreReplica = false;
+    volatile boolean _hasLessReplica = false;
+    HelixManager _manager;
+    volatile boolean trackEnabled = false;
+    Map<String, IdealState> _resourceMap;
+
+
+    /**
+     * @param resourceMap expected IdealState per tracked resource; ExternalView changes of
+     *     other resources are ignored
+     * @param manager connected manager used to register and remove the listeners
+     */
+    public MigrationStateVerifier(Map<String, IdealState> resourceMap, HelixManager manager) {
+      _resourceMap = resourceMap;
+      _manager = manager;
+    }
+
+    // start tracking changes
+    public void start() throws Exception {
+      trackEnabled = true;
+      _manager.addIdealStateChangeListener(this);
+      _manager.addExternalViewChangeListener(this);
+    }
+
+    // stop tracking changes
+    public void stop() {
+      trackEnabled = false;
+      PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+      _manager.removeListener(keyBuilder.idealStates(), this);
+      _manager.removeListener(keyBuilder.externalViews(), this);
+    }
+
+    // checks every IdealState delivered, not only the resources in _resourceMap
+    @Override
+    public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext)
+        throws InterruptedException {
+      if (!trackEnabled) {
+        return;
+      }
+      for (IdealState is : idealStates) {
+        int replica = is.getReplicaCount(NUM_NODE);
+        for (String p : is.getPartitionSet()) {
+          Map<String, String> stateMap = is.getRecord().getMapField(p);
+          verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "IS");
+        }
+      }
+    }
+
+    // checks the ExternalView of tracked resources against their snapshotted partition set
+    @Override
+    public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+      if (!trackEnabled) {
+        return;
+      }
+      for (ExternalView ev : externalViewList) {
+        IdealState is = _resourceMap.get(ev.getResourceName());
+        if (is == null) {
+          continue;
+        }
+        int replica = is.getReplicaCount(NUM_NODE);
+        for (String p : is.getPartitionSet()) {
+          Map<String, String> stateMap = ev.getStateMap(p);
+          verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "EV");
+        }
+      }
+    }
+
+    /**
+     * Flags the partition if it has fewer than {@code replica} replicas or more than
+     * {@code replica + EXTRA_REPLICA}. A null state map (partition missing from the IdealState
+     * map fields or from the ExternalView) counts as zero replicas rather than throwing.
+     */
+    private void verifyPartitionCount(String resource, String partition,
+        Map<String, String> stateMap, int replica, String warningPrefix) {
+      int replicaCount = stateMap == null ? 0 : stateMap.size();
+      if (replicaCount < replica) {
+        System.out.println(
+            "resource " + resource + ", partition " + partition + " has " + replicaCount
+                + " replicas in " + warningPrefix);
+        _hasLessReplica = true;
+      }
+
+      if (replicaCount > replica + EXTRA_REPLICA) {
+        System.out.println(
+            "resource " + resource + ", partition " + partition + " has " + replicaCount
+                + " replicas in " + warningPrefix);
+        _hasMoreReplica = true;
+      }
+    }
+
+    public boolean hasMoreReplica() {
+      return _hasMoreReplica;
+    }
+
+    public boolean hasLessReplica() {
+      return _hasLessReplica;
+    }
+
+    // clears both flags so the verifier can be reused for another phase
+    public void reset() {
+      _hasMoreReplica = false;
+      _hasLessReplica = false;
+    }
+  }
+
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    // shutdown order: 1) controller 2) participants 3) admin manager; then delete the cluster
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _manager.disconnect();
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index dcbd7e1..77340af 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -21,40 +21,27 @@ package org.apache.helix.integration.rebalancer;
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.mock.participant.DummyProcess;
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.mock.participant.MockMSStateModel;
-import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
-import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.testng.Assert;

http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 424c71f..3b8e8f0 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -226,7 +226,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
   }
 
   public static class TestMasterSlaveStateModelFactory
-      extends DummyProcess.DummyStateModelFactory {
+      extends DummyProcess.DummyMasterSlaveStateModelFactory {
     int _startThreadPoolSize;
     Map<String, ExecutorService> _threadPoolExecutorMap;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
index b3eea72..9c87dda 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
@@ -54,7 +54,7 @@ public class DummyProcess {
   private final String _zkConnectString;
   private final String _clusterName;
   private final String _instanceName;
-  private DummyStateModelFactory stateModelFactory;
+  private DummyMasterSlaveStateModelFactory stateModelFactory;
   // private StateMachineEngine genericStateMachineHandler;
 
   private int _transDelayInMs = 0;
@@ -91,7 +91,7 @@ public class DummyProcess {
       throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
     }
 
-    stateModelFactory = new DummyStateModelFactory(_transDelayInMs);
+    stateModelFactory = new DummyMasterSlaveStateModelFactory(_transDelayInMs);
     DummyLeaderStandbyStateModelFactory stateModelFactory1 =
         new DummyLeaderStandbyStateModelFactory(_transDelayInMs);
     DummyOnlineOfflineStateModelFactory stateModelFactory2 =
@@ -108,16 +108,16 @@ public class DummyProcess {
     return manager;
   }
 
-  public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel> {
+  public static class DummyMasterSlaveStateModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> {
     int _delay;
 
-    public DummyStateModelFactory(int delay) {
+    public DummyMasterSlaveStateModelFactory(int delay) {
       _delay = delay;
     }
 
     @Override
-    public DummyStateModel createNewStateModel(String resourceName, String stateUnitKey) {
-      DummyStateModel model = new DummyStateModel();
+    public DummyMasterSlaveStateModel createNewStateModel(String resourceName, String stateUnitKey) {
+      DummyMasterSlaveStateModel model = new DummyMasterSlaveStateModel();
       model.setDelay(_delay);
       return model;
     }
@@ -155,7 +155,7 @@ public class DummyProcess {
     }
   }
 
-  public static class DummyStateModel extends StateModel {
+  public static class DummyMasterSlaveStateModel extends StateModel {
     int _transDelay = 0;
 
     public void setDelay(int delay) {