You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/11/07 19:29:38 UTC
[helix] branch wagedRebalancer updated: Introduce Dry-run Waged
Rebalancer for the verifiers and tests. (#573)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedRebalancer by this push:
new df661d3 Introduce Dry-run Waged Rebalancer for the verifiers and tests. (#573)
df661d3 is described below
commit df661d34614cd7e03cd375426fa1ae2a9cccf0db
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Nov 7 11:29:30 2019 -0800
Introduce Dry-run Waged Rebalancer for the verifiers and tests. (#573)
Use a dry-run rebalancer to avoid updating the persisted rebalancer status in the verifiers or tests.
Also, refine several rebalancer related interfaces so as to simplify the dry-run rebalancer implementation.
Convert the test cases back to use the BestPossibleExternalViewVerifier.
Additional fixing:
- Updating the rebalancer preference for every rebalancer.compute calls. Since the preference might be updated at runtime.
- Fix one minor metric domain name bug in the WagedRebalancerMetricCollector.
- Minor test case fix to make them more stable after the change.
---
.../rebalancer/waged/AssignmentMetadataStore.java | 16 +--
.../rebalancer/waged/WagedRebalancer.java | 157 +++++++++++++--------
.../stages/BestPossibleStateCalcStage.java | 71 +++++-----
.../helix/monitoring/metrics/MetricCollector.java | 3 +-
.../metrics/WagedRebalancerMetricCollector.java | 15 +-
.../BestPossibleExternalViewVerifier.java | 82 +++++++++--
.../rebalancer/waged/TestWagedRebalancer.java | 42 +++---
.../waged/TestWagedRebalancerMetrics.java | 2 +-
.../TestDelayedAutoRebalance.java | 8 +-
.../TestDelayedAutoRebalanceWithRackaware.java | 4 +-
.../PartitionMigration/TestExpandCluster.java | 4 +-
.../TestPartitionMigrationBase.java | 12 +-
.../PartitionMigration/TestWagedExpandCluster.java | 16 +--
.../TestWagedRebalancerMigration.java | 18 +--
.../rebalancer/TestMixedModeAutoRebalance.java | 12 +-
.../rebalancer/TestZeroReplicaAvoidance.java | 18 ++-
.../WagedRebalancer/TestDelayedWagedRebalance.java | 14 --
...tDelayedWagedRebalanceWithDisabledInstance.java | 46 +++---
.../TestDelayedWagedRebalanceWithRackaware.java | 43 +++---
.../TestMixedModeWagedRebalance.java | 9 --
.../{TestNodeSwap.java => TestWagedNodeSwap.java} | 12 +-
.../WagedRebalancer/TestWagedRebalance.java | 62 ++++++++
.../TestWagedRebalanceFaultZone.java | 8 +-
23 files changed, 383 insertions(+), 291 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 234c88c..843d1b6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import com.google.common.annotations.VisibleForTesting;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.BucketDataAccessor;
@@ -35,6 +34,7 @@ import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ResourceAssignment;
+
/**
* A placeholder before we have the real assignment metadata store.
*/
@@ -49,14 +49,14 @@ public class AssignmentMetadataStore {
private BucketDataAccessor _dataAccessor;
private String _baselinePath;
private String _bestPossiblePath;
- private Map<String, ResourceAssignment> _globalBaseline;
- private Map<String, ResourceAssignment> _bestPossibleAssignment;
+ protected Map<String, ResourceAssignment> _globalBaseline;
+ protected Map<String, ResourceAssignment> _bestPossibleAssignment;
AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
}
- AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
+ protected AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String clusterName) {
_dataAccessor = bucketDataAccessor;
_baselinePath = String.format(BASELINE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, ASSIGNMENT_METADATA_KEY);
@@ -153,8 +153,8 @@ public class AssignmentMetadataStore {
HelixProperty property = new HelixProperty(name);
// Add each resource's assignment as a simple field in one ZNRecord
// Node that don't use Arrays.toString() for the record converting. The deserialize will fail.
- assignmentMap.forEach((resource, assignment) -> property.getRecord().setSimpleField(resource,
- new String(SERIALIZER.serialize(assignment.getRecord()))));
+ assignmentMap.forEach((resource, assignment) -> property.getRecord()
+ .setSimpleField(resource, new String(SERIALIZER.serialize(assignment.getRecord()))));
return property;
}
@@ -167,8 +167,8 @@ public class AssignmentMetadataStore {
Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
// Convert each resource's assignment String into a ResourceAssignment object and put it in a
// map
- property.getRecord().getSimpleFields()
- .forEach((resource, assignmentStr) -> assignmentMap.put(resource,
+ property.getRecord().getSimpleFields().forEach((resource, assignmentStr) -> assignmentMap
+ .put(resource,
new ResourceAssignment((ZNRecord) SERIALIZER.deserialize(assignmentStr.getBytes()))));
return assignmentMap;
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 605dcd1..c472e77 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
@@ -70,58 +71,70 @@ public class WagedRebalancer {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
- ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
- HelixConstants.ChangeType.IDEAL_STATE,
- HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
+ ImmutableSet
+ .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
+ HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
+ // To identify if the preference has been configured or not.
+ private static final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer>
+ NOT_CONFIGURED_PREFERENCE = ImmutableMap
+ .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
+ ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);
+
private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;
- private final RebalanceAlgorithm _rebalanceAlgorithm;
- private MetricCollector _metricCollector;
-
- private static AssignmentMetadataStore constructAssignmentStore(HelixManager helixManager) {
- AssignmentMetadataStore assignmentMetadataStore = null;
- if (helixManager != null) {
- String metadataStoreAddrs = helixManager.getMetadataStoreConnectionString();
- String clusterName = helixManager.getClusterName();
- if (metadataStoreAddrs != null && clusterName != null) {
- assignmentMetadataStore = new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
- }
+ private final MetricCollector _metricCollector;
+ private RebalanceAlgorithm _rebalanceAlgorithm;
+ private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
+ NOT_CONFIGURED_PREFERENCE;
+
+ private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs,
+ String clusterName) {
+ if (metadataStoreAddrs != null && clusterName != null) {
+ return new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
}
- return assignmentMetadataStore;
+ return null;
}
public WagedRebalancer(HelixManager helixManager,
- Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
- MetricCollector metricCollector) {
- this(constructAssignmentStore(helixManager),
- ConstraintBasedAlgorithmFactory.getInstance(preferences),
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference) {
+ this(helixManager == null ? null
+ : constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
+ helixManager.getClusterName()), ConstraintBasedAlgorithmFactory.getInstance(preference),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
// Mapping calculator will translate the best possible assignment into the applicable state
// mapping based on the current states.
// TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
new DelayedAutoRebalancer(),
// Helix Manager is required for the rebalancer scheduler
- helixManager, metricCollector);
+ helixManager,
+ // If HelixManager is null, we just pass in null for MetricCollector so that a
+ // non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
+ // constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
+ // in this case, WagedRebalancer will not read/write to metadata store and just use
+ // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
+ // verifying whether the cluster has converged.
+ helixManager == null ? null
+ : new WagedRebalancerMetricCollector(helixManager.getClusterName()));
+ _preference = ImmutableMap.copyOf(preference);
}
/**
* This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
- * the rebalancer will rebalance solely based on CurrentStates. With null MetricCollector, the
+ * the rebalancer will not schedule for a future delayed rebalance. With null MetricCollector, the
* rebalancer will not emit JMX metrics.
* @param assignmentMetadataStore
* @param algorithm
- * @param mappingCalculator
*/
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
- RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
- this(assignmentMetadataStore, algorithm, mappingCalculator, null, null);
+ RebalanceAlgorithm algorithm) {
+ this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null, null);
}
/**
- * This constructor will use null for HelixManager and MetricCollector. With null HelixManager,
- * the rebalancer will rebalance solely based on CurrentStates.
+ * This constructor will use null for HelixManager. With null HelixManager, the rebalancer will
+ * not schedule for a future delayed rebalance.
* @param assignmentMetadataStore
* @param algorithm
* @param metricCollector
@@ -149,11 +162,25 @@ public class WagedRebalancer {
_changeDetector = new ResourceChangeDetector(true);
}
+ // Update the rebalancer preference configuration if the new preference is different from the
+ // current preference configuration.
+ public void updatePreference(
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
+ if (_preference.equals(NOT_CONFIGURED_PREFERENCE) || _preference.equals(newPreference)) {
+ // 1. if the preference was not configured during constructing, no need to update.
+ // 2. if the preference equals to the new preference, no need to update.
+ return;
+ }
+ _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
+ _preference = ImmutableMap.copyOf(newPreference);
+ }
+
// Release all the resources.
public void close() {
if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.close();
}
+ _metricCollector.unregister();
}
/**
@@ -231,9 +258,43 @@ public class WagedRebalancer {
}
// Coordinate baseline recalculation and partial rebalance according to the cluster changes.
- protected Map<String, IdealState> computeBestPossibleStates(
+ private Map<String, IdealState> computeBestPossibleStates(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
- final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
+ final CurrentStateOutput currentStateOutput)
+ throws HelixRebalanceException {
+ Set<String> activeNodes = DelayedRebalanceUtil
+ .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
+ clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
+ clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+
+ // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
+ delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
+
+ Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData,
+ computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput));
+
+ // The additional rebalance overwrite is required since the calculated mapping may contain
+ // some delayed rebalanced assignments.
+ if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
+ applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
+ getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
+ resourceMap.keySet()));
+ }
+ // Replace the assignment if user-defined preference list is configured.
+ // Note the user-defined list is intentionally applied to the final mapping after calculation.
+ // This is to avoid persisting it into the assignment store, which impacts the long term
+ // assignment evenness and partition movements.
+ newIdealStates.entrySet().stream().forEach(idealStateEntry -> applyUserDefinedPreferenceList(
+ clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
+
+ return newIdealStates;
+ }
+
+ // Coordinate baseline recalculation and partial rebalance according to the cluster changes.
+ protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+ ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
+ throws HelixRebalanceException {
getChangeDetector().updateSnapshots(clusterData);
// Get all the changed items' information
Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
@@ -257,37 +318,11 @@ public class WagedRebalancer {
refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
}
- Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(),
- clusterData.getEnabledLiveInstances(), clusterData.getInstanceOfflineTimeMap(),
- clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(),
- clusterData.getClusterConfig());
-
- // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
- delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
-
// Perform partial rebalance
Map<String, ResourceAssignment> newAssignment =
partialRebalance(clusterData, clusterChanges, resourceMap, activeNodes, currentStateOutput);
- Map<String, IdealState> finalIdealStateMap =
- convertResourceAssignment(clusterData, newAssignment);
-
- // The additional rebalance overwrite is required since the calculated mapping may contains
- // some delayed rebalanced assignments.
- if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
- applyRebalanceOverwrite(finalIdealStateMap, clusterData, resourceMap, clusterChanges,
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
- resourceMap.keySet()));
- }
- // Replace the assignment if user-defined preference list is configured.
- // Note the user-defined list is intentionally applied to the final mapping after calculation.
- // This is to avoid persisting it into the assignment store, which impacts the long term
- // assignment evenness and partition movements.
- finalIdealStateMap.entrySet().stream()
- .forEach(idealStateEntry -> applyUserDefinedPreferenceList(
- clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
-
- return finalIdealStateMap;
+ return newAssignment;
}
/**
@@ -503,7 +538,7 @@ public class WagedRebalancer {
Set<String> nonCompatibleResources = resourceMap.entrySet().stream().filter(resourceEntry -> {
IdealState is = clusterData.getIdealState(resourceEntry.getKey());
return is == null || !is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
- || !getClass().getName().equals(is.getRebalancerClassName());
+ || !WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
}).map(Map.Entry::getKey).collect(Collectors.toSet());
if (!nonCompatibleResources.isEmpty()) {
throw new HelixRebalanceException(String.format(
@@ -554,7 +589,7 @@ public class WagedRebalancer {
* assignmentMetadataStore, return the current state assignment.
* @throws HelixRebalanceException
*/
- private Map<String, ResourceAssignment> getBestPossibleAssignment(
+ protected Map<String, ResourceAssignment> getBestPossibleAssignment(
AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
Set<String> resources) throws HelixRebalanceException {
Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap();
@@ -614,18 +649,16 @@ public class WagedRebalancer {
* @param idealStateMap the calculated ideal states.
* @param clusterData the cluster data cache.
* @param resourceMap the rebalanaced resource map.
- * @param clusterChanges the detected cluster changes that triggeres the rebalance.
* @param baseline the baseline assignment
*/
private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
- Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baseline) throws HelixRebalanceException {
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
// Note that the calculation used the baseline as the input only. This is for minimizing
// unnecessary partition movement.
Map<String, IdealState> activeIdealStates = convertResourceAssignment(clusterData,
- calculateAssignment(clusterData, clusterChanges, resourceMap, enabledLiveInstances,
+ calculateAssignment(clusterData, Collections.emptyMap(), resourceMap, enabledLiveInstances,
Collections.emptyMap(), baseline));
for (String resourceName : idealStateMap.keySet()) {
// The new calculated ideal state before overwrite
@@ -664,6 +697,10 @@ public class WagedRebalancer {
}
}
+ protected AssignmentMetadataStore getAssignmentMetadataStore() {
+ return _assignmentMetadataStore;
+ }
+
protected MetricCollector getMetricCollector() {
return _metricCollector;
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index fcf89b5..671604e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
-import javax.management.JMException;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
@@ -43,8 +42,6 @@ import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
-import org.apache.helix.monitoring.metrics.MetricCollector;
-import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -55,6 +52,8 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.monitoring.mbeans.ResourceMonitor;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
@@ -67,12 +66,12 @@ import org.slf4j.LoggerFactory;
public class BestPossibleStateCalcStage extends AbstractBaseStage {
private static final Logger logger =
LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
- // Create a ThreadLocal of MetricCollector. Metrics could only be updated by the controller thread
- // only.
- private static final ThreadLocal<MetricCollector> METRIC_COLLECTOR_THREAD_LOCAL =
- new ThreadLocal<>();
- private static final ThreadLocal<WagedRebalancer> WAGED_REBALANCER_THREAD_LOCAL =
- new ThreadLocal<>();
+
+ // Lazy initialize the WAGED rebalancer instance since the BestPossibleStateCalcStage instance was
+ // instantiated without the HelixManager information that is required.
+ // TODO: Initialize the WAGED rebalancer in the BestPossibleStateCalcStage constructor once it is
+ // TODO: updated so as to accept a HelixManager or HelixZkClient information.
+ private WagedRebalancer _wagedRebalancer = null;
@Override
public void process(ClusterEvent event) throws Exception {
@@ -113,6 +112,29 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
});
}
+ // Need to keep a default constructor for backward compatibility
+ public BestPossibleStateCalcStage() {
+ }
+
+ // Construct the BestPossibleStateCalcStage with a given WAGED rebalancer for the callers other
+ // than the controller pipeline. Such as the verifiers and test cases.
+ public BestPossibleStateCalcStage(WagedRebalancer wagedRebalancer) {
+ _wagedRebalancer = wagedRebalancer;
+ }
+
+ private WagedRebalancer getWagedRebalancer(HelixManager helixManager,
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+ // Create WagedRebalancer instance if it hasn't been already initialized
+ if (_wagedRebalancer == null) {
+ _wagedRebalancer = new WagedRebalancer(helixManager, preferences);
+ } else {
+ // Since the preference can be updated at runtime, try to update the algorithm preference
+ // before returning the rebalancer.
+ _wagedRebalancer.updatePreference(preferences);
+ }
+ return _wagedRebalancer;
+ }
+
private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
CurrentStateOutput currentStateOutput) {
ResourceControllerDataProvider cache =
@@ -261,35 +283,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Map<String, IdealState> newIdealStates = new HashMap<>();
- // Init rebalancer with the rebalance preferences.
- Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
- cache.getClusterConfig().getGlobalRebalancePreference();
-
- // Create MetricCollector ThreadLocal if it hasn't been already initialized
- if (METRIC_COLLECTOR_THREAD_LOCAL.get() == null) {
- try {
- // If HelixManager is null, we just pass in null for MetricCollector so that a
- // non-functioning WagedRebalancerMetricCollector would be created in WagedRebalancer's
- // constructor. This is to handle two cases: 1. HelixManager is null for non-testing cases -
- // in this case, WagedRebalancer will not read/write to metadata store and just use
- // CurrentState-based rebalancing. 2. Tests that require instrumenting the rebalancer for
- // verifying whether the cluster has converged.
- METRIC_COLLECTOR_THREAD_LOCAL.set(helixManager == null ? null
- : new WagedRebalancerMetricCollector(helixManager.getClusterName()));
- } catch (JMException e) {
- LogUtil.logWarn(logger, _eventId, String.format(
- "MetricCollector instantiation failed! WagedRebalancer will not emit metrics due to JMException %s",
- e));
- }
- }
-
- // Create WagedRebalancer ThreadLocal if it hasn't been already initialized
- if (WAGED_REBALANCER_THREAD_LOCAL.get() == null) {
- WAGED_REBALANCER_THREAD_LOCAL
- .set(new WagedRebalancer(helixManager, preferences, METRIC_COLLECTOR_THREAD_LOCAL.get()));
- }
- WagedRebalancer wagedRebalancer = WAGED_REBALANCER_THREAD_LOCAL.get();
-
+ WagedRebalancer wagedRebalancer =
+ getWagedRebalancer(helixManager, cache.getClusterConfig().getGlobalRebalancePreference());
try {
newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, wagedRebalancedResourceMap,
currentStateOutput));
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
index 764557a..b08a840 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/MetricCollector.java
@@ -27,7 +27,6 @@ import javax.management.JMException;
import javax.management.ObjectName;
import org.apache.helix.HelixException;
import org.apache.helix.monitoring.metrics.model.Metric;
-import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
@@ -71,7 +70,7 @@ public abstract class MetricCollector extends DynamicMBeanProvider {
@Override
public String getSensorName() {
- return String.format("%s.%s.%s", MonitorDomainNames.Rebalancer.name(), _clusterName,
+ return String.format("%s.%s.%s", _monitorDomainName, _clusterName,
_entityName);
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index 3dd16ad..df8b60f 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -21,6 +21,7 @@ package org.apache.helix.monitoring.metrics;
import javax.management.JMException;
+import org.apache.helix.HelixException;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
import org.apache.helix.monitoring.metrics.implementation.RebalanceCounter;
@@ -63,10 +64,17 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
PartialRebalanceCounter
}
- public WagedRebalancerMetricCollector(String clusterName) throws JMException {
+ public WagedRebalancerMetricCollector(String clusterName) {
super(MonitorDomainNames.Rebalancer.name(), clusterName, WAGED_REBALANCER_ENTITY_NAME);
createMetrics();
- register();
+ if (clusterName != null) {
+ try {
+ register();
+ } catch (JMException e) {
+ throw new HelixException("Failed to register MBean for the WagedRebalancerMetricCollector.",
+ e);
+ }
+ }
}
/**
@@ -75,8 +83,7 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
* metrics.
*/
public WagedRebalancerMetricCollector() {
- super(MonitorDomainNames.Rebalancer.name(), null, null);
- createMetrics();
+ this(null);
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 52ced19..4118ccf 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -19,40 +19,48 @@ package org.apache.helix.tools.ClusterVerifiers;
* under the License.
*/
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixRebalanceException;
import org.apache.helix.PropertyKey;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* verifier that the ExternalViews of given resources (or all resources in the cluster)
* match its best possible mapping states.
@@ -377,8 +385,15 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
}
runStage(event, new CurrentStateComputationStage());
- // TODO: be caution here, should be handled statelessly.
- runStage(event, new BestPossibleStateCalcStage());
+ // Note the dryrunWagedRebalancer is just for one time usage
+ DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
+ cache.getClusterConfig().getGlobalRebalancePreference());
+ try {
+ // TODO: be caution here, should be handled statelessly.
+ runStage(event, new BestPossibleStateCalcStage(dryrunWagedRebalancer));
+ } finally {
+ dryrunWagedRebalancer.close();
+ }
BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
return output;
@@ -399,3 +414,44 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
+ (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
}
}
+
+/**
+ * A Dryrun WAGED rebalancer that only calculates the assignment based on the cluster status but
+ * never update the rebalancer assignment metadata.
+ * This rebalacer is used in the verifiers or tests.
+ */
+class DryrunWagedRebalancer extends WagedRebalancer {
+ DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+ super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs, clusterName),
+ ConstraintBasedAlgorithmFactory.getInstance(preferences));
+ }
+
+ @Override
+ protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+ ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ Set<String> activeNodes, CurrentStateOutput currentStateOutput)
+ throws HelixRebalanceException {
+ return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
+ resourceMap.keySet());
+ }
+}
+
+class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
+ ReadOnlyAssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
+ super(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
+ }
+
+ @Override
+ public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+ // Update the in-memory reference only
+ _globalBaseline = globalBaseline;
+ }
+
+ @Override
+ public void persistBestPossibleAssignment(
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
+ // Update the in-memory reference only
+ _bestPossibleAssignment = bestPossibleAssignment;
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 9782645..2250539 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
@@ -115,10 +114,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
@Test
- public void testRebalance() throws IOException, HelixRebalanceException {
+ public void testRebalance()
+ throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer =
- new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -140,8 +139,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test(dependsOnMethods = "testRebalance")
public void testPartialRebalance() throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer =
- new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -166,8 +164,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test(dependsOnMethods = "testRebalance")
public void testRebalanceWithCurrentState() throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer =
- new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
// Generate the input for the rebalancer.
ResourceControllerDataProvider clusterData = setupClusterDataCache();
@@ -224,10 +221,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
@Test(dependsOnMethods = "testRebalance", expectedExceptions = HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
- public void testNonCompatibleConfiguration() throws IOException, HelixRebalanceException {
+ public void testNonCompatibleConfiguration()
+ throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer =
- new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
ResourceControllerDataProvider clusterData = setupClusterDataCache();
String nonCompatibleResourceName = _resourceNames.get(0);
@@ -248,8 +245,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
@Test(dependsOnMethods = "testRebalance")
public void testInvalidClusterStatus() throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer =
- new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
ResourceControllerDataProvider clusterData = setupClusterDataCache();
String invalidResource = _resourceNames.get(0);
@@ -259,7 +255,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Map<String, Resource> resourceMap = clusterData.getIdealStates().keySet().stream().collect(
Collectors.toMap(resourceName -> resourceName, resourceName -> new Resource(resourceName)));
try {
- rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
+ rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
+ clusterData.getEnabledLiveInstances(), new CurrentStateOutput());
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
@@ -280,8 +277,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
when(metadataStore.getBaseline())
.thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
- WagedRebalancer rebalancer =
- new WagedRebalancer(metadataStore, _algorithm, new DelayedAutoRebalancer());
+ WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm);
ResourceControllerDataProvider clusterData = setupClusterDataCache();
// The input resource Map shall contain all the valid resources.
@@ -299,10 +295,10 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
@Test(dependsOnMethods = "testRebalance")
- public void testAlgorithmException() throws IOException, HelixRebalanceException {
+ public void testAlgorithmException()
+ throws IOException, HelixRebalanceException {
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer =
- new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
ResourceControllerDataProvider clusterData = setupClusterDataCache();
Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
@@ -320,11 +316,12 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
RebalanceAlgorithm badAlgorithm = Mockito.mock(RebalanceAlgorithm.class);
when(badAlgorithm.calculate(any())).thenThrow(new HelixRebalanceException("Algorithm fails.",
HelixRebalanceException.Type.FAILED_TO_CALCULATE));
- rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm, new DelayedAutoRebalancer());
+ rebalancer = new WagedRebalancer(_metadataStore, badAlgorithm);
// Calculation will fail
try {
- rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
+ rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
+ clusterData.getEnabledLiveInstances(), new CurrentStateOutput());
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -350,8 +347,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
// Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm
// won't propagate any existing assignment from the cluster model.
_metadataStore.clearMetadataStore();
- WagedRebalancer rebalancer =
- new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm);
// 1. rebalance with baseline calculation done
// Generate the input for the rebalancer.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index 83229a1..7b792a2 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -126,7 +126,7 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
// Add a field to the cluster config so the cluster config will be marked as changed in the change detector.
clusterData.getClusterConfig().getRecord().setSimpleField("foo", "bar");
- rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
Assert.assertEquals((long) metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
index 7d4965e..afcabdf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -82,7 +83,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
- _clusterVerifier = getClusterVerifier();
+ _clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
}
@@ -254,10 +256,6 @@ public class TestDelayedAutoRebalance extends ZkTestBase {
}
}
- protected ZkHelixClusterVerifier getClusterVerifier() {
- return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
- }
-
// create test DBs, wait it converged and return externalviews
protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
index f85f07f..d8840f0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration.rebalancer.DelayedAutoRebalancer;
*/
import java.util.Date;
+
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
@@ -58,7 +59,8 @@ public class TestDelayedAutoRebalanceWithRackaware extends TestDelayedAutoRebala
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
- _clusterVerifier = getClusterVerifier();
+ _clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
index 83893c6..7ece391 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -30,14 +30,14 @@ import org.testng.annotations.Test;
public class TestExpandCluster extends TestPartitionMigrationBase {
-
Map<String, IdealState> _resourceMap;
-
@BeforeClass
public void beforeClass() throws Exception {
super.beforeClass();
_resourceMap = createTestDBs(1000000);
+ // TODO remove this sleep after fix https://github.com/apache/helix/issues/526
+ Thread.sleep(1000);
_migrationVerifier = new MigrationStateVerifier(_resourceMap, _manager);
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
index 61d72a2..168cb6c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
@@ -24,6 +24,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
@@ -84,20 +85,17 @@ public class TestPartitionMigrationBase extends ZkTestBase {
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
- _clusterVerifier = getVerifier();
+ _clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
enablePersistIntermediateAssignment(_gZkClient, CLUSTER_NAME, true);
- _manager =
- HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
_manager.connect();
_configAccessor = new ConfigAccessor(_gZkClient);
}
- protected ZkHelixClusterVerifier getVerifier() {
- return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
- }
-
protected MockParticipantManager createAndStartParticipant(String instancename) {
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, instancename);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
index 01db9eb..37e76ee 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
@@ -20,27 +20,13 @@ package org.apache.helix.integration.rebalancer.PartitionMigration;
*/
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
-public class TestWagedExpandCluster extends TestExpandCluster {
-// TODO check the movements in between
- protected ZkHelixClusterVerifier getVerifier() {
- Set<String> dbNames = new HashSet<>();
- int i = 0;
- for (String stateModel : TestStateModels) {
- dbNames.add("Test-DB-" + i++);
- }
- return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
- .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
- }
+public class TestWagedExpandCluster extends TestExpandCluster {
protected Map<String, IdealState> createTestDBs(long delayTime) {
Map<String, IdealState> idealStateMap = new HashMap<>();
int i = 0;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
index 3bfdc37..52def54 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
@@ -28,7 +28,7 @@ import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -39,7 +39,8 @@ public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
ConfigAccessor _configAccessor;
@BeforeClass
- public void beforeClass() throws Exception {
+ public void beforeClass()
+ throws Exception {
super.beforeClass();
_configAccessor = new ConfigAccessor(_gZkClient);
}
@@ -58,7 +59,8 @@ public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
// TODO check the movements in between
@Test(dataProvider = "stateModels")
public void testMigrateToWagedRebalancerWhileExpandCluster(String stateModel,
- boolean delayEnabled) throws Exception {
+ boolean delayEnabled)
+ throws Exception {
String db = "Test-DB-" + stateModel;
if (delayEnabled) {
createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
@@ -84,9 +86,7 @@ public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
}
Thread.sleep(2000);
ZkHelixClusterVerifier clusterVerifier =
- new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
- .setResources(Collections.singleton(db)).setZkAddr(ZK_ADDR)
- .setDeactivatedNodeAwareness(true).build();
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(clusterVerifier.verifyByPolling());
_migrationVerifier =
@@ -95,9 +95,11 @@ public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
_migrationVerifier.reset();
_migrationVerifier.start();
- IdealState currentIdealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ IdealState currentIdealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
currentIdealState.setRebalancerClassName(WagedRebalancer.class.getName());
- _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, currentIdealState);
+ _gSetupTool.getClusterManagementTool()
+ .setResourceIdealState(CLUSTER_NAME, db, currentIdealState);
Thread.sleep(2000);
Assert.assertTrue(clusterVerifier.verifyByPolling());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index 2f13d8b..f4c875f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -90,7 +90,8 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
- _clusterVerifier = getClusterVerifier();
+ _clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
@@ -110,10 +111,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
};
}
- protected ZkHelixClusterVerifier getClusterVerifier() {
- return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
- }
-
protected void createResource(String stateModel, int numPartition, int replica,
boolean delayEnabled, String rebalanceStrategy) {
if (delayEnabled) {
@@ -148,6 +145,9 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
new ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
_configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
+ // TODO remove this sleep after fix https://github.com/apache/helix/issues/526
+ Thread.sleep(500);
+
Assert.assertTrue(_clusterVerifier.verify(3000));
verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, userDefinedPartitions);
@@ -263,7 +263,7 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
@AfterMethod
public void afterMethod() {
_gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, DB_NAME);
- getClusterVerifier().verify(5000);
+ _clusterVerifier.verify(5000);
}
@AfterClass
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
index 58d0dce..bd3f2e1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
@@ -40,7 +40,6 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -55,6 +54,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
private List<MockParticipantManager> _participants = new ArrayList<>();
+ private ZkHelixClusterVerifier _clusterVerifier;
private boolean _testSuccess = true;
private boolean _startListen = false;
@@ -76,6 +76,9 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
String controllerName = CONTROLLER_PREFIX + "_0";
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
+
+ _clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
}
@AfterMethod
@@ -122,9 +125,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica,
0);
}
- ZkHelixClusterVerifier clusterVerifier =
- new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
- Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L));
+ Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L));
_startListen = true;
DelayedTransition.setDelay(5);
@@ -133,7 +134,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
for (; i < NUM_NODE; i++) {
_participants.get(i).syncStart();
}
- Assert.assertTrue(clusterVerifier.verify(70000L));
+ Assert.assertTrue(_clusterVerifier.verify(70000L));
Assert.assertTrue(_testSuccess);
if (manager.isConnected()) {
@@ -164,10 +165,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
String db = "Test-DB-" + stateModel;
createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica);
}
- ZkHelixClusterVerifier clusterVerifier =
- new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setDeactivatedNodeAwareness(true).build();
- Assert.assertTrue(clusterVerifier.verifyByPolling(50000L, 100L));
+ Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L));
_startListen = true;
DelayedTransition.setDelay(5);
@@ -176,7 +174,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
for (; i < NUM_NODE; i++) {
_participants.get(i).syncStart();
}
- Assert.assertTrue(clusterVerifier.verify(70000L));
+ Assert.assertTrue(_clusterVerifier.verify(70000L));
Assert.assertTrue(_testSuccess);
if (manager.isConnected()) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
index e49cc19..e75da84 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java
@@ -20,15 +20,11 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
*/
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalance;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -36,16 +32,6 @@ import org.testng.annotations.Test;
* Inherit TestDelayedAutoRebalance to ensure the test logic is the same.
*/
public class TestDelayedWagedRebalance extends TestDelayedAutoRebalance {
- protected ZkHelixClusterVerifier getClusterVerifier() {
- Set<String> dbNames = new HashSet<>();
- int i = 0;
- for (String stateModel : TestStateModels) {
- dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
- }
- return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
- .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
- }
-
// create test DBs, wait it converged and return externalviews
protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
Map<String, ExternalView> externalViews = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
index 3d4bd6a..92988c4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java
@@ -20,35 +20,22 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
*/
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithDisabledInstance;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
+
/**
* Inherit TestDelayedAutoRebalanceWithDisabledInstance to ensure the test logic is the same.
*/
-public class TestDelayedWagedRebalanceWithDisabledInstance
- extends TestDelayedAutoRebalanceWithDisabledInstance {
- protected ZkHelixClusterVerifier getClusterVerifier() {
- Set<String> dbNames = new HashSet<>();
- int i = 0;
- for (String stateModel : TestStateModels) {
- dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
- }
- return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
- .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
- }
-
+public class TestDelayedWagedRebalanceWithDisabledInstance extends TestDelayedAutoRebalanceWithDisabledInstance {
// create test DBs, wait it converged and return externalviews
- protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+ protected Map<String, ExternalView> createTestDBs(long delayTime)
+ throws InterruptedException {
Map<String, ExternalView> externalViews = new HashMap<>();
int i = 0;
for (String stateModel : TestStateModels) {
@@ -77,28 +64,33 @@ public class TestDelayedWagedRebalanceWithDisabledInstance
// Waged Rebalancer takes cluster level delay config only. Skip this test.
}
- @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
- public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+ @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+ public void testDelayedPartitionMovementWithClusterConfigedDelay()
+ throws Exception {
super.testDelayedPartitionMovementWithClusterConfigedDelay();
}
- @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
- public void testMinimalActiveReplicaMaintain() throws Exception {
+ @Test(dependsOnMethods = {"testDelayedPartitionMovementWithClusterConfigedDelay"})
+ public void testMinimalActiveReplicaMaintain()
+ throws Exception {
super.testMinimalActiveReplicaMaintain();
}
- @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
- public void testPartitionMovementAfterDelayTime() throws Exception {
+ @Test(dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+ public void testPartitionMovementAfterDelayTime()
+ throws Exception {
super.testPartitionMovementAfterDelayTime();
}
- @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
- public void testDisableDelayRebalanceInCluster() throws Exception {
+ @Test(dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+ public void testDisableDelayRebalanceInCluster()
+ throws Exception {
super.testDisableDelayRebalanceInCluster();
}
- @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
- public void testDisableDelayRebalanceInInstance() throws Exception {
+ @Test(dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+ public void testDisableDelayRebalanceInInstance()
+ throws Exception {
super.testDisableDelayRebalanceInInstance();
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
index bb7c11a..cd1f337 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithRackaware.java
@@ -20,34 +20,22 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
*/
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.rebalancer.DelayedAutoRebalancer.TestDelayedAutoRebalanceWithRackaware;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
+
/**
* Inherit TestDelayedAutoRebalanceWithRackaware to ensure the test logic is the same.
*/
public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebalanceWithRackaware {
- protected ZkHelixClusterVerifier getClusterVerifier() {
- Set<String> dbNames = new HashSet<>();
- int i = 0;
- for (String stateModel : TestStateModels) {
- dbNames.add("Test-DB-" + TestHelper.getTestMethodName() + i++);
- }
- return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setResources(dbNames)
- .setDeactivatedNodeAwareness(true).setZkAddr(ZK_ADDR).build();
- }
-
// create test DBs, wait it converged and return externalviews
- protected Map<String, ExternalView> createTestDBs(long delayTime) throws InterruptedException {
+ protected Map<String, ExternalView> createTestDBs(long delayTime)
+ throws InterruptedException {
Map<String, ExternalView> externalViews = new HashMap<>();
int i = 0;
for (String stateModel : TestStateModels) {
@@ -76,28 +64,33 @@ public class TestDelayedWagedRebalanceWithRackaware extends TestDelayedAutoRebal
// Waged Rebalancer takes cluster level delay config only. Skip this test.
}
- @Test(dependsOnMethods = { "testDelayedPartitionMovement" })
- public void testDelayedPartitionMovementWithClusterConfigedDelay() throws Exception {
+ @Test(dependsOnMethods = {"testDelayedPartitionMovement"})
+ public void testDelayedPartitionMovementWithClusterConfigedDelay()
+ throws Exception {
super.testDelayedPartitionMovementWithClusterConfigedDelay();
}
- @Test(dependsOnMethods = { "testDelayedPartitionMovementWithClusterConfigedDelay" })
- public void testMinimalActiveReplicaMaintain() throws Exception {
+ @Test(dependsOnMethods = {"testDelayedPartitionMovementWithClusterConfigedDelay"})
+ public void testMinimalActiveReplicaMaintain()
+ throws Exception {
super.testMinimalActiveReplicaMaintain();
}
- @Test(dependsOnMethods = { "testMinimalActiveReplicaMaintain" })
- public void testPartitionMovementAfterDelayTime() throws Exception {
+ @Test(dependsOnMethods = {"testMinimalActiveReplicaMaintain"})
+ public void testPartitionMovementAfterDelayTime()
+ throws Exception {
super.testPartitionMovementAfterDelayTime();
}
- @Test(dependsOnMethods = { "testDisableDelayRebalanceInResource" })
- public void testDisableDelayRebalanceInCluster() throws Exception {
+ @Test(dependsOnMethods = {"testDisableDelayRebalanceInResource"})
+ public void testDisableDelayRebalanceInCluster()
+ throws Exception {
super.testDisableDelayRebalanceInCluster();
}
- @Test(dependsOnMethods = { "testDisableDelayRebalanceInCluster" })
- public void testDisableDelayRebalanceInInstance() throws Exception {
+ @Test(dependsOnMethods = {"testDisableDelayRebalanceInCluster"})
+ public void testDisableDelayRebalanceInInstance()
+ throws Exception {
super.testDisableDelayRebalanceInInstance();
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
index 00a31e2..eb9e0f8 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
@@ -19,12 +19,8 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
* under the License.
*/
-import java.util.Collections;
-
import org.apache.helix.integration.rebalancer.TestMixedModeAutoRebalance;
import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
@@ -43,11 +39,6 @@ public class TestMixedModeWagedRebalance extends TestMixedModeAutoRebalance {
};
}
- protected ZkHelixClusterVerifier getClusterVerifier() {
- return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setDeactivatedNodeAwareness(true).setResources(Collections.singleton(DB_NAME)).build();
- }
-
protected void createResource(String stateModel, int numPartition,
int replica, boolean delayEnabled, String rebalanceStrategy) {
if (delayEnabled) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
similarity index 95%
rename from helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java
rename to helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
index a87be34..369d46a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
@@ -36,14 +36,14 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestNodeSwap extends ZkTestBase {
+public class TestWagedNodeSwap extends ZkTestBase {
final int NUM_NODE = 6;
protected static final int START_PORT = 12918;
protected static final int _PARTITIONS = 20;
@@ -122,8 +122,8 @@ public class TestNodeSwap extends ZkTestBase {
}
Thread.sleep(1000);
- _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
+ _clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(_clusterVerifier.verify(5000));
}
@@ -183,8 +183,6 @@ public class TestNodeSwap extends ZkTestBase {
.manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "NodeSwapDone", Collections.emptyMap());
Thread.sleep(2000);
- _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
Assert.assertTrue(_clusterVerifier.verify(5000));
// Since only one node temporary down, the same partitions will be moved to the newly added node.
@@ -271,8 +269,6 @@ public class TestNodeSwap extends ZkTestBase {
.manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "NodeSwapDone", Collections.emptyMap());
Thread.sleep(2000);
- _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
Assert.assertTrue(_clusterVerifier.verify(5000));
for (String db : _allDBs) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 9790b92..11214ce 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.ImmutableMap;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
@@ -410,6 +411,67 @@ public class TestWagedRebalance extends ZkTestBase {
}
}
+ @Test(dependsOnMethods = "test")
+ public void testNewInstances()
+ throws InterruptedException {
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setGlobalRebalancePreference(ImmutableMap.of(
+ ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 0, ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 10));
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+ int i = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica, _replica);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+ validate(_replica);
+
+ String newNodeName = "newNode-" + TestHelper.getTestMethodName() + "_" + START_PORT;
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName);
+ try {
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newNodeName);
+ participant.syncStart();
+
+ Thread.sleep(300);
+ validate(_replica);
+
+ Assert.assertFalse(_allDBs.stream().anyMatch(db -> {
+ ExternalView ev =
+ _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ for (String partition : ev.getPartitionSet()) {
+ if (ev.getStateMap(partition).containsKey(newNodeName)) {
+ return true;
+ }
+ }
+ return false;
+ }));
+
+ clusterConfig.setGlobalRebalancePreference(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ Thread.sleep(300);
+ validate(_replica);
+
+ Assert.assertTrue(_allDBs.stream().anyMatch(db -> {
+ ExternalView ev =
+ _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ for (String partition : ev.getPartitionSet()) {
+ if (ev.getStateMap(partition).containsKey(newNodeName)) {
+ return true;
+ }
+ }
+ return false;
+ }));
+ } finally {
+ if (participant != null && participant.isConnected()) {
+ participant.syncStop();
+ }
+ }
+ }
+
private void validate(int expectedReplica) {
HelixClusterVerifier _clusterVerifier =
new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
index 831f77f..904e0bc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
@@ -36,7 +36,7 @@ import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -304,8 +304,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
private void validate(int expectedReplica) {
ZkHelixClusterVerifier _clusterVerifier =
- new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (String db : _allDBs) {
@@ -350,8 +349,7 @@ public class TestWagedRebalanceFaultZone extends ZkTestBase {
// waiting for all DB be dropped.
Thread.sleep(100);
ZkHelixClusterVerifier _clusterVerifier =
- new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
- .setDeactivatedNodeAwareness(true).setResources(_allDBs).build();
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}