You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/08/23 17:34:26 UTC
[helix] branch ApplicationClusterManager updated: Add support for node evacuation. (#2604)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by this push:
new 6cf94f539 Add support for node evacuation. (#2604)
6cf94f539 is described below
commit 6cf94f5392a9adc96264b4e827cf8aea3a61eea9
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Aug 23 10:34:21 2023 -0700
Add support for node evacuation. (#2604)
---
.../rebalancer/util/DelayedRebalanceUtil.java | 27 +-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 2 -
.../org/apache/helix/model/InstanceConfig.java | 11 +-
.../rebalancer/TestInstanceOperation.java | 421 +++++++++++++++++++++
4 files changed, 444 insertions(+), 17 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 92556bb40..e7ff99765 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.HelixManager;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -82,37 +83,37 @@ public class DelayedRebalanceUtil {
}
/**
- * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster
- * delay rebalance configurations.
+ * @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes)
+ * while considering cluster delay rebalance configurations.
*/
public static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
if (!isDelayRebalanceEnabled(clusterConfig)) {
- return new HashSet<>(liveEnabledNodes);
+ return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
}
return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig);
}
/**
- * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster
- * and the resource delay rebalance configurations.
+ * @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes)
+ * while considering cluster delay rebalance configurations.
*/
public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealState,
Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
- return new HashSet<>(liveEnabledNodes);
+ return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
}
return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
instanceConfigMap, delay, clusterConfig);
}
private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
- Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
- Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
- Set<String> activeNodes = new HashSet<>(liveEnabledNodes);
+ Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap,
+ long delay, ClusterConfig clusterConfig) {
+ Set<String> activeNodes = filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
long currentTime = System.currentTimeMillis();
@@ -128,6 +129,14 @@ public class DelayedRebalanceUtil {
return activeNodes;
}
+ private static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
+ Set<String> nodes) {
+ return nodes.stream()
+ .filter(instance -> !instanceConfigMap.get(instance).getInstanceOperation().equals(
+ InstanceConstants.InstanceOperation.EVACUATE.name()))
+ .collect(Collectors.toSet());
+ }
+
/**
* Return the time when an offline or disabled instance should be treated as inactive. Return -1
* if it is inactive now or forced to be rebalanced by an on-demand rebalance.
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 917cb25de..15b38fcbc 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -396,8 +396,6 @@ public class ZKHelixAdmin implements HelixAdmin {
}
InstanceConfig config = new InstanceConfig(currentData);
- // TODO: add sanity check in config.setInstanceOperation and throw exception when needed.
- // TODO: Also instance enabled in instance config
config.setInstanceOperation(instanceOperation);
return config.getRecord();
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index fdcb74517..193019d0e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -345,11 +345,11 @@ public class InstanceConfig extends HelixProperty {
public void setInstanceOperation(InstanceConstants.InstanceOperation operation) {
if (operation != InstanceConstants.InstanceOperation.DISABLE
- && operation != InstanceConstants.InstanceOperation.ENABLE ){
- if( !getInstanceEnabled()) {
- throw new HelixException(
- "setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed");
- }
+ && operation != InstanceConstants.InstanceOperation.ENABLE) {
+ if (!getInstanceEnabled()) {
+ throw new HelixException(
+ "setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed");
+ }
} else {
setInstanceEnabledHelper(operation == InstanceConstants.InstanceOperation.ENABLE);
}
@@ -381,7 +381,6 @@ public class InstanceConfig extends HelixProperty {
/**
* Check if this instance is enabled for a given partition
- *
* @param partition the partition name to check
* @return true if the instance is enabled for the partition, false otherwise
*/
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
new file mode 100644
index 000000000..2276ab1c3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -0,0 +1,421 @@
+package org.apache.helix.integration.rebalancer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixRollbackException;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestInstanceOperation extends ZkTestBase {
+ protected final int NUM_NODE = 6;
+ protected static final int START_PORT = 12918;
+ protected static final int PARTITIONS = 20;
+
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+ private int REPLICA = 3;
+ protected ClusterControllerManager _controller;
+ List<MockParticipantManager> _participants = new ArrayList<>();
+ List<String> _participantNames = new ArrayList<>();
+ private Set<String> _allDBs = new HashSet<>();
+ private ZkHelixClusterVerifier _clusterVerifier;
+ private ConfigAccessor _configAccessor;
+ private long _stateModelDelay = 30L;
+ protected AssignmentMetadataStore _assignmentMetadataStore;
+ HelixDataAccessor _dataAccessor;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ for (int i = 0; i < NUM_NODE; i++) {
+ String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ addParticipant(participantName);
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+ _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setDeactivatedNodeAwareness(true)
+ .setResources(_allDBs)
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+ .build();
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+ _configAccessor = new ConfigAccessor(_gZkClient);
+ _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+
+ ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.stateTransitionCancelEnabled(true);
+ _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+ createTestDBs(200);
+
+ setUpWagedBaseline();
+ }
+
+ @Test
+ public void testEvacuate() throws Exception {
+ // EV should contain all participants, check resources one by one
+ Map<String, ExternalView> assignment = getEV();
+ for (String resource : _allDBs) {
+ Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+ }
+
+ // evacuated instance
+ String instanceToEvacuate = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // New ev should contain all instances but the evacuated one
+ assignment = getEV();
+ List<String> currentActiveInstances =
+ _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList());
+ for (String resource : _allDBs) {
+ validateAssignmentInEv(assignment.get(resource));
+ Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
+ Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
+ Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
+ }
+ }
+
+ @Test(dependsOnMethods = "testEvacuate")
+ public void testRevertEvacuation() throws Exception {
+
+ // revert an evacuate instance
+ String instanceToEvacuate = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // EV should contain all participants, check resources one by one
+ Map<String, ExternalView> assignment = getEV();
+ for (String resource : _allDBs) {
+ Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+ validateAssignmentInEv(assignment.get(resource));
+ }
+ }
+
+ @Test(dependsOnMethods = "testRevertEvacuation")
+ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
+ // add a resource where downward state transition is slow
+ createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
+ REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName());
+ _allDBs.add("TEST_DB3_DELAYED_CRUSHED");
+ // add a resource where downward state transition is slow
+ createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave",
+ PARTITIONS, REPLICA, REPLICA - 1);
+ _allDBs.add("TEST_DB4_DELAYED_WAGED");
+ // wait for assignment to finish
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // set bootstrap ST delay to a large number
+ _stateModelDelay = -300000L;
+ // evacuate an instance
+ String instanceToEvacuate = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+ // Messages should be pending at all instances besides the evacuate one
+ for (String participant : _participantNames) {
+ if (participant.equals(instanceToEvacuate)) {
+ continue;
+ }
+ TestHelper.verify(
+ () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
+ }
+
+ // sleep a bit so ST messages can start executing
+ Thread.sleep(Math.abs(_stateModelDelay / 100));
+ // before we cancel, check current EV
+ Map<String, ExternalView> assignment = getEV();
+ for (String resource : _allDBs) {
+ // check every replica has >= 3 partitions and a top state partition
+ validateAssignmentInEv(assignment.get(resource));
+ }
+
+ // cancel the evacuation by setting instance operation back to `ENABLE`
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
+
+ assignment = getEV();
+ for (String resource : _allDBs) {
+ // check every replica has >= 3 active replicas, even before cluster converge
+ validateAssignmentInEv(assignment.get(resource));
+ }
+
+ // check cluster converge. We have longer delay for ST then verifier timeout. It will only converge if we cancel ST.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // EV should contain all participants, check resources one by one
+ assignment = getEV();
+ for (String resource : _allDBs) {
+ Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+ // check every replica has >= 3 active replicas again
+ validateAssignmentInEv(assignment.get(resource));
+ }
+ }
+
+ @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish")
+ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
+
+ // set DROP ST delay to a large number
+ _stateModelDelay = 300000L;
+
+ // evacuate an instance
+ String instanceToEvacuate = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+
+ // message should be pending at the to evacuate participant
+ TestHelper.verify(
+ () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
+
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
+ // check every replica has >= 3 active replicas, even before cluster converge
+ Map<String, ExternalView> assignment = getEV();
+ for (String resource : _allDBs) {
+ validateAssignmentInEv(assignment.get(resource));
+ }
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // EV should contain all participants, check resources one by one
+ assignment = getEV();
+ for (String resource : _allDBs) {
+ Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+ // check every replica has >= 3 active replicas
+ validateAssignmentInEv(assignment.get(resource));
+ }
+ }
+
+ @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish")
+ public void testMarkEvacuationAfterEMM() throws Exception {
+ _stateModelDelay = 1000L;
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME));
+ _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null,
+ null);
+ addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE));
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Map<String, ExternalView> assignment = getEV();
+ for (String resource : _allDBs) {
+ Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE)));
+ }
+
+ // set evacuate operation
+ String instanceToEvacuate = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+
+ // there should be no evacuation happening
+ for (String resource : _allDBs) {
+ Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate));
+ }
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // exit MM
+ _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null,
+ null);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ assignment = getEV();
+ List<String> currentActiveInstances =
+ _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList());
+ for (String resource : _allDBs) {
+ validateAssignmentInEv(assignment.get(resource));
+ Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
+ Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
+ Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
+ }
+
+ }
+
+ private void addParticipant(String participantName) {
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName);
+
+ // start dummy participants
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
+ StateMachineEngine stateMachine = participant.getStateMachineEngine();
+ // Using a delayed state model
+ StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory();
+ stateMachine.registerStateModelFactory("MasterSlave", delayFactory);
+
+ participant.syncStart();
+ _participants.add(participant);
+ _participantNames.add(participantName);
+ }
+
+ private void createTestDBs(long delayTime) throws InterruptedException {
+ createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB1_CRUSHED",
+ BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 200,
+ CrushEdRebalanceStrategy.class.getName());
+ _allDBs.add("TEST_DB1_CRUSHED");
+ createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB2_WAGED", BuiltInStateModelDefinitions.LeaderStandby.name(),
+ PARTITIONS, REPLICA, REPLICA - 1);
+ _allDBs.add("TEST_DB2_WAGED");
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+
+ private Map<String, ExternalView> getEV() {
+ Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>();
+ for (String db : _allDBs) {
+ ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ externalViews.put(db, ev);
+ }
+ return externalViews;
+ }
+
+ private Set<String> getParticipantsInEv(ExternalView ev) {
+ Set<String> assignedParticipants = new HashSet<>();
+ ev.getPartitionSet().forEach(partition -> assignedParticipants.addAll(ev.getStateMap(partition).keySet()));
+ return assignedParticipants;
+ }
+
+ // verify that each partition has >=REPLICA (3 in this case) replicas
+ private void validateAssignmentInEv(ExternalView ev) {
+ Set<String> partitionSet = ev.getPartitionSet();
+ for (String partition : partitionSet) {
+ AtomicInteger activeReplicaCount = new AtomicInteger();
+ ev.getStateMap(partition)
+ .values()
+ .stream()
+ .filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals("STANDBY"))
+ .forEach(v -> activeReplicaCount.getAndIncrement());
+ Assert.assertTrue(activeReplicaCount.get() >=REPLICA);
+
+ }
+ }
+
+ private void setUpWagedBaseline() {
+ _assignmentMetadataStore = new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) {
+ public Map<String, ResourceAssignment> getBaseline() {
+ // Ensure this metadata store always read from the ZK without using cache.
+ super.reset();
+ return super.getBaseline();
+ }
+
+ public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() {
+ // Ensure this metadata store always read from the ZK without using cache.
+ super.reset();
+ return super.getBestPossibleAssignment();
+ }
+ };
+
+ // Set test instance capacity and partition weights
+ ClusterConfig clusterConfig = _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
+ String testCapacityKey = "TestCapacityKey";
+ clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+ clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100));
+ clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1));
+ _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), clusterConfig);
+ }
+
+ // A state transition model where either downward ST are slow (_stateModelDelay >0) or upward ST are slow (_stateModelDelay <0)
+ public class StDelayMSStateModelFactory extends StateModelFactory<StDelayMSStateModel> {
+
+ @Override
+ public StDelayMSStateModel createNewStateModel(String resourceName, String partitionKey) {
+ StDelayMSStateModel model = new StDelayMSStateModel();
+ return model;
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = {"MASTER", "SLAVE", "ERROR"})
+ public class StDelayMSStateModel extends StateModel {
+
+ public StDelayMSStateModel() {
+ _cancelled = false;
+ }
+
+ private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException{
+ while(sleepTime >0 && !isCancelled()) {
+ Thread.sleep(5000);
+ sleepTime =- 5000;
+ }
+ if (isCancelled()) {
+ _cancelled = false;
+ throw new HelixRollbackException("EX");
+ }
+ }
+
+ @Transition(to = "SLAVE", from = "OFFLINE")
+ public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException {
+ if (_stateModelDelay < 0) {
+ sleepWhileNotCanceled(Math.abs(_stateModelDelay));
+ }
+ }
+
+ @Transition(to = "MASTER", from = "SLAVE")
+ public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException {
+ if (_stateModelDelay < 0) {
+ sleepWhileNotCanceled(Math.abs(_stateModelDelay));
+ }
+ }
+
+ @Transition(to = "SLAVE", from = "MASTER")
+ public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException {
+ if (_stateModelDelay > 0) {
+ sleepWhileNotCanceled(_stateModelDelay);
+ }
+ }
+
+ @Transition(to = "OFFLINE", from = "SLAVE")
+ public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException {
+ if (_stateModelDelay > 0) {
+ sleepWhileNotCanceled(_stateModelDelay);
+ }
+ }
+
+ @Transition(to = "DROPPED", from = "OFFLINE")
+ public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws InterruptedException {
+ if (_stateModelDelay > 0) {
+ sleepWhileNotCanceled(_stateModelDelay);
+ }
+ }
+ }
+}