You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/27 00:52:47 UTC
helix git commit: Add integration tests to test Helix's partition
migration strategy during cluster expansion and idealstate rebalance strategy
change.
Repository: helix
Updated Branches:
refs/heads/master 7753b602c -> 94ac4253b
Add integration tests to test Helix's partition migration strategy during cluster expansion and idealstate rebalance strategy change.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/94ac4253
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/94ac4253
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/94ac4253
Branch: refs/heads/master
Commit: 94ac4253bb6a35f8ad895200ba6b050a576d4198
Parents: 7753b60
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Apr 16 18:32:58 2018 -0700
Committer: Junkai Xue <jx...@jxue-mn2.linkedin.biz>
Committed: Tue Jun 26 17:52:19 2018 -0700
----------------------------------------------------------------------
.../org/apache/helix/DummyProcessThread.java | 5 +-
.../manager/MockParticipantManager.java | 20 +-
.../PartitionMigration/TestExpandCluster.java | 124 ++++++++++
.../TestFullAutoMigration.java | 150 ++++++++++++
.../TestPartitionMigrationBase.java | 241 +++++++++++++++++++
.../rebalancer/TestMixedModeAutoRebalance.java | 13 -
.../handling/TestResourceThreadpoolSize.java | 2 +-
.../helix/mock/participant/DummyProcess.java | 14 +-
8 files changed, 540 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
index 995b123..84bcae4 100644
--- a/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
+++ b/helix-core/src/test/java/org/apache/helix/DummyProcessThread.java
@@ -19,9 +19,10 @@ package org.apache.helix;
* under the License.
*/
+import org.apache.helix.mock.participant.DummyProcess;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.mock.participant.DummyProcess.DummyStateModelFactory;
+import org.apache.helix.mock.participant.DummyProcess.DummyMasterSlaveStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ public class DummyProcessThread implements Runnable {
@Override
public void run() {
try {
- DummyStateModelFactory stateModelFactory = new DummyStateModelFactory(0);
+ DummyMasterSlaveStateModelFactory stateModelFactory = new DummyMasterSlaveStateModelFactory(0);
StateMachineEngine stateMach = _manager.getStateMachineEngine();
stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index cc168e9..55316ac 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -33,7 +33,6 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,14 +43,23 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
protected CountDownLatch _stopCountDown = new CountDownLatch(1);
protected CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
- protected MockMSModelFactory _msModelFactory = new MockMSModelFactory(null);
- protected DummyLeaderStandbyStateModelFactory _lsModelFactory =
- new DummyLeaderStandbyStateModelFactory(10);
- protected DummyOnlineOfflineStateModelFactory _ofModelFactory =
- new DummyOnlineOfflineStateModelFactory(10);
+ protected int _transDelay = 10;
+
+ protected MockMSModelFactory _msModelFactory;
+ protected DummyLeaderStandbyStateModelFactory _lsModelFactory;
+ protected DummyOnlineOfflineStateModelFactory _ofModelFactory;
public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
+ this(zkAddr, clusterName, instanceName, 10);
+ }
+
+ public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
+ int transDelay) {
super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
+ _transDelay = transDelay;
+ _msModelFactory = new MockMSModelFactory(null);
+ _lsModelFactory = new DummyLeaderStandbyStateModelFactory(_transDelay);
+ _ofModelFactory = new DummyOnlineOfflineStateModelFactory(_transDelay);
}
public void setTransition(MockTransition transition) {
http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
new file mode 100644
index 0000000..799d750
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -0,0 +1,124 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestExpandCluster extends TestPartitionMigrationBase {
+
+ Map<String, IdealState> _resourceMap;
+
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ _resourceMap = createTestDBs(1000000);
+ _migrationVerifier = new MigrationStateVerifier(_resourceMap, _manager);
+ }
+
+ @Test
+ public void testClusterExpansion() throws Exception {
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ _migrationVerifier.start();
+
+ // expand cluster by adding instance one by one
+ int numNodes = _participants.size();
+ for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ MockParticipantManager participant = createAndStartParticipant(storageNodeName);
+ _participants.add(participant);
+ Thread.sleep(50);
+ }
+
+ Assert.assertTrue(_clusterVerifier.verify());
+ Assert.assertFalse(_migrationVerifier.hasLessReplica());
+ Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+
+ _migrationVerifier.stop();
+ }
+
+
+ @Test (dependsOnMethods = {"testClusterExpansion"})
+ public void testClusterExpansionByEnableInstance() throws Exception {
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ _migrationVerifier.reset();
+ _migrationVerifier.start();
+
+ int numNodes = _participants.size();
+ // add new instances with all disabled
+ for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ InstanceConfig config = InstanceConfig.toInstanceConfig(storageNodeName);
+ config.setInstanceEnabled(false);
+
+ _setupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
+
+ // start dummy participants
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ // enable new instance one by one
+ for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, storageNodeName, true);
+ Thread.sleep(100);
+ }
+
+ Assert.assertTrue(_clusterVerifier.verify());
+ Assert.assertFalse(_migrationVerifier.hasLessReplica());
+ Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+
+ _migrationVerifier.stop();
+ }
+
+ @Test(dependsOnMethods = {"testClusterExpansion", "testClusterExpansionByEnableInstance"})
+ public void testClusterShrink() throws Exception {
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ _migrationVerifier.reset();
+ _migrationVerifier.start();
+
+ // remove instance one by one
+ for (int i = 0; i < NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ MockParticipantManager participant = _participants.get(i);
+ participant.syncStop();
+ _setupTool.dropInstanceFromCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ Assert.assertTrue(_clusterVerifier.verify());
+ Assert.assertFalse(_migrationVerifier.hasLessReplica());
+ Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+
+ _migrationVerifier.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java
new file mode 100644
index 0000000..e93445d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestFullAutoMigration.java
@@ -0,0 +1,150 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class TestFullAutoMigration extends TestPartitionMigrationBase {
+ ConfigAccessor _configAccessor;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ _configAccessor = new ConfigAccessor(_gZkClient);
+ }
+
+ @DataProvider(name = "stateModels")
+ public static Object [][] stateModels() {
+ return new Object[][] { { BuiltInStateModelDefinitions.MasterSlave.name(), true},
+ {BuiltInStateModelDefinitions.OnlineOffline.name(), true},
+ {BuiltInStateModelDefinitions.LeaderStandby.name(), true},
+ {BuiltInStateModelDefinitions.MasterSlave.name(), false},
+ {BuiltInStateModelDefinitions.OnlineOffline.name(), false},
+ {BuiltInStateModelDefinitions.LeaderStandby.name(), false},
+ };
+ }
+
+ @Test(dataProvider = "stateModels")
+ public void testMigrateToFullAutoWhileExpandCluster(
+ String stateModel, boolean delayEnabled) throws Exception {
+ String db = "Test-DB-" + stateModel;
+ if (delayEnabled) {
+ createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+ _replica - 1, 200000, CrushRebalanceStrategy.class.getName());
+ } else {
+ createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+ _replica, 0, CrushRebalanceStrategy.class.getName());
+ }
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
+ List<String> userDefinedPartitions = new ArrayList<>();
+ for (String partition : userDefinedPreferenceLists.keySet()) {
+ List<String> preferenceList = new ArrayList<>();
+ for (int k = _replica; k > 0; k--) {
+ String instance = _participants.get(k).getInstanceName();
+ preferenceList.add(instance);
+ }
+ userDefinedPreferenceLists.put(partition, preferenceList);
+ userDefinedPartitions.add(partition);
+ }
+
+ ResourceConfig resourceConfig =
+ new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
+ _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+
+ // add new instance to the cluster
+ int numNodes = _participants.size();
+ for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ MockParticipantManager participant = createAndStartParticipant(storageNodeName);
+ _participants.add(participant);
+ Thread.sleep(50);
+ }
+
+ Assert.assertTrue(_clusterVerifier.verify());
+
+ _migrationVerifier =
+ new MigrationStateVerifier(Collections.singletonMap(db, idealState), _manager);
+
+ _migrationVerifier.reset();
+ _migrationVerifier.start();
+
+ while (userDefinedPartitions.size() > 0) {
+ removePartitionFromUserDefinedList(db, userDefinedPartitions);
+ Thread.sleep(50);
+ }
+
+
+ Assert.assertTrue(_clusterVerifier.verify());
+ Assert.assertFalse(_migrationVerifier.hasLessReplica());
+ Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+
+ _migrationVerifier.stop();
+ }
+
+ private void removePartitionFromUserDefinedList(String db, List<String> userDefinedPartitions) {
+ ResourceConfig resourceConfig = _configAccessor.getResourceConfig(CLUSTER_NAME, db);
+ Map<String, List<String>> lists = resourceConfig.getPreferenceLists();
+ lists.remove(userDefinedPartitions.get(0));
+ resourceConfig.setPreferenceLists(lists);
+ userDefinedPartitions.remove(0);
+ _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+ }
+
+ // create test DBs, wait it converged and return externalviews
+ protected Map<String, IdealState> createTestDBs(long delayTime) throws InterruptedException {
+ Map<String, IdealState> idealStateMap = new HashMap<>();
+ int i = 0;
+ for (String stateModel : TestStateModels) {
+ String db = "Test-DB-" + i++;
+ createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, _minActiveReplica,
+ delayTime);
+ _testDBs.add(db);
+ }
+ Thread.sleep(800);
+ Assert.assertTrue(_clusterVerifier.verify());
+ for (String db : _testDBs) {
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ idealStateMap.put(db, is);
+ }
+ return idealStateMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
new file mode 100644
index 0000000..cbd1c24
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
@@ -0,0 +1,241 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.IdealStateChangeListener;
+import org.apache.helix.integration.DelayedTransitionBase;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+public class TestPartitionMigrationBase extends ZkIntegrationTestBase {
+ final int NUM_NODE = 6;
+ protected static final int START_PORT = 12918;
+ protected static final int _PARTITIONS = 50;
+
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+ protected ClusterControllerManager _controller;
+
+ protected ClusterSetup _setupTool = null;
+ List<MockParticipantManager> _participants = new ArrayList<>();
+ int _replica = 3;
+ int _minActiveReplica = _replica - 1;
+ HelixClusterVerifier _clusterVerifier;
+ List<String> _testDBs = new ArrayList<>();
+
+ MigrationStateVerifier _migrationVerifier;
+ HelixManager _manager;
+
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursively(namespace);
+ }
+ _setupTool = new ClusterSetup(_gZkClient);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+
+ for (int i = 0; i < NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ MockParticipantManager participant = createAndStartParticipant(storageNodeName);
+ _participants.add(participant);
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ _clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+
+ enablePersistIntermediateAssignment(_gZkClient, CLUSTER_NAME, true);
+
+ _manager =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+ }
+
+ protected MockParticipantManager createAndStartParticipant(String instancename) {
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, instancename);
+
+ // start dummy participants
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instancename, 50);
+ participant.setTransition(new DelayedTransitionBase(50));
+ participant.syncStart();
+ return participant;
+ }
+
+ protected String[] TestStateModels = {
+ BuiltInStateModelDefinitions.MasterSlave.name(),
+ BuiltInStateModelDefinitions.OnlineOffline.name(),
+ BuiltInStateModelDefinitions.LeaderStandby.name()
+ };
+
+ protected Map<String, IdealState> createTestDBs(long delayTime) throws InterruptedException {
+ Map<String, IdealState> idealStateMap = new HashMap<>();
+ int i = 0;
+ for (String stateModel : TestStateModels) {
+ String db = "Test-DB-" + i++;
+ createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica, _minActiveReplica,
+ delayTime);
+ _testDBs.add(db);
+ }
+ for (String db : _testDBs) {
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ idealStateMap.put(db, is);
+ }
+ return idealStateMap;
+ }
+
+ class MigrationStateVerifier implements IdealStateChangeListener, ExternalViewChangeListener {
+ static final int EXTRA_REPLICA = 10;
+
+ boolean _hasMoreReplica = false;
+ boolean _hasLessReplica = false;
+ HelixManager _manager;
+ boolean trackEnabled = false;
+ Map<String, IdealState> _resourceMap;
+
+
+ public MigrationStateVerifier(Map<String, IdealState> resourceMap, HelixManager manager) {
+ _resourceMap = resourceMap;
+ _manager = manager;
+ }
+
+ // start tracking changes
+ public void start() throws Exception {
+ trackEnabled = true;
+ _manager.addIdealStateChangeListener(this);
+ _manager.addExternalViewChangeListener(this);
+ }
+
+ // stop tracking changes
+ public void stop() {
+ trackEnabled = false;
+ PropertyKey.Builder keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+ _manager.removeListener(keyBuilder.idealStates(), this);
+ _manager.removeListener(keyBuilder.externalViews(), this);
+ }
+
+ @Override
+ public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext)
+ throws InterruptedException {
+ if (!trackEnabled) {
+ return;
+ }
+ for (IdealState is : idealStates) {
+ int replica = is.getReplicaCount(NUM_NODE);
+ for (String p : is.getPartitionSet()) {
+ Map<String, String> stateMap = is.getRecord().getMapField(p);
+ verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "IS");
+ }
+ }
+ }
+
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+ if (!trackEnabled) {
+ return;
+ }
+ for (ExternalView ev : externalViewList) {
+ IdealState is = _resourceMap.get(ev.getResourceName());
+ if (is == null) {
+ continue;
+ }
+ int replica = is.getReplicaCount(NUM_NODE);
+ for (String p : is.getPartitionSet()) {
+ Map<String, String> stateMap = ev.getStateMap(p);
+ verifyPartitionCount(is.getResourceName(), p, stateMap, replica, "EV");
+ }
+ }
+ }
+
+ private void verifyPartitionCount(String resource, String partition,
+ Map<String, String> stateMap, int replica, String warningPrefix) {
+ if (stateMap.size() < replica) {
+ System.out.println(
+ "resource " + resource + ", partition " + partition + " has " + stateMap.size()
+ + " replicas in " + warningPrefix);
+ _hasLessReplica = true;
+ }
+
+ if (stateMap.size() > replica + EXTRA_REPLICA) {
+ System.out.println(
+ "resource " + resource + ", partition " + partition + " has " + stateMap.size()
+ + " replicas in " + warningPrefix);
+ _hasMoreReplica = true;
+ }
+ }
+
+ public boolean hasMoreReplica() {
+ return _hasMoreReplica;
+ }
+
+ public boolean hasLessReplica() {
+ return _hasLessReplica;
+ }
+
+ public void reset() {
+ _hasMoreReplica = false;
+ _hasLessReplica = false;
+ }
+ }
+
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ /**
+ * shutdown order: 1) disconnect the controller 2) disconnect participants
+ */
+ _controller.syncStop();
+ for (MockParticipantManager participant : _participants) {
+ participant.syncStop();
+ }
+ _manager.disconnect();
+ _setupTool.deleteCluster(CLUSTER_NAME);
+ System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index dcbd7e1..77340af 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -21,40 +21,27 @@ package org.apache.helix.integration.rebalancer;
import java.util.ArrayList;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.integration.common.ZkIntegrationTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.mock.participant.DummyProcess;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.mock.participant.MockMSStateModel;
-import org.apache.helix.mock.participant.MockSchemataModelFactory;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
-import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
import org.testng.Assert;
http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 424c71f..3b8e8f0 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -226,7 +226,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
}
public static class TestMasterSlaveStateModelFactory
- extends DummyProcess.DummyStateModelFactory {
+ extends DummyProcess.DummyMasterSlaveStateModelFactory {
int _startThreadPoolSize;
Map<String, ExecutorService> _threadPoolExecutorMap;
http://git-wip-us.apache.org/repos/asf/helix/blob/94ac4253/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
index b3eea72..9c87dda 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/DummyProcess.java
@@ -54,7 +54,7 @@ public class DummyProcess {
private final String _zkConnectString;
private final String _clusterName;
private final String _instanceName;
- private DummyStateModelFactory stateModelFactory;
+ private DummyMasterSlaveStateModelFactory stateModelFactory;
// private StateMachineEngine genericStateMachineHandler;
private int _transDelayInMs = 0;
@@ -91,7 +91,7 @@ public class DummyProcess {
throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
}
- stateModelFactory = new DummyStateModelFactory(_transDelayInMs);
+ stateModelFactory = new DummyMasterSlaveStateModelFactory(_transDelayInMs);
DummyLeaderStandbyStateModelFactory stateModelFactory1 =
new DummyLeaderStandbyStateModelFactory(_transDelayInMs);
DummyOnlineOfflineStateModelFactory stateModelFactory2 =
@@ -108,16 +108,16 @@ public class DummyProcess {
return manager;
}
- public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel> {
+ public static class DummyMasterSlaveStateModelFactory extends StateModelFactory<DummyMasterSlaveStateModel> {
int _delay;
- public DummyStateModelFactory(int delay) {
+ public DummyMasterSlaveStateModelFactory(int delay) {
_delay = delay;
}
@Override
- public DummyStateModel createNewStateModel(String resourceName, String stateUnitKey) {
- DummyStateModel model = new DummyStateModel();
+ public DummyMasterSlaveStateModel createNewStateModel(String resourceName, String stateUnitKey) {
+ DummyMasterSlaveStateModel model = new DummyMasterSlaveStateModel();
model.setDelay(_delay);
return model;
}
@@ -155,7 +155,7 @@ public class DummyProcess {
}
}
- public static class DummyStateModel extends StateModel {
+ public static class DummyMasterSlaveStateModel extends StateModel {
int _transDelay = 0;
public void setDelay(int delay) {