You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:11 UTC
[helix] 33/50: Add delayed rebalance and user-defined preference
list features to the WAGED rebalancer. (#456)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 2699b61e3cba8ad64243917684e31ea710b1c3bf
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Oct 1 12:08:56 2019 -0700
Add delayed rebalance and user-defined preference list features to the WAGED rebalancer. (#456)
- Add delayed rebalance and user-defined preference list features to the WAGED rebalancer.
- Refine the delayed rebalance usage in the waged rebalancer.
- Add the delayed rebalance scheduling logic.
- Add the necessary tests, and fix TestMixedModeAutoRebalance and all the delayed rebalance tests.
---
.../rebalancer/DelayedAutoRebalancer.java | 203 ++--------------
.../rebalancer/util/DelayedRebalanceUtil.java | 267 +++++++++++++++++++++
.../rebalancer/waged/WagedRebalancer.java | 159 ++++++++++--
.../StrictMatchExternalViewVerifier.java | 6 +-
.../java/org/apache/helix/common/ZkTestBase.java | 4 +-
.../rebalancer/waged/TestWagedRebalancer.java | 35 ++-
.../TestDelayedAutoRebalance.java | 57 +++--
...stDelayedAutoRebalanceWithDisabledInstance.java | 33 +--
.../TestDelayedAutoRebalanceWithRackaware.java | 5 +-
.../rebalancer/TestMixedModeAutoRebalance.java | 101 +++++---
.../rebalancer/TestZeroReplicaAvoidance.java | 74 ++++--
.../WagedRebalancer/TestDelayedWagedRebalance.java | 102 ++++++++
...tDelayedWagedRebalanceWithDisabledInstance.java | 103 ++++++++
.../TestDelayedWagedRebalanceWithRackaware.java | 102 ++++++++
.../TestMixedModeWagedRebalance.java | 66 +++++
.../WagedRebalancer/TestWagedRebalance.java | 23 +-
.../TestWagedRebalanceFaultZone.java | 10 +-
.../apache/helix/tools/TestClusterVerifier.java | 45 +++-
18 files changed, 1059 insertions(+), 336 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 65b3f84..1073d6d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -32,11 +32,10 @@ import org.apache.helix.HelixDefinedState;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
@@ -50,7 +49,6 @@ import org.slf4j.LoggerFactory;
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
- private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@Override
public IdealState computeNewIdealState(String resourceName,
@@ -79,7 +77,8 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
ClusterConfig clusterConfig = clusterData.getClusterConfig();
ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
- boolean delayRebalanceEnabled = isDelayRebalanceEnabled(currentIdealState, clusterConfig);
+ boolean delayRebalanceEnabled =
+ DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig);
if (resourceConfig != null) {
userDefinedPreferenceList = resourceConfig.getPreferenceLists();
@@ -110,16 +109,18 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
Set<String> activeNodes = liveEnabledNodes;
if (delayRebalanceEnabled) {
- long delay = getRebalanceDelay(currentIdealState, clusterConfig);
- activeNodes = getActiveInstances(allNodes, currentIdealState, liveEnabledNodes,
- clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
- clusterData.getInstanceConfigMap(), delay, clusterConfig);
+ long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
+ activeNodes = DelayedRebalanceUtil
+ .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
+ clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+ clusterData.getInstanceConfigMap(), delay, clusterConfig);
Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
- setRebalanceScheduler(currentIdealState, offlineOrDisabledInstances,
- clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
- clusterData.getInstanceConfigMap(), delay, clusterConfig);
+ DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
+ offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
+ clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay,
+ clusterConfig, _manager);
}
if (allNodes.isEmpty() || activeNodes.isEmpty()) {
@@ -162,16 +163,16 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
.computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;
- if (isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
+ if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
- int minActiveReplicas = getMinActiveReplica(currentIdealState, replicaCount);
+ int minActiveReplicas =
+ DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, replicaCount);
ZNRecord newActiveMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
- finalMapping =
- getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveEnabledNodes,
- replicaCount, minActiveReplicas);
+ finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
+ liveEnabledNodes, replicaCount, minActiveReplicas);
}
finalMapping.getListFields().putAll(userDefinedPreferenceList);
@@ -202,162 +203,15 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
return newIdealState;
}
- /* get all active instances (live instances plus offline-yet-active instances */
- private Set<String> getActiveInstances(Set<String> allNodes, IdealState idealState,
- Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
- Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
- Set<String> activeInstances = new HashSet<>(liveEnabledNodes);
-
- if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
- return activeInstances;
- }
-
- Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
- offlineOrDisabledInstances.removeAll(liveEnabledNodes);
-
- long currentTime = System.currentTimeMillis();
- for (String ins : offlineOrDisabledInstances) {
- long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
- instanceConfigMap.get(ins), clusterConfig);
- InstanceConfig instanceConfig = instanceConfigMap.get(ins);
- if (inactiveTime > currentTime && instanceConfig != null && instanceConfig
- .isDelayRebalanceEnabled()) {
- activeInstances.add(ins);
- }
- }
-
- return activeInstances;
- }
-
- /* Set a rebalance scheduler for the closest future rebalance time. */
- private void setRebalanceScheduler(IdealState idealState, Set<String> offlineOrDisabledInstances,
- Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
- Map<String, InstanceConfig> instanceConfigMap, long delay,
- ClusterConfig clusterConfig) {
- String resourceName = idealState.getResourceName();
- if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
- _rebalanceScheduler.removeScheduledRebalance(resourceName);
- return;
- }
-
- long currentTime = System.currentTimeMillis();
- long nextRebalanceTime = Long.MAX_VALUE;
- // calculate the closest future rebalance time
- for (String ins : offlineOrDisabledInstances) {
- long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
- instanceConfigMap.get(ins), clusterConfig);
- if (inactiveTime != -1 && inactiveTime > currentTime && inactiveTime < nextRebalanceTime) {
- nextRebalanceTime = inactiveTime;
- }
- }
-
- if (nextRebalanceTime == Long.MAX_VALUE) {
- long startTime = _rebalanceScheduler.removeScheduledRebalance(resourceName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String
- .format("Remove exist rebalance timer for resource %s at %d\n", resourceName, startTime));
- }
- } else {
- long currentScheduledTime = _rebalanceScheduler.getRebalanceTime(resourceName);
- if (currentScheduledTime < 0 || currentScheduledTime > nextRebalanceTime) {
- _rebalanceScheduler.scheduleRebalance(_manager, resourceName, nextRebalanceTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug(String
- .format("Set next rebalance time for resource %s at time %d\n", resourceName,
- nextRebalanceTime));
- }
- }
- }
- }
-
- /**
- * The time when an offline or disabled instance should be treated as inactive. return -1 if it is
- * inactive now.
- *
- * @return
- */
- private long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime,
- long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
- long inactiveTime = Long.MAX_VALUE;
-
- // check the time instance went offline.
- if (!liveInstances.contains(instance)) {
- if (offlineTime != null && offlineTime > 0 && offlineTime + delay < inactiveTime) {
- inactiveTime = offlineTime + delay;
- }
- }
-
- // check the time instance got disabled.
- if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
- && clusterConfig.getDisabledInstances().containsKey(instance))) {
- long disabledTime = instanceConfig.getInstanceEnabledTime();
- if (clusterConfig.getDisabledInstances() != null && clusterConfig.getDisabledInstances()
- .containsKey(instance)) {
- // Update batch disable time
- long batchDisableTime = Long.parseLong(clusterConfig.getDisabledInstances().get(instance));
- if (disabledTime == -1 || disabledTime > batchDisableTime) {
- disabledTime = batchDisableTime;
- }
- }
- if (disabledTime > 0 && disabledTime + delay < inactiveTime) {
- inactiveTime = disabledTime + delay;
- }
- }
-
- if (inactiveTime == Long.MAX_VALUE) {
- return -1;
- }
-
- return inactiveTime;
- }
-
- private long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) {
- long delayTime = idealState.getRebalanceDelay();
- if (delayTime < 0) {
- delayTime = clusterConfig.getRebalanceDelayTime();
- }
- return delayTime;
- }
-
- private boolean isDelayRebalanceEnabled(IdealState idealState, ClusterConfig clusterConfig) {
- long delay = getRebalanceDelay(idealState, clusterConfig);
- return (delay > 0 && idealState.isDelayRebalanceEnabled() && clusterConfig
- . isDelayRebalaceEnabled());
- }
-
private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping,
ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) {
if (minActiveReplica >= numReplica) {
return newIdealMapping;
}
ZNRecord finalMapping = new ZNRecord(idealState.getResourceName());
- for (String partition : newIdealMapping.getListFields().keySet()) {
- List<String> idealList = newIdealMapping.getListField(partition);
- List<String> activeList = newActiveMapping.getListField(partition);
-
- List<String> liveList = new ArrayList<>();
- int activeReplica = 0;
- for (String ins : activeList) {
- if (liveInstances.contains(ins)) {
- activeReplica++;
- liveList.add(ins);
- }
- }
-
- if (activeReplica >= minActiveReplica) {
- finalMapping.setListField(partition, activeList);
- } else {
- List<String> candidates = new ArrayList<String>(idealList);
- candidates.removeAll(activeList);
- for (String liveIns : candidates) {
- liveList.add(liveIns);
- if (liveList.size() >= minActiveReplica) {
- break;
- }
- }
- finalMapping.setListField(partition, liveList);
- }
- }
+ finalMapping.setListFields(DelayedRebalanceUtil
+ .getFinalDelayedMapping(newIdealMapping.getListFields(), newActiveMapping.getListFields(),
+ liveInstances, minActiveReplica));
return finalMapping;
}
@@ -391,10 +245,11 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
Set<String> liveNodes = cache.getLiveInstances().keySet();
ClusterConfig clusterConfig = cache.getClusterConfig();
- long delayTime = getRebalanceDelay(idealState, clusterConfig);
- Set<String> activeNodes = getActiveInstances(allNodes, idealState, liveNodes,
- cache.getInstanceOfflineTimeMap(), cache.getLiveInstances().keySet(),
- cache.getInstanceConfigMap(), delayTime, clusterConfig);
+ long delayTime = DelayedRebalanceUtil.getRebalanceDelay(idealState, clusterConfig);
+ Set<String> activeNodes = DelayedRebalanceUtil
+ .getActiveNodes(allNodes, idealState, liveNodes, cache.getInstanceOfflineTimeMap(),
+ cache.getLiveInstances().keySet(), cache.getInstanceConfigMap(), delayTime,
+ clusterConfig);
String stateModelDefName = idealState.getStateModelDefRef();
StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
@@ -419,14 +274,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
return partitionMapping;
}
- private int getMinActiveReplica(IdealState idealState, int replicaCount) {
- int minActiveReplicas = idealState.getMinActiveReplicas();
- if (minActiveReplicas < 0) {
- minActiveReplicas = replicaCount;
- }
- return minActiveReplicas;
- }
-
/**
* compute best state for resource in AUTO ideal state mode
* @param liveInstances
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
new file mode 100644
index 0000000..1342860
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -0,0 +1,267 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The util for supporting delayed rebalance logic.
+ */
+public class DelayedRebalanceUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(DelayedRebalanceUtil.class);
+
+  private static final RebalanceScheduler REBALANCE_SCHEDULER = new RebalanceScheduler();
+
+  /**
+   * @return true if delay rebalance is configured and enabled in the ClusterConfig configurations.
+   */
+  public static boolean isDelayRebalanceEnabled(ClusterConfig clusterConfig) {
+    long delay = clusterConfig.getRebalanceDelayTime();
+    return (delay > 0 && clusterConfig.isDelayRebalaceEnabled());
+  }
+
+  /**
+   * @return true if delay rebalance is configured and enabled in Resource IdealState and the
+   * ClusterConfig configurations.
+   */
+  public static boolean isDelayRebalanceEnabled(IdealState idealState,
+      ClusterConfig clusterConfig) {
+    long delay = getRebalanceDelay(idealState, clusterConfig);
+    return (delay > 0 && idealState.isDelayRebalanceEnabled() && clusterConfig
+        .isDelayRebalaceEnabled());
+  }
+
+  /**
+   * @return the IdealState rebalance delay, or the ClusterConfig delay if the former is negative.
+   */
+  public static long getRebalanceDelay(IdealState idealState, ClusterConfig clusterConfig) {
+    long delayTime = idealState.getRebalanceDelay();
+    if (delayTime < 0) {
+      delayTime = clusterConfig.getRebalanceDelayTime();
+    }
+    return delayTime;
+  }
+
+  /**
+   * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster
+   * delay rebalance configurations.
+   */
+  public static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
+      Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
+      Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+    if (!isDelayRebalanceEnabled(clusterConfig)) {
+      return new HashSet<>(liveEnabledNodes);
+    }
+    return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
+        instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig);
+  }
+
+  /**
+   * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster
+   * and the resource delay rebalance configurations.
+   */
+  public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealState,
+      Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
+      Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
+    if (!isDelayRebalanceEnabled(idealState, clusterConfig)) {
+      return new HashSet<>(liveEnabledNodes);
+    }
+    return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes,
+        instanceConfigMap, delay, clusterConfig);
+  }
+
+  private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
+      Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes,
+      Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) {
+    Set<String> activeNodes = new HashSet<>(liveEnabledNodes);
+    Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
+    offlineOrDisabledInstances.removeAll(liveEnabledNodes);
+    long currentTime = System.currentTimeMillis();
+    for (String ins : offlineOrDisabledInstances) {
+      InstanceConfig instanceConfig = instanceConfigMap.get(ins);
+      // Keep an offline/disabled node active while it is still inside its delay window.
+      if (instanceConfig != null && instanceConfig.isDelayRebalanceEnabled()
+          && getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
+              instanceConfig, clusterConfig) > currentTime) {
+        activeNodes.add(ins);
+      }
+    }
+    return activeNodes;
+  }
+
+  /**
+   * @return the time at which an offline or disabled instance should be treated as inactive, i.e.
+   * the earliest of (offline time + delay) and (disabled time + delay); -1 if neither is known.
+   * A null instanceConfig is tolerated and means that no instance-level disable time is known.
+   */
+  private static long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime,
+      long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
+    long inactiveTime = Long.MAX_VALUE;
+
+    // check the time instance went offline.
+    if (!liveInstances.contains(instance) && offlineTime != null && offlineTime > 0
+        && offlineTime + delay < inactiveTime) {
+      inactiveTime = offlineTime + delay;
+    }
+
+    // check the time instance got disabled, either individually or by the cluster batch disable.
+    Map<String, String> batchDisabledMap = clusterConfig.getDisabledInstances();
+    boolean batchDisabled = batchDisabledMap != null && batchDisabledMap.containsKey(instance);
+    boolean instanceDisabled = instanceConfig != null && !instanceConfig.getInstanceEnabled();
+    if (instanceDisabled || batchDisabled) {
+      long disabledTime = instanceConfig == null ? -1 : instanceConfig.getInstanceEnabledTime();
+      if (batchDisabled) {
+        // Keep the earlier of the instance-level timestamp and the batch disable time.
+        long batchDisableTime = Long.parseLong(batchDisabledMap.get(instance));
+        if (disabledTime == -1 || disabledTime > batchDisableTime) {
+          disabledTime = batchDisableTime;
+        }
+      }
+      if (disabledTime > 0 && disabledTime + delay < inactiveTime) {
+        inactiveTime = disabledTime + delay;
+      }
+    }
+
+    if (inactiveTime == Long.MAX_VALUE) {
+      return -1;
+    }
+    return inactiveTime;
+  }
+
+  /**
+   * Merge the new ideal preference list with the delayed mapping that is calculated based on the
+   * delayed rebalance configurations.
+   * The method will prioritize the "active" preference list so as to avoid unnecessary transient
+   * state transitions.
+   *
+   * @param newIdealPreferenceList the ideal mapping that was calculated based on the current
+   *                               instance status
+   * @param newDelayedPreferenceList the delayed mapping that was calculated based on the delayed
+   *                                 instance status; a missing partition counts as an empty list
+   * @param liveEnabledInstances set of all the nodes that are both alive and enabled.
+   * @param minActiveReplica the minimum replica count to ensure a valid mapping.
+   *                         If the active list does not have enough replica assignment,
+   *                         this method will fill the list with the new ideal mapping until
+   *                         the replica count satisfies the minimum requirement.
+   * @return the merged preference lists, one freshly allocated list per ideal-mapping partition.
+   */
+  public static Map<String, List<String>> getFinalDelayedMapping(
+      Map<String, List<String>> newIdealPreferenceList,
+      Map<String, List<String>> newDelayedPreferenceList, Set<String> liveEnabledInstances,
+      int minActiveReplica) {
+    Map<String, List<String>> finalPreferenceList = new HashMap<>();
+    for (String partition : newIdealPreferenceList.keySet()) {
+      List<String> idealList = newIdealPreferenceList.get(partition);
+      // A partition missing from the delayed mapping is treated as having no delayed assignment.
+      List<String> delayedIdealList =
+          newDelayedPreferenceList.getOrDefault(partition, new ArrayList<>());
+
+      // Keep only the delayed assignments that are on live and enabled instances (order kept).
+      List<String> liveList = new ArrayList<>(delayedIdealList);
+      liveList.retainAll(liveEnabledInstances);
+
+      if (liveList.size() >= minActiveReplica) {
+        // Copy so that the result does not alias the caller's input lists.
+        finalPreferenceList.put(partition, new ArrayList<>(delayedIdealList));
+      } else {
+        List<String> candidates = new ArrayList<>(idealList);
+        candidates.removeAll(delayedIdealList);
+        for (String liveIns : candidates) {
+          liveList.add(liveIns);
+          if (liveList.size() >= minActiveReplica) {
+            break;
+          }
+        }
+        finalPreferenceList.put(partition, liveList);
+      }
+    }
+    return finalPreferenceList;
+  }
+
+  /**
+   * Get the minimum active replica count threshold that allows delayed rebalance.
+   *
+   * @param idealState the resource Ideal State
+   * @param replicaCount the expected active replica count.
+   * @return the expected minimum active replica count that is required
+   */
+  public static int getMinActiveReplica(IdealState idealState, int replicaCount) {
+    int minActiveReplicas = idealState.getMinActiveReplicas();
+    if (minActiveReplicas < 0) {
+      minActiveReplicas = replicaCount;
+    }
+    return minActiveReplicas;
+  }
+
+  /**
+   * Set (or remove) the rebalance scheduler for the closest future rebalance time.
+   */
+  public static void setRebalanceScheduler(String resourceName, boolean isDelayedRebalanceEnabled,
+      Set<String> offlineOrDisabledInstances, Map<String, Long> instanceOfflineTimeMap,
+      Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap, long delay,
+      ClusterConfig clusterConfig, HelixManager manager) {
+    if (!isDelayedRebalanceEnabled) {
+      REBALANCE_SCHEDULER.removeScheduledRebalance(resourceName);
+      return;
+    }
+
+    long currentTime = System.currentTimeMillis();
+    long nextRebalanceTime = Long.MAX_VALUE;
+    // calculate the closest future rebalance time
+    for (String ins : offlineOrDisabledInstances) {
+      long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay,
+          instanceConfigMap.get(ins), clusterConfig);
+      if (inactiveTime != -1 && inactiveTime > currentTime && inactiveTime < nextRebalanceTime) {
+        nextRebalanceTime = inactiveTime;
+      }
+    }
+
+    if (nextRebalanceTime == Long.MAX_VALUE) {
+      long startTime = REBALANCE_SCHEDULER.removeScheduledRebalance(resourceName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String
+            .format("Remove exist rebalance timer for resource %s at %d\n", resourceName,
+                startTime));
+      }
+    } else {
+      long currentScheduledTime = REBALANCE_SCHEDULER.getRebalanceTime(resourceName);
+      if (currentScheduledTime < 0 || currentScheduledTime > nextRebalanceTime) {
+        REBALANCE_SCHEDULER.scheduleRebalance(manager, resourceName, nextRebalanceTime);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String
+              .format("Set next rebalance time for resource %s at time %d\n", resourceName,
+                  nextRebalanceTime));
+        }
+      }
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 1861e10..d211884 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -27,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
@@ -36,6 +36,7 @@ import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -46,12 +47,10 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-
/**
* Weight-Aware Globally-Even Distribute Rebalancer.
* @see <a
@@ -73,6 +72,7 @@ public class WagedRebalancer {
// Make it static to avoid unnecessary reinitialization.
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
new ThreadLocal<>();
+ private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
@@ -97,11 +97,18 @@ public class WagedRebalancer {
// Mapping calculator will translate the best possible assignment into the applicable state
// mapping based on the current states.
// TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
- new DelayedAutoRebalancer());
+ new DelayedAutoRebalancer(),
+ // Helix Manager is required for the rebalancer scheduler
+ helixManager);
}
- private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+ protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
+ this(assignmentMetadataStore, algorithm, mappingCalculator, null);
+ }
+
+ private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
+ RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager) {
if (assignmentMetadataStore == null) {
LOG.warn("Assignment Metadata Store is not configured properly."
+ " The rebalancer will not access the assignment store during the rebalance.");
@@ -109,12 +116,7 @@ public class WagedRebalancer {
_assignmentMetadataStore = assignmentMetadataStore;
_rebalanceAlgorithm = algorithm;
_mappingCalculator = mappingCalculator;
- }
-
- @VisibleForTesting
- protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
- RebalanceAlgorithm algorithm) {
- this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
+ _manager = manager;
}
// Release all the resources.
@@ -196,29 +198,59 @@ public class WagedRebalancer {
clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
}
+ Set<String> activeNodes = DelayedRebalanceUtil
+ .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
+ clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+ clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+
+ // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
+ delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
+
Map<String, ResourceAssignment> newAssignment =
- partialRebalance(clusterData, clusterChanges, resourceMap, currentStateOutput);
+ partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes,
+ currentStateOutput);
+ // <ResourceName, <State, Priority>>
+ Map<String, Map<String, Integer>> resourceStatePriorityMap = new HashMap<>();
// Convert the assignments into IdealState for the following state mapping calculation.
- Map<String, IdealState> finalIdealState = new HashMap<>();
+ Map<String, IdealState> finalIdealStateMap = new HashMap<>();
for (String resourceName : newAssignment.keySet()) {
- IdealState newIdeaState;
+ IdealState newIdealState;
try {
IdealState currentIdealState = clusterData.getIdealState(resourceName);
Map<String, Integer> statePriorityMap = clusterData
.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+ // Keep the priority map for the rebalance overwrite logic later.
+ resourceStatePriorityMap.put(resourceName, statePriorityMap);
// Create a new IdealState instance contains the new calculated assignment in the preference
// list.
- newIdeaState = generateIdealStateWithAssignment(resourceName, currentIdealState,
+ newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState,
newAssignment.get(resourceName), statePriorityMap);
} catch (Exception ex) {
throw new HelixRebalanceException(
"Fail to calculate the new IdealState for resource: " + resourceName,
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
- finalIdealState.put(resourceName, newIdeaState);
+ finalIdealStateMap.put(resourceName, newIdealState);
+ }
+
+ // The additional rebalance overwrite is required since the calculated mapping may contain
+ // some delayed rebalance assignments.
+ if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
+ applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
+ resourceStatePriorityMap,
+ getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
+ resourceMap.keySet()));
}
- return finalIdealState;
+ // Replace the assignment if user-defined preference list is configured.
+ // Note the user-defined list is intentionally applied to the final mapping after calculation.
+ // This is to avoid persisting it into the assignment store, which impacts the long term
+ // assignment evenness and partition movements.
+ finalIdealStateMap.entrySet().stream().forEach(
+ idealStateEntry -> applyUserDefinedPreferenceList(
+ clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
+
+ return finalIdealStateMap;
}
// TODO make the Baseline calculation async if complicated algorithm is used for the Baseline
@@ -253,7 +285,8 @@ public class WagedRebalancer {
private Map<String, ResourceAssignment> partialRebalance(
ResourceControllerDataProvider clusterData,
Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
- final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
+ Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
+ throws HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
Map<String, ResourceAssignment> currentBaseline =
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
@@ -261,8 +294,8 @@ public class WagedRebalancer {
getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
Map<String, ResourceAssignment> newAssignment =
- calculateAssignment(clusterData, clusterChanges, resourceMap,
- clusterData.getEnabledLiveInstances(), currentBaseline, currentBestPossibleAssignment);
+ calculateAssignment(clusterData, clusterChanges, resourceMap, activeNodes, currentBaseline,
+ currentBestPossibleAssignment);
if (_assignmentMetadataStore != null) {
try {
@@ -458,4 +491,88 @@ public class WagedRebalancer {
}
return currentStateAssignment;
}
+
+  /**
+   * Schedule the next delayed rebalance so that a rebalance is still triggered when the delay
+   * window expires, even if no further cluster change event happens in the meantime.
+   * Nothing is scheduled (only a warning is logged) when no HelixManager is available.
+   * @param clusterData the current cluster data cache
+   * @param delayedActiveNodes the active nodes set that is calculated with the delay time window
+   * @param resourceSet the names of the rebalanced resources
+   */
+  private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData,
+      Set<String> delayedActiveNodes, Set<String> resourceSet) {
+    if (_manager == null) {
+      LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified.");
+      return;
+    }
+    ClusterConfig config = clusterData.getClusterConfig();
+    boolean delayEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(config);
+    // Nodes kept in the delayed active set although they are not enabled live instances,
+    // i.e. the offline or disabled nodes whose replicas are still being held.
+    Set<String> heldInactiveNodes = new HashSet<>(delayedActiveNodes);
+    heldInactiveNodes.removeAll(clusterData.getEnabledLiveInstances());
+    for (String resourceName : resourceSet) {
+      DelayedRebalanceUtil.setRebalanceScheduler(resourceName, delayEnabled, heldInactiveNodes,
+          clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+          clusterData.getInstanceConfigMap(), config.getRebalanceDelayTime(), config, _manager);
+    }
+  }
+
+  /**
+   * Update the rebalanced ideal states according to the real active nodes.
+   * Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
+   * might include inactive nodes. For each resource, a second assignment is calculated with only
+   * the enabled live instances, and the two preference lists are merged by
+   * DelayedRebalanceUtil#getFinalDelayedMapping, using min(minActiveReplica, replicaCount) as the
+   * required number of active replicas. The ideal states in idealStateMap are updated in place.
+   * @param idealStateMap the calculated ideal states, updated in place.
+   * @param clusterData the cluster data cache.
+   * @param resourceMap the rebalanced resource map.
+   * @param clusterChanges the detected cluster changes that triggered the rebalance.
+   * @param resourceStatePriorityMap the state priority map for each resource.
+   * @param baseline the baseline assignment
+   * @throws HelixRebalanceException if the assignment calculated with the enabled live
+   *           instances does not contain one of the resources in idealStateMap.
+   */
+  private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
+      Map<String, Map<String, Integer>> resourceStatePriorityMap,
+      Map<String, ResourceAssignment> baseline)
+      throws HelixRebalanceException {
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    // The baseline is the only reference input here: it is passed as the previous best possible
+    // assignment (with an empty baseline) to minimize unnecessary partition movements.
+    Map<String, ResourceAssignment> activeAssignment =
+        calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
+            Collections.emptyMap(), baseline);
+    for (Map.Entry<String, IdealState> entry : idealStateMap.entrySet()) {
+      String resourceName = entry.getKey();
+      IdealState idealState = entry.getValue();
+      if (!activeAssignment.containsKey(resourceName)) {
+        throw new HelixRebalanceException(
+            "Failed to calculate the complete partition assignment with all active nodes. "
+                + "Cannot find the resource assignment for " + resourceName,
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+      }
+      IdealState currentIdealState = clusterData.getIdealState(resourceName);
+      IdealState newActiveIdealState =
+          generateIdealStateWithAssignment(resourceName, currentIdealState,
+              activeAssignment.get(resourceName), resourceStatePriorityMap.get(resourceName));
+
+      int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+      int minActiveReplica =
+          DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica);
+      Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil
+          .getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
+              idealState.getPreferenceLists(), enabledLiveInstances,
+              Math.min(minActiveReplica, numReplica));
+
+      idealState.setPreferenceLists(finalPreferenceLists);
+    }
+  }
+
+  /**
+   * Overwrite the preference lists of the given ideal state with the user-defined preference
+   * lists configured in the resource config, if there are any.
+   * Partitions without a user-defined list keep their calculated preference list.
+   * @param resourceConfig the resource config; when null, the ideal state is left unchanged
+   * @param idealState the calculated ideal state, updated in place
+   */
+  private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
+      IdealState idealState) {
+    if (resourceConfig == null) {
+      return;
+    }
+    Map<String, List<String>> userDefinedPreferenceList = resourceConfig.getPreferenceLists();
+    if (!userDefinedPreferenceList.isEmpty()) {
+      LOG.info("Using user defined preference list for partitions.");
+      for (Map.Entry<String, List<String>> entry : userDefinedPreferenceList.entrySet()) {
+        idealState.setPreferenceList(entry.getKey(), entry.getValue());
+      }
+    }
+  }
}
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index 85f0397..f3bca9e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
@@ -250,11 +251,12 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
+ "is enabled."));
}
for (String partition : idealState.getPartitionSet()) {
- if (idealState.getPreferenceList(partition) == null || idealState.getPreferenceList(partition).isEmpty()) {
+ if (idealState.getInstanceStateMap(partition) == null || idealState
+ .getInstanceStateMap(partition).isEmpty()) {
return false;
}
}
- idealPartitionState = computeIdealPartitionState(dataCache, idealState);
+ idealPartitionState = idealState.getRecord().getMapFields();
break;
case SEMI_AUTO:
case USER_DEFINED:
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index b9284b9..e166e13 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -353,9 +353,9 @@ public class ZkTestBase {
}
+ /**
+ * Create a resource that is rebalanced by the {@link WagedRebalancer}.
+ * The delay argument passed to createResource is fixed to -1. NOTE(review): presumably this means
+ * "no resource-level delay", so the delay is taken from the cluster config (the delayed rebalance
+ * tests call setDelayTimeInCluster before creating resources); confirm against createResource.
+ */
protected IdealState createResourceWithWagedRebalance(String clusterName, String db,
- String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+ String stateModel, int numPartition, int replica, int minActiveReplica) {
return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
- delay, WagedRebalancer.class.getName(), null);
+ -1, WagedRebalancer.class.getName(), null);
}
private IdealState createResource(String clusterName, String db, String stateModel,
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index e7368be..96b6523 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -26,10 +26,10 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
@@ -112,7 +112,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test
public void testRebalance() throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+ WagedRebalancer rebalancer =
+ new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -132,9 +133,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
@Test(dependsOnMethods = "testRebalance")
- public void testPartialRebalance() throws IOException, HelixRebalanceException {
+ public void testPartialRebalance()
+ throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+ WagedRebalancer rebalancer =
+ new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -159,7 +162,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test(dependsOnMethods = "testRebalance")
public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+ WagedRebalancer rebalancer =
+ new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -216,9 +220,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
@Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
- public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
+ public void testNonCompatibleConfiguration()
+ throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+ WagedRebalancer rebalancer =
+ new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
ResourceControllerDataProvider clusterData = setupClusterDataCache();
String nonCompatibleResourceName = _resourceNames.get(0);
@@ -237,9 +243,11 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// TODO test with invalid capacity configuration which will fail the cluster model constructing.
@Test(dependsOnMethods = "testRebalance")
- public void testInvalidClusterStatus() throws IOException {
+ public void testInvalidClusterStatus()
+ throws IOException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+ WagedRebalancer rebalancer =
+ new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
ResourceControllerDataProvider clusterData = setupClusterDataCache();
String invalidResource = _resourceNames.get(0);
@@ -264,7 +272,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
when(metadataStore.getBaseline())
.thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
- WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
+ WagedRebalancer rebalancer =
+ new WagedRebalancer(metadataStore, _algorithm, new DelayedAutoRebalancer());
ResourceControllerDataProvider clusterData = setupClusterDataCache();
// The input resource Map shall contain all the valid resources.
@@ -288,7 +297,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
HelixRebalanceException.Type.FAILED_TO_CALCULATE));
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm);
+ WagedRebalancer rebalancer =
+ new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer());
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
@@ -312,7 +322,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// won't propagate any existing assignment from the cluster model.
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
+ WagedRebalancer rebalancer =
+ new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
// 1. rebalance with baseline calculation done
// Generate the input for the rebalancer.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index 0105a51..7d4965e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -44,19 +43,22 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestDelayedAutoRebalance extends ZkTestBase {
- final int NUM_NODE = 5;
+ static final int NUM_NODE = 5;
protected static final int START_PORT = 12918;
- protected static final int _PARTITIONS = 5;
+ protected static final int PARTITIONS = 5;
+ // TODO: remove this wait time once we have a better way to determine if the rebalance has been
+ // TODO: done as a reaction of the test operations.
+ protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1000;
protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
protected ClusterControllerManager _controller;
- List<MockParticipantManager> _participants = new ArrayList<>();
- int _replica = 3;
- int _minActiveReplica = _replica - 1;
- ZkHelixClusterVerifier _clusterVerifier;
- List<String> _testDBs = new ArrayList<String>();
+ protected List<MockParticipantManager> _participants = new ArrayList<>();
+ protected int _replica = 3;
+ protected int _minActiveReplica = _replica - 1;
+ protected ZkHelixClusterVerifier _clusterVerifier;
+ protected List<String> _testDBs = new ArrayList<>();
@BeforeClass
public void beforeClass() throws Exception {
@@ -80,8 +82,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
- _clusterVerifier =
- new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ _clusterVerifier = getClusterVerifier();
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
}
@@ -123,7 +124,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
// bring down another node, the minimal active replica for each partition should be maintained.
_participants.get(3).syncStop();
- Thread.sleep(500);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
ExternalView ev =
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
@@ -141,10 +143,11 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
long delay = 4000;
- Map<String, ExternalView> externalViewsBefore = createTestDBs(delay);
+ setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
validateDelayedMovements(externalViewsBefore);
- Thread.sleep(delay + 200);
+ Thread.sleep(delay + DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// after delay time, it should maintain required number of replicas.
for (String db : _testDBs) {
@@ -157,7 +160,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
@Test (dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
public void testDisableDelayRebalanceInResource() throws Exception {
- Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+ setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
validateDelayedMovements(externalViewsBefore);
// disable delay rebalance for one db, partition should be moved immediately
@@ -166,7 +170,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
CLUSTER_NAME, testDb);
idealState.setDelayRebalanceEnabled(false);
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
-
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// once delay rebalance is disabled, it should maintain required number of replicas for that db.
@@ -190,13 +194,13 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
@Test (dependsOnMethods = {"testDisableDelayRebalanceInResource"})
public void testDisableDelayRebalanceInCluster() throws Exception {
enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
-
- Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+ setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
validateDelayedMovements(externalViewsBefore);
// disable delay rebalance for the entire cluster.
enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
ExternalView ev =
@@ -210,13 +214,14 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
@Test (dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
public void testDisableDelayRebalanceInInstance() throws Exception {
- Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+ setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
validateDelayedMovements(externalViewsBefore);
String disabledInstanceName = _participants.get(0).getInstanceName();
enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, false);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
-
for (String db : _testDBs) {
IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
Map<String, List<String>> preferenceLists = is.getPreferenceLists();
@@ -234,7 +239,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
}
_testDBs.clear();
- Thread.sleep(50);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
}
@BeforeMethod
@@ -249,17 +254,21 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
}
}
+  /**
+   * Build the cluster verifier that the tests in this class poll to check the cluster state.
+   * Protected so that subclasses can supply a different verifier.
+   */
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    BestPossibleExternalViewVerifier.Builder verifierBuilder =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME);
+    return verifierBuilder.setZkAddr(ZK_ADDR).build();
+  }
+
// create test DBs, wait it converged and return externalviews
protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>();
int i = 0;
for (String stateModel : TestStateModels) {
String db = "Test-DB-" + i++;
- createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+ createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
_minActiveReplica, delayTime, CrushRebalanceStrategy.class.getName());
_testDBs.add(db);
}
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
ExternalView ev =
@@ -302,7 +311,7 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
private void validateDelayedMovements(Map<String, ExternalView> externalViewsBefore)
throws InterruptedException {
_participants.get(0).syncStop();
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
index 746bdf3..145148f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java
@@ -21,7 +21,6 @@ package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
import java.util.Map;
import org.apache.helix.ConfigAccessor;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -56,7 +55,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
String instance = _participants.get(0).getInstanceName();
enableInstance(instance, false);
- Thread.sleep(300);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
@@ -79,7 +78,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
String instance = _participants.get(0).getInstanceName();
enableInstance(instance, false);
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
@@ -106,7 +105,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
// disable one node, no partition should be moved.
enableInstance(_participants.get(0).getInstanceName(), false);
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
@@ -120,7 +119,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
// disable another node, the minimal active replica for each partition should be maintained.
enableInstance(_participants.get(3).getInstanceName(), false);
- Thread.sleep(1000);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
@@ -143,7 +142,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
// disable one node, no partition should be moved.
enableInstance(_participants.get(0).getInstanceName(), false);
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
@@ -157,7 +156,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
// bring down another node, the minimal active replica for each partition should be maintained.
_participants.get(3).syncStop();
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
@@ -178,11 +177,12 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
long delay = 10000;
- Map<String, ExternalView> externalViewsBefore = createTestDBs(delay);
+ setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
// disable one node, no partition should be moved.
enableInstance(_participants.get(0).getInstanceName(), false);
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
ExternalView ev =
@@ -193,7 +193,8 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
_participants.get(0).getInstanceName(), true);
}
- Thread.sleep(delay + 500);
+ Thread.sleep(delay + DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
// after delay time, it should maintain required number of replicas.
for (String db : _testDBs) {
ExternalView ev =
@@ -210,7 +211,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
// disable one node, no partition should be moved.
enableInstance(_participants.get(0).getInstanceName(), false);
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
@@ -228,7 +229,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
CLUSTER_NAME, testDb);
idealState.setDelayRebalanceEnabled(false);
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, idealState);
- Thread.sleep(2000);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// once delay rebalance is disabled, it should maintain required number of replicas for that db.
@@ -253,12 +254,12 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
@Override
public void testDisableDelayRebalanceInCluster() throws Exception {
enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true);
-
- Map<String, ExternalView> externalViewsBefore = createTestDBs(1000000);
+ setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000);
+ Map<String, ExternalView> externalViewsBefore = createTestDBs(-1);
// disable one node, no partition should be moved.
enableInstance(_participants.get(0).getInstanceName(), false);
- Thread.sleep(100);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
@@ -272,7 +273,7 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAut
// disable delay rebalance for the entire cluster.
enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false);
- Thread.sleep(2000);
+ Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _testDBs) {
ExternalView ev =
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
index f768684..f85f07f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
@@ -29,7 +29,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebalance {
- final int NUM_NODE = 9;
+ static final int NUM_NODE = 9;
@BeforeClass
public void beforeClass() throws Exception {
@@ -58,8 +58,7 @@ public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebala
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
- _clusterVerifier =
- new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ _clusterVerifier = getClusterVerifier();
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index 76560e9..33dab8d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -25,16 +25,15 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.mock.participant.MockMSStateModel;
import org.apache.helix.mock.participant.MockTransition;
@@ -49,6 +48,7 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -60,13 +60,13 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
private final String CLASS_NAME = getShortClassName();
private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
- private ClusterControllerManager _controller;
+ protected static final String DB_NAME = "Test-DB";
+ private ClusterControllerManager _controller;
private List<MockParticipantManager> _participants = new ArrayList<>();
private int _replica = 3;
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
- private HelixDataAccessor _dataAccessor;
@BeforeClass
public void beforeClass() throws Exception {
@@ -90,13 +90,11 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
- _clusterVerifier =
- new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ _clusterVerifier = getClusterVerifier();
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
_configAccessor = new ConfigAccessor(_gZkClient);
- _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
}
@DataProvider(name = "stateModels")
@@ -112,19 +110,28 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
};
}
- @Test(dataProvider = "stateModels")
- public void testUserDefinedPreferenceListsInFullAuto(
- String stateModel, boolean delayEnabled, String rebalanceStrateyName) throws Exception {
- String db = "Test-DB-" + stateModel;
+ protected ZkHelixClusterVerifier getClusterVerifier() {
+ return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ }
+
+ protected void createResource(String stateModel, int numPartition, int replica,
+ boolean delayEnabled, String rebalanceStrategy) {
if (delayEnabled) {
- createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
- _replica - 1, 200, rebalanceStrateyName);
+ createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+ replica - 1, 200, rebalanceStrategy);
} else {
- createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
- _replica, 0, rebalanceStrateyName);
+ createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+ replica, 0, rebalanceStrategy);
}
+ }
+
+ @Test(dataProvider = "stateModels")
+ public void testUserDefinedPreferenceListsInFullAuto(String stateModel, boolean delayEnabled,
+ String rebalanceStrateyName) throws Exception {
+ createResource(stateModel, _PARTITIONS, _replica, delayEnabled,
+ rebalanceStrateyName);
IdealState idealState =
- _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB_NAME);
Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
List<String> userDefinedPartitions = new ArrayList<>();
for (String partition : userDefinedPreferenceLists.keySet()) {
@@ -138,33 +145,34 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
}
ResourceConfig resourceConfig =
- new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
- _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+ new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
+ _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
- Assert.assertTrue(_clusterVerifier.verify(1000));
- verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions);
+ Assert.assertTrue(_clusterVerifier.verify(3000));
+ verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions);
while (userDefinedPartitions.size() > 0) {
- IdealState originIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ IdealState originIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
+ DB_NAME);
Set<String> nonUserDefinedPartitions = new HashSet<>(originIS.getPartitionSet());
nonUserDefinedPartitions.removeAll(userDefinedPartitions);
- removePartitionFromUserDefinedList(db, userDefinedPartitions);
- Assert.assertTrue(_clusterVerifier.verify(1000));
- verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions);
- verifyNonUserDefinedAssignment(db, originIS, nonUserDefinedPartitions);
+ removePartitionFromUserDefinedList(DB_NAME, userDefinedPartitions);
+ // TODO: Remove wait once we enable the BestPossibleExternalViewVerifier for the WAGED rebalancer.
+ Thread.sleep(1000);
+ Assert.assertTrue(_clusterVerifier.verify(3000));
+ verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions);
+ verifyNonUserDefinedAssignment(DB_NAME, originIS, nonUserDefinedPartitions);
}
}
@Test
public void testUserDefinedPreferenceListsInFullAutoWithErrors() throws Exception {
- String db = "Test-DB-1";
- createResourceWithDelayedRebalance(CLUSTER_NAME, db,
- BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica, _replica, 0,
- CrushRebalanceStrategy.class.getName());
+ createResource(BuiltInStateModelDefinitions.MasterSlave.name(), 5, _replica,
+ false, CrushRebalanceStrategy.class.getName());
IdealState idealState =
- _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DB_NAME);
Map<String, List<String>> userDefinedPreferenceLists = idealState.getPreferenceLists();
List<String> newNodes = new ArrayList<>();
@@ -187,13 +195,28 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
}
ResourceConfig resourceConfig =
- new ResourceConfig.Builder(db).setPreferenceLists(userDefinedPreferenceLists).build();
- _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+ new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
+ _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
+
+ TestHelper.verify(() -> {
+ ExternalView ev =
+ _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DB_NAME);
+ if (ev != null) {
+ for (String partition : ev.getPartitionSet()) {
+ Map<String, String> stateMap = ev.getStateMap(partition);
+ if (stateMap.values().contains("ERROR")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }, 2000);
+ Assert.assertTrue(_clusterVerifier.verify(3000));
- Thread.sleep(1000);
ExternalView ev =
- _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
- IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DB_NAME);
+ IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
+ DB_NAME);
validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
}
@@ -238,6 +261,12 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
_configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
}
+ @AfterMethod
+ public void afterMethod() {
+ _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, DB_NAME);
+ getClusterVerifier().verify(5000);
+ }
+
@AfterClass
public void afterClass() throws Exception {
/**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
index ab4a263..7090cbf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
+
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
@@ -39,10 +40,11 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestZeroReplicaAvoidance extends ZkTestBase
@@ -53,16 +55,13 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
private List<MockParticipantManager> _participants = new ArrayList<>();
- private ZkHelixClusterVerifier _clusterVerifier;
private boolean _testSuccess = true;
private boolean _startListen = false;
private ClusterControllerManager _controller;
- @BeforeClass
- public void beforeClass() throws Exception {
- System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
-
+ @BeforeMethod
+ public void beforeMethod() {
_gSetupTool.addCluster(CLUSTER_NAME, true);
for (int i = 0; i < NUM_NODE; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
@@ -77,13 +76,11 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
String controllerName = CONTROLLER_PREFIX + "_0";
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
-
- _clusterVerifier =
- new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
}
- @AfterClass
- public void afterClass() {
+ @AfterMethod
+ public void afterMethod() {
+ _startListen = false;
if (_controller != null && _controller.isConnected()) {
_controller.syncStop();
}
@@ -92,6 +89,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
participant.syncStop();
}
}
+ _participants.clear();
deleteCluster(CLUSTER_NAME);
}
@@ -102,7 +100,8 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
};
@Test
- public void test() throws Exception {
+ public void testDelayedRebalancer() throws Exception {
+ System.out.println("START testDelayedRebalancer at " + new Date(System.currentTimeMillis()));
HelixManager manager =
HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, null, InstanceType.SPECTATOR, ZK_ADDR);
manager.connect();
@@ -123,7 +122,51 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica,
0);
}
- Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L));
+ ZkHelixClusterVerifier clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L));
+
+ _startListen = true;
+ DelayedTransition.setDelay(5);
+
+ // add the other half of nodes.
+ for (; i < NUM_NODE; i++) {
+ _participants.get(i).syncStart();
+ }
+ Assert.assertTrue(clusterVerifier.verify(70000L));
+ Assert.assertTrue(_testSuccess);
+
+ if (manager.isConnected()) {
+ manager.disconnect();
+ }
+ System.out.println("END testDelayedRebalancer at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testWagedRebalancer() throws Exception {
+ System.out.println("START testWagedRebalancer at " + new Date(System.currentTimeMillis()));
+ HelixManager manager =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, null, InstanceType.SPECTATOR, ZK_ADDR);
+ manager.connect();
+ manager.addExternalViewChangeListener(this);
+ manager.addIdealStateChangeListener(this);
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+ // Start half number of nodes.
+ int i = 0;
+ for (; i < NUM_NODE / 2; i++) {
+ _participants.get(i).syncStart();
+ }
+
+ int replica = 3;
+ int partition = 30;
+ for (String stateModel : TestStateModels) {
+ String db = "Test-DB-" + stateModel;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica);
+ }
+ ZkHelixClusterVerifier clusterVerifier =
+ new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L));
_startListen = true;
DelayedTransition.setDelay(5);
@@ -132,12 +175,13 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
for (; i < NUM_NODE; i++) {
_participants.get(i).syncStart();
}
- Assert.assertTrue(_clusterVerifier.verify(70000L));
+ Assert.assertTrue(clusterVerifier.verify(70000L));
Assert.assertTrue(_testSuccess);
if (manager.isConnected()) {
manager.disconnect();
}
+ System.out.println("END testWagedRebalancer at " + new Date(System.currentTimeMillis()));
}
/**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
new file mode 100644
index 0000000..8587f40
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
@@ -0,0 +1,135 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Inherit TestDelayedAutoRebalance to ensure the test logic is the same, but create every test DB
+ * with the WAGED rebalancer.
+ *
+ * <p>The WAGED rebalancer takes the cluster level delay config only, so the inherited tests that
+ * do not apply to it are overridden with empty bodies.
+ */
+public class TestDelayedWagedRebalance extends TestDelayedAutoRebalance {
+  /** Name prefix shared by the verifier and the DB creation so both cover the same resources. */
+  private static final String TEST_DB_PREFIX = "Test-DB-";
+
+  /**
+   * Builds a strict-match verifier scoped to the test DBs created by {@link #createTestDBs(long)}.
+   *
+   * <p>The DB names are derived from {@code TestStateModels} rather than read from
+   * {@code _testDBs}, so the verifier is valid even when it is built before any DB exists.
+   */
+  @Override
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    Set<String> dbNames = new HashSet<>();
+    int i = 0;
+    for (String ignored : TestStateModels) {
+      dbNames.add(TEST_DB_PREFIX + i++);
+    }
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
+        .setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Creates one WAGED-managed test DB per state model, waits for the cluster to converge, and
+   * returns the external view of each test DB keyed by DB name.
+   *
+   * @param delayTime ignored; the WAGED rebalancer takes the cluster level delay config only
+   * @return the external view of each test DB
+   * @throws InterruptedException if interrupted while waiting for the cluster to converge
+   */
+  @Override
+  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = TEST_DB_PREFIX + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _minActiveReplica);
+      _testDBs.add(db);
+    }
+    // Brief pause, presumably so the controller starts on the new resources before polling.
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    Map<String, ExternalView> externalViews = new HashMap<>();
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  /** Not applicable: the WAGED rebalancer takes the cluster level delay config only. */
+  @Override
+  @Test
+  public void testDelayedPartitionMovement() {
+    // Intentionally empty; it stays a passing @Test so the tests depending on it still run.
+  }
+
+  /** Not applicable: the WAGED rebalancer takes the cluster level delay config only. */
+  @Override
+  @Test
+  public void testDisableDelayRebalanceInResource() {
+    // Intentionally empty; it stays a passing @Test so the tests depending on it still run.
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    super.testDelayedPartitionMovementWithClusterConfigedDelay();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    super.testMinimalActiveReplicaMaintain();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    super.testPartitionMovementAfterDelayTime();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    super.testDisableDelayRebalanceInCluster();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
new file mode 100644
index 0000000..fab254c
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
@@ -0,0 +1,136 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Inherit TestDelayedAutoRebalanceWithDisabledInstance to ensure the test logic is the same, but
+ * create every test DB with the WAGED rebalancer.
+ *
+ * <p>The WAGED rebalancer takes the cluster level delay config only, so the inherited tests that
+ * do not apply to it are overridden with empty bodies.
+ */
+public class TestDelayedWagedRebalanceWithDisabledInstance
+    extends TestDelayedAutoRebalanceWithDisabledInstance {
+  /** Name prefix shared by the verifier and the DB creation so both cover the same resources. */
+  private static final String TEST_DB_PREFIX = "Test-DB-";
+
+  /**
+   * Builds a strict-match verifier scoped to the test DBs created by {@link #createTestDBs(long)}.
+   *
+   * <p>The DB names are derived from {@code TestStateModels} rather than read from
+   * {@code _testDBs}, so the verifier is valid even when it is built before any DB exists.
+   */
+  @Override
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    Set<String> dbNames = new HashSet<>();
+    int i = 0;
+    for (String ignored : TestStateModels) {
+      dbNames.add(TEST_DB_PREFIX + i++);
+    }
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
+        .setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Creates one WAGED-managed test DB per state model, waits for the cluster to converge, and
+   * returns the external view of each test DB keyed by DB name.
+   *
+   * @param delayTime ignored; the WAGED rebalancer takes the cluster level delay config only
+   * @return the external view of each test DB
+   * @throws InterruptedException if interrupted while waiting for the cluster to converge
+   */
+  @Override
+  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = TEST_DB_PREFIX + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _minActiveReplica);
+      _testDBs.add(db);
+    }
+    // Brief pause, presumably so the controller starts on the new resources before polling.
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    Map<String, ExternalView> externalViews = new HashMap<>();
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  /** Not applicable: the WAGED rebalancer takes the cluster level delay config only. */
+  @Override
+  @Test
+  public void testDelayedPartitionMovement() {
+    // Intentionally empty; it stays a passing @Test so the tests depending on it still run.
+  }
+
+  /** Not applicable: the WAGED rebalancer takes the cluster level delay config only. */
+  @Override
+  @Test
+  public void testDisableDelayRebalanceInResource() {
+    // Intentionally empty; it stays a passing @Test so the tests depending on it still run.
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    super.testDelayedPartitionMovementWithClusterConfigedDelay();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    super.testMinimalActiveReplicaMaintain();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    super.testPartitionMovementAfterDelayTime();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    super.testDisableDelayRebalanceInCluster();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
new file mode 100644
index 0000000..4791e6e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
@@ -0,0 +1,135 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Inherit TestDelayedAutoRebalanceWithRackaware to ensure the test logic is the same, but create
+ * every test DB with the WAGED rebalancer.
+ *
+ * <p>The WAGED rebalancer takes the cluster level delay config only, so the inherited tests that
+ * do not apply to it are overridden with empty bodies.
+ */
+public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebalanceWithRackaware {
+  /** Name prefix shared by the verifier and the DB creation so both cover the same resources. */
+  private static final String TEST_DB_PREFIX = "Test-DB-";
+
+  /**
+   * Builds a strict-match verifier scoped to the test DBs created by {@link #createTestDBs(long)}.
+   *
+   * <p>The DB names are derived from {@code TestStateModels} rather than read from
+   * {@code _testDBs}, so the verifier is valid even when it is built before any DB exists.
+   */
+  @Override
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    Set<String> dbNames = new HashSet<>();
+    int i = 0;
+    for (String ignored : TestStateModels) {
+      dbNames.add(TEST_DB_PREFIX + i++);
+    }
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
+        .setZkAddr(ZK_ADDR).build();
+  }
+
+  /**
+   * Creates one WAGED-managed test DB per state model, waits for the cluster to converge, and
+   * returns the external view of each test DB keyed by DB name.
+   *
+   * @param delayTime ignored; the WAGED rebalancer takes the cluster level delay config only
+   * @return the external view of each test DB
+   * @throws InterruptedException if interrupted while waiting for the cluster to converge
+   */
+  @Override
+  protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+    int i = 0;
+    for (String stateModel : TestStateModels) {
+      String db = TEST_DB_PREFIX + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _minActiveReplica);
+      _testDBs.add(db);
+    }
+    // Brief pause, presumably so the controller starts on the new resources before polling.
+    Thread.sleep(100);
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    Map<String, ExternalView> externalViews = new HashMap<>();
+    for (String db : _testDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      externalViews.put(db, ev);
+    }
+    return externalViews;
+  }
+
+  /** Not applicable: the WAGED rebalancer takes the cluster level delay config only. */
+  @Override
+  @Test
+  public void testDelayedPartitionMovement() {
+    // Intentionally empty; it stays a passing @Test so the tests depending on it still run.
+  }
+
+  /** Not applicable: the WAGED rebalancer takes the cluster level delay config only. */
+  @Override
+  @Test
+  public void testDisableDelayRebalanceInResource() {
+    // Intentionally empty; it stays a passing @Test so the tests depending on it still run.
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
+  public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+    super.testDelayedPartitionMovementWithClusterConfigedDelay();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
+  public void testMinimalActiveReplicaMaintain() throws Exception {
+    super.testMinimalActiveReplicaMaintain();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
+  public void testPartitionMovementAfterDelayTime() throws Exception {
+    super.testPartitionMovementAfterDelayTime();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
+  public void testDisableDelayRebalanceInCluster() throws Exception {
+    super.testDisableDelayRebalanceInCluster();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
+  public void testDisableDelayRebalanceInInstance() throws Exception {
+    super.testDisableDelayRebalanceInInstance();
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
new file mode 100644
index 0000000..7087dfc
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
@@ -0,0 +1,103 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import org.apache.helix.integration.rebalancer.TestMixedModeAutoRebalance;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+
+/**
+ * Inherit TestMixedModeAutoRebalance to ensure the test logic is the same, but create the test DB
+ * with the WAGED rebalancer.
+ *
+ * <p>The WAGED rebalancer takes the cluster level delay config only, so the delayed cases set the
+ * delay on the cluster and {@link #afterMethod()} resets it after each test.
+ */
+public class TestMixedModeWagedRebalance extends TestMixedModeAutoRebalance {
+  /** Cluster level delay for the delayed cases; the parent passes the same value per resource. */
+  private static final int DELAY_TIME = 200;
+
+  // The parent's CLUSTER_NAME is private, so it is re-derived here in the same way. It must name
+  // the same cluster the parent sets up, because this class verifies and configures that cluster.
+  private final String CLASS_NAME = getShortClassName();
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  /**
+   * Scenarios of (state model, delay enabled, rebalance strategy). The strategy is always null
+   * because {@link #createResource} ignores it.
+   */
+  @DataProvider(name = "stateModels")
+  public static Object[][] stateModels() {
+    return new Object[][] {
+        { BuiltInStateModelDefinitions.MasterSlave.name(), true, null },
+        { BuiltInStateModelDefinitions.OnlineOffline.name(), true, null },
+        { BuiltInStateModelDefinitions.LeaderStandby.name(), true, null },
+        { BuiltInStateModelDefinitions.MasterSlave.name(), false, null },
+        { BuiltInStateModelDefinitions.OnlineOffline.name(), false, null },
+        { BuiltInStateModelDefinitions.LeaderStandby.name(), false, null }
+    };
+  }
+
+  /** Builds a StrictMatchExternalViewVerifier scoped to the single test DB. */
+  @Override
+  protected ZkHelixClusterVerifier getClusterVerifier() {
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(Collections.singleton(DB_NAME)).build();
+  }
+
+  /**
+   * Creates the test DB with the WAGED rebalancer.
+   *
+   * @param stateModel state model of the test DB
+   * @param numPartition number of partitions
+   * @param replica number of replicas
+   * @param delayEnabled if true, set the cluster level delay and pass {@code replica - 1} as the
+   *          minimum active replica count instead of {@code replica}
+   * @param rebalanceStrategy ignored by this override
+   */
+  @Override
+  protected void createResource(String stateModel, int numPartition, int replica,
+      boolean delayEnabled, String rebalanceStrategy) {
+    int minActiveReplica = replica;
+    if (delayEnabled) {
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, DELAY_TIME);
+      minActiveReplica = replica - 1;
+    }
+    createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+        minActiveReplica);
+  }
+
+  /** Drops the test DB through the parent cleanup, then resets the cluster level delay to -1. */
+  @Override
+  @AfterMethod
+  public void afterMethod() {
+    try {
+      super.afterMethod();
+    } finally {
+      // Reset even if the parent cleanup fails, so a leftover delay cannot leak into later tests.
+      setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index fb5375c..37c1229 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -109,8 +109,7 @@ public class TestWagedRebalance extends ZkTestBase {
int i = 0;
for (String stateModel : _testModels) {
String db = "Test-DB-" + i++;
- createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
- -1);
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -123,7 +122,7 @@ public class TestWagedRebalance extends ZkTestBase {
for (String stateModel : _testModels) {
String moreDB = "More-Test-DB-" + i++;
createResourceWithWagedRebalance(CLUSTER_NAME, moreDB, stateModel, PARTITIONS, _replica,
- _replica, -1);
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, _replica);
_allDBs.add(moreDB);
@@ -151,7 +150,7 @@ public class TestWagedRebalance extends ZkTestBase {
for (String tag : tags) {
String db = "Test-DB-" + i++;
createResourceWithWagedRebalance(CLUSTER_NAME, db,
- BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
IdealState is =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
is.setInstanceGroupTag(tag);
@@ -167,7 +166,7 @@ public class TestWagedRebalance extends ZkTestBase {
public void testChangeIdealState() throws InterruptedException {
String dbName = "Test-DB";
createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
- BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
_allDBs.add(dbName);
Thread.sleep(300);
@@ -201,7 +200,7 @@ public class TestWagedRebalance extends ZkTestBase {
public void testDisableInstance() throws InterruptedException {
String dbName = "Test-DB";
createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
- BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
_allDBs.add(dbName);
Thread.sleep(300);
@@ -256,8 +255,8 @@ public class TestWagedRebalance extends ZkTestBase {
int j = 0;
for (String stateModel : _testModels) {
String db = "Test-DB-" + j++;
- createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
- -1);
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -295,8 +294,8 @@ public class TestWagedRebalance extends ZkTestBase {
int j = 0;
for (String stateModel : _testModels) {
String db = "Test-DB-" + j++;
- createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica,
- -1);
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -334,7 +333,7 @@ public class TestWagedRebalance extends ZkTestBase {
IdealState.RebalanceMode.FULL_AUTO + "", CrushEdRebalanceStrategy.class.getName());
} else {
createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
- _replica, -1);
+ _replica);
}
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
@@ -357,7 +356,7 @@ public class TestWagedRebalance extends ZkTestBase {
for (String stateModel : _testModels) {
String db = "Test-DB-" + i++;
createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
- _replica, -1);
+ _replica);
if (i == 1) {
// The limited resource has additional limitation, so even the other resources can be assigned
// later, this resource will still be blocked by the max partition limitation.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
index 0b020db..84c6ac4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
@@ -112,7 +112,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
for (String stateModel : _testModels) {
String db = "Test-DB-" + i++;
createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
- _replica, -1);
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -128,7 +128,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
for (String tag : tags) {
String db = "Test-DB-" + i++;
createResourceWithWagedRebalance(CLUSTER_NAME, db,
- BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica);
IdealState is =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
is.setInstanceGroupTag(tag);
@@ -156,7 +156,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
for (String stateModel : _testModels) {
String db = "Test-DB-" + j++;
createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
- _replica, -1);
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -198,7 +198,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
for (String stateModel : _testModels) {
String db = "Test-DB-" + j++;
createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
- _replica, -1);
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -230,7 +230,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
for (String stateModel : _testModels) {
String db = "Test-DB-" + i++;
createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
- _replica, -1);
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
index ca6b6b6..e1ecdee 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
@@ -125,19 +125,42 @@ public class TestClusterVerifier extends ZkUnitTestBase {
new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
Assert.assertTrue(bestPossibleVerifier.verify(10000));
+ // Disable partition for 1 instance, then Full-Auto ExternalView should not match IdealState.
+ _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(),
+ FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
+ Thread.sleep(1000);
+ Assert.assertTrue(bestPossibleVerifier.verify(3000));
+
+ // Enable the partition back
+ _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(),
+ FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
+ Thread.sleep(1000);
+ Assert.assertTrue(bestPossibleVerifier.verify(10000));
+
+ // Make 1 instance non-live
+ _participants[0].syncStop();
+ Thread.sleep(1000);
+ Assert.assertTrue(bestPossibleVerifier.verify(10000));
+
+ // Recover the participant before next test
+ String id = _participants[0].getInstanceName();
+ _participants[0] = new MockParticipantManager(ZK_ADDR, _clusterName, id);
+ _participants[0].syncStart();
+
HelixClusterVerifier strictMatchVerifier =
- new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
+ new StrictMatchExternalViewVerifier.Builder(_clusterName)
+ .setResources(Sets.newHashSet(RESOURCES)).setZkClient(_gZkClient).build();
Assert.assertTrue(strictMatchVerifier.verify(10000));
// Disable partition for 1 instance, then Full-Auto ExternalView should not match IdealState.
- _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(), FULL_AUTO_RESOURCES[0],
- Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
+ _admin.enablePartition(false, _clusterName, _participants[0].getInstanceName(),
+ FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
Thread.sleep(1000);
- Assert.assertFalse(strictMatchVerifier.verify(3000));
+ Assert.assertTrue(strictMatchVerifier.verify(3000));
// Enable the partition back
- _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(), FULL_AUTO_RESOURCES[0],
- Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
+ _admin.enablePartition(true, _clusterName, _participants[0].getInstanceName(),
+ FULL_AUTO_RESOURCES[0], Lists.newArrayList(FULL_AUTO_RESOURCES[0] + "_0"));
Thread.sleep(1000);
Assert.assertTrue(strictMatchVerifier.verify(10000));
@@ -148,14 +171,16 @@ public class TestClusterVerifier extends ZkUnitTestBase {
// Semi-Auto ExternalView should not match IdealState
for (String resource : SEMI_AUTO_RESOURCES) {
System.out.println("Un-verify resource: " + resource);
- strictMatchVerifier = new StrictMatchExternalViewVerifier.Builder(_clusterName)
- .setZkClient(_gZkClient).setResources(Sets.newHashSet(resource)).build();
+ strictMatchVerifier =
+ new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+ .setResources(Sets.newHashSet(resource)).build();
Assert.assertFalse(strictMatchVerifier.verify(3000));
}
// Full-Auto still match, because preference list wouldn't contain non-live instances
- strictMatchVerifier = new StrictMatchExternalViewVerifier.Builder(_clusterName)
- .setZkClient(_gZkClient).setResources(Sets.newHashSet(FULL_AUTO_RESOURCES)).build();
+ strictMatchVerifier =
+ new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+ .setResources(Sets.newHashSet(FULL_AUTO_RESOURCES)).build();
Assert.assertTrue(strictMatchVerifier.verify(10000));
}