You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:06 UTC
[helix] 03/37: Implement the WAGED rebalancer cluster model (#362)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 730054b90f9ac73e06168201728c89c57b69cc58
Author: jiajunwang <18...@users.noreply.github.com>
AuthorDate: Fri Aug 2 21:21:49 2019 -0700
Implement the WAGED rebalancer cluster model (#362)
* Introduce the cluster model classes to support the WAGED rebalancer.
Implement the cluster model classes with the minimum necessary information to support rebalance.
Additional fields/logic might be added later once the detailed rebalance logic is implemented.
Also add related tests.
---
.../rebalancer/waged/ClusterDataProvider.java | 2 +-
.../rebalancer/waged/model/AssignableNode.java | 291 ++++++++++++++++++++-
.../rebalancer/waged/model/AssignableReplica.java | 118 ++++++++-
.../rebalancer/waged/model/ClusterContext.java | 99 ++++++-
.../rebalancer/waged/model/ClusterModel.java | 132 +++++++++-
.../apache/helix/model/StateModelDefinition.java | 4 +-
.../waged/model/AbstractTestClusterModel.java | 176 +++++++++++++
.../rebalancer/waged/model/TestAssignableNode.java | 203 ++++++++++++++
.../waged/model/TestAssignableReplica.java | 99 +++++++
.../rebalancer/waged/model/TestClusterContext.java | 90 +++++++
.../rebalancer/waged/model/TestClusterModel.java | 114 ++++++++
11 files changed, 1311 insertions(+), 17 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
index 419be42..feae1dc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
@@ -48,6 +48,6 @@ public class ClusterDataProvider {
Set<String> activeInstances, Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
Map<String, IdealState> baselineAssignment, Map<String, IdealState> bestPossibleAssignment) {
// TODO finish the implementation.
- return new ClusterModel();
+ return null;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index ae037f4..989323e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,10 +19,291 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.max;
+
/**
- * A placeholder before we have the implementation.
- *
- * This class represents a potential allocation of the replication.
- * Note that AssignableNode is not thread safe.
+ * This class represents a possible allocation of the replication.
+ * Note that any usage updates to the AssignableNode are not thread safe.
*/
-public class AssignableNode { }
+public class AssignableNode {
+ private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName());
+
+ // basic node information
+ private final String _instanceName;
+ private Set<String> _instanceTags;
+ private String _faultZone;
+ private Map<String, List<String>> _disabledPartitionsMap;
+ private Map<String, Integer> _maxCapacity;
+ private int _maxPartition; // maximum number of the partitions that can be assigned to the node.
+
+ // proposed assignment tracking
+ // <resource name, partition name>
+ private Map<String, Set<String>> _currentAssignments;
+ // <resource name, top state partition name>
+ private Map<String, Set<String>> _currentTopStateAssignments;
+ // <capacity key, capacity value>
+ private Map<String, Integer> _currentCapacity;
+ // The maximum capacity utilization (0.0 - 1.0) across all the capacity categories.
+ private float _highestCapacityUtilization;
+
+ /**
+ * @param clusterConfig the cluster-level config (topology definition, max partitions per instance)
+ * @param instanceConfig the config of this instance (capacity map, tags, domain, disabled partitions)
+ * @param instanceName the name that identifies this node
+ * @param existingAssignment the replicas that are already assigned to this node
+ */
+ AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName,
+ Collection<AssignableReplica> existingAssignment) {
+ _instanceName = instanceName;
+ refresh(clusterConfig, instanceConfig, existingAssignment);
+ }
+
+ // Clears all assignment-tracking state so refresh() can rebuild it from scratch.
+ private void reset() {
+ _currentAssignments = new HashMap<>();
+ _currentTopStateAssignments = new HashMap<>();
+ _currentCapacity = new HashMap<>();
+ _highestCapacityUtilization = 0;
+ }
+
+ /**
+ * Update the node with a ClusterDataCache. This resets the current assignment and recalculates currentCapacity.
+ * NOTE: While this is required to be used in the constructor, this can also be used when the clusterCache needs to be
+ * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and ResourceConfig could
+ * subject to change. If the assumption is no longer true, this function should become private.
+ *
+ * @param clusterConfig - the Cluster Config of the cluster where the node is located
+ * @param instanceConfig - the Instance Config of the node
+ * @param existingAssignment - all the existing replicas that are current assigned to the node
+ */
+ private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
+ Collection<AssignableReplica> existingAssignment) {
+ reset();
+
+ _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+ _faultZone = computeFaultZone(clusterConfig, instanceConfig);
+ _instanceTags = new HashSet<>(instanceConfig.getTags());
+ _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
+ _maxCapacity = instanceConfig.getInstanceCapacityMap();
+ _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+
+ assignNewBatch(existingAssignment);
+ }
+
+ /**
+ * Assign a replica to the node.
+ *
+ * @param assignableReplica - the replica to be assigned
+ * @throws HelixException if a replica of the same resource/partition is already on this node
+ */
+ void assign(AssignableReplica assignableReplica) {
+ if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
+ throw new HelixException(String
+ .format("Resource %s already has a replica from partition %s on node %s",
+ assignableReplica.getResourceName(), assignableReplica.getPartitionName(),
+ getInstanceName()));
+ } else {
+ if (assignableReplica.isReplicaTopState()) {
+ addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
+ }
+ // Consume capacity for every capacity key that the replica requires.
+ assignableReplica.getCapacity().entrySet().stream().forEach(
+ capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
+ }
+ }
+
+ /**
+ * Release a replica from the node.
+ * If the replication is not on this node, the assignable node is not updated.
+ *
+ * @param assignableReplica - the replica to be released
+ */
+ void release(AssignableReplica assignableReplica) throws IllegalArgumentException {
+ String resourceName = assignableReplica.getResourceName();
+ String partitionName = assignableReplica.getPartitionName();
+
+ // Check if the release is necessary
+ if (!_currentAssignments.containsKey(resourceName)) {
+ LOG.warn("Resource {} is not on node {}. Ignore the release call.", resourceName,
+ getInstanceName());
+ return;
+ }
+ Set<String> partitions = _currentAssignments.get(resourceName);
+ if (!partitions.contains(partitionName)) {
+ LOG.warn(String
+ .format("Resource %s does not have a replica from partition %s on node %s", resourceName,
+ partitionName, getInstanceName()));
+ return;
+ }
+
+ partitions.remove(assignableReplica.getPartitionName());
+ if (assignableReplica.isReplicaTopState()) {
+ _currentTopStateAssignments.get(resourceName).remove(partitionName);
+ }
+ // Recalculate utilization because of release
+ // NOTE(review): the utilization is reset to 0 and then only re-applied for the released
+ // replica's capacity keys; utilization contributed by other capacity keys is not re-scanned
+ // here -- confirm this is intended.
+ _highestCapacityUtilization = 0;
+ assignableReplica.getCapacity().entrySet().stream()
+ .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), -1 * entry.getValue()));
+ }
+
+ // Returns the mutable map of resource name -> assigned partition names.
+ public Map<String, Set<String>> getCurrentAssignmentsMap() {
+ return _currentAssignments;
+ }
+
+ // Returns the partitions of the given resource assigned to this node (empty set if none).
+ public Set<String> getCurrentAssignmentsByResource(String resource) {
+ return _currentAssignments.getOrDefault(resource, Collections.emptySet());
+ }
+
+ // Returns the top-state partitions of the given resource on this node (empty set if none).
+ public Set<String> getCurrentTopStateAssignmentsByResource(String resource) {
+ return _currentTopStateAssignments.getOrDefault(resource, Collections.emptySet());
+ }
+
+ // Total count of top-state replicas assigned to this node, across all resources.
+ public int getTopStateAssignmentTotalSize() {
+ return _currentTopStateAssignments.values().stream().mapToInt(Set::size).sum();
+ }
+
+ // Total count of replicas assigned to this node, across all resources.
+ public int getCurrentAssignmentCount() {
+ return _currentAssignments.values().stream().mapToInt(Set::size).sum();
+ }
+
+ // Remaining capacity per capacity key, after accounting for all current assignments.
+ public Map<String, Integer> getCurrentCapacity() {
+ return _currentCapacity;
+ }
+
+ // Highest recorded utilization (0.0 - 1.0) across all capacity keys.
+ public float getHighestCapacityUtilization() {
+ return _highestCapacityUtilization;
+ }
+
+ public String getInstanceName() {
+ return _instanceName;
+ }
+
+ public Set<String> getInstanceTags() {
+ return _instanceTags;
+ }
+
+ public String getFaultZone() {
+ return _faultZone;
+ }
+
+ public Map<String, List<String>> getDisabledPartitionsMap() {
+ return _disabledPartitionsMap;
+ }
+
+ // Full (maximum) capacity per capacity key, from the InstanceConfig.
+ public Map<String, Integer> getMaxCapacity() {
+ return _maxCapacity;
+ }
+
+ public int getMaxPartition() {
+ return _maxPartition;
+ }
+
+ /**
+ * Computes the fault zone id based on the domain and fault zone type when topology is enabled. For example, when
+ * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function returns "2".
+ * If cannot find the fault zone id, this function leaves the fault zone id as the instance name.
+ * TODO merge this logic with Topology.java tree building logic.
+ * For now, the WAGED rebalancer has a more strict topology def requirement.
+ * Any missing field will cause an invalid topology config exception.
+ */
+ private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
+ if (clusterConfig.isTopologyAwareEnabled()) {
+ String topologyStr = clusterConfig.getTopology();
+ String faultZoneType = clusterConfig.getFaultZoneType();
+ if (topologyStr == null || faultZoneType == null) {
+ throw new HelixException("Fault zone or cluster topology information is not configured.");
+ }
+
+ String[] topologyDef = topologyStr.trim().split("/");
+ if (topologyDef.length == 0 || Arrays.stream(topologyDef)
+ .noneMatch(type -> type.equals(faultZoneType))) {
+ throw new HelixException(
+ "The configured topology definition is empty or does not contain the fault zone type.");
+ }
+
+ Map<String, String> domainAsMap = instanceConfig.getDomainAsMap();
+ if (domainAsMap == null) {
+ throw new HelixException(
+ String.format("The domain configuration of node %s is not configured", _instanceName));
+ } else {
+ // Concatenate the domain values of every topology level up to and including the fault
+ // zone type, separated by '/'; every level must be present in the instance's domain.
+ StringBuilder faultZoneStringBuilder = new StringBuilder();
+ for (String key : topologyDef) {
+ if (!key.isEmpty()) {
+ if (domainAsMap.containsKey(key)) {
+ faultZoneStringBuilder.append(domainAsMap.get(key));
+ faultZoneStringBuilder.append('/');
+ } else {
+ throw new HelixException(String.format(
+ "The domain configuration of node %s is not complete. Type %s is not found.",
+ _instanceName, key));
+ }
+ if (key.equals(faultZoneType)) {
+ break;
+ }
+ }
+ }
+ return faultZoneStringBuilder.toString();
+ }
+ } else {
+ // For backward compatibility
+ String zoneId = instanceConfig.getZoneId();
+ return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
+ }
+ }
+
+ /**
+ * This function should only be used to assign a set of new partitions that are not allocated on this node.
+ * Using this function avoids the overhead of updating capacity repeatedly.
+ */
+ private void assignNewBatch(Collection<AssignableReplica> replicas) {
+ Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+ for (AssignableReplica replica : replicas) {
+ addToAssignmentRecord(replica, _currentAssignments);
+ if (replica.isReplicaTopState()) {
+ addToAssignmentRecord(replica, _currentTopStateAssignments);
+ }
+ // increment the capacity requirement according to partition's capacity configuration.
+ for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
+ totalPartitionCapacity.compute(capacity.getKey(),
+ (key, totalValue) -> (totalValue == null) ?
+ capacity.getValue() :
+ totalValue + capacity.getValue());
+ }
+ }
+
+ // Update the global state after all single replications' calculation is done.
+ for (String key : totalPartitionCapacity.keySet()) {
+ updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
+ }
+ }
+
+ /**
+ * Records the replica's partition under its resource name in the given assignment map.
+ *
+ * @return true if the partition was newly recorded, false if it was already present
+ */
+ private boolean addToAssignmentRecord(AssignableReplica replica,
+ Map<String, Set<String>> currentAssignments) {
+ return currentAssignments.computeIfAbsent(replica.getResourceName(), k -> new HashSet<>())
+ .add(replica.getPartitionName());
+ }
+
+ /**
+ * Subtracts valueToSubtract from the remaining capacity of the given key (a negative value
+ * therefore returns capacity), then raises _highestCapacityUtilization if this key's new
+ * utilization exceeds the recorded maximum.
+ */
+ private void updateCapacityAndUtilization(String capacityKey, int valueToSubtract) {
+ if (_currentCapacity.containsKey(capacityKey)) {
+ int newCapacity = _currentCapacity.get(capacityKey) - valueToSubtract;
+ _currentCapacity.put(capacityKey, newCapacity);
+ // For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
+ float utilization = Math.min(
+ (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1);
+ _highestCapacityUtilization = max(_highestCapacityUtilization, utilization);
+ }
+ // else if the capacityKey does not exist in the capacity map, this method essentially becomes
+ // a NOP; in other words, this node will be treated as if it has unlimited capacity.
+ }
+
+ // NOTE(review): hashCode is based on the instance name only and equals() is not overridden --
+ // confirm that identity-based equality is intended for AssignableNode.
+ @Override
+ public int hashCode() {
+ return _instanceName.hashCode();
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index a6a7e4a..0082a2d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -19,9 +19,121 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.io.IOException;
+import java.util.Map;
+
/**
- * A placeholder before we have the implementation.
- *
* This class represents a partition replication that needs to be allocated.
*/
-public class AssignableReplica { }
+public class AssignableReplica implements Comparable<AssignableReplica> {
+ private final String _partitionName;
+ private final String _resourceName;
+ // The instance group tag configured on the resource, if any.
+ private final String _resourceInstanceGroupTag;
+ // The resource-level cap on partitions per instance.
+ private final int _resourceMaxPartitionsPerInstance;
+ // <capacity key, capacity value> required by this partition, from the ResourceConfig.
+ private final Map<String, Integer> _capacityUsage;
+ // The priority of the replica's state
+ private final int _statePriority;
+ // The state of the replica
+ private final String _replicaState;
+
+ /**
+ * @param resourceConfig The resource config for the resource which contains the replication.
+ * @param partitionName The replication's partition name.
+ * @param replicaState The state of the replication.
+ * @param statePriority The priority of the replication's state.
+ */
+ AssignableReplica(ResourceConfig resourceConfig, String partitionName, String replicaState,
+ int statePriority) {
+ _partitionName = partitionName;
+ _replicaState = replicaState;
+ _statePriority = statePriority;
+ _resourceName = resourceConfig.getResourceName();
+ _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig);
+ _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
+ _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
+ }
+
+ // Capacity required by this replica, per capacity key.
+ public Map<String, Integer> getCapacity() {
+ return _capacityUsage;
+ }
+
+ public String getPartitionName() {
+ return _partitionName;
+ }
+
+ public String getReplicaState() {
+ return _replicaState;
+ }
+
+ // True when the replica's state priority equals the state model's top state priority.
+ public boolean isReplicaTopState() {
+ return _statePriority == StateModelDefinition.TOP_STATE_PRIORITY;
+ }
+
+ public int getStatePriority() {
+ return _statePriority;
+ }
+
+ public String getResourceName() {
+ return _resourceName;
+ }
+
+ public String getResourceInstanceGroupTag() {
+ return _resourceInstanceGroupTag;
+ }
+
+ public int getResourceMaxPartitionsPerInstance() {
+ return _resourceMaxPartitionsPerInstance;
+ }
+
+ // The string form doubles as the replica's dedup/index key: "resource-partition-state".
+ @Override
+ public String toString() {
+ return generateReplicaKey(_resourceName, _partitionName, _replicaState);
+ }
+
+ /**
+ * Orders replicas lexicographically by resource name, then partition name, then state.
+ * NOTE(review): equals() and hashCode() are not overridden, so this ordering may be
+ * inconsistent with equals -- confirm this is acceptable for the intended sorted containers.
+ */
+ @Override
+ public int compareTo(AssignableReplica replica) {
+ if (!_resourceName.equals(replica._resourceName)) {
+ return _resourceName.compareTo(replica._resourceName);
+ }
+ if (!_partitionName.equals(replica._partitionName)) {
+ return _partitionName.compareTo(replica._partitionName);
+ }
+ if (!_replicaState.equals(replica._replicaState)) {
+ return _replicaState.compareTo(replica._replicaState);
+ }
+ return 0;
+ }
+
+ // Builds the "resource-partition-state" key used to index replicas in the cluster model.
+ public static String generateReplicaKey(String resourceName, String partitionName, String state) {
+ return String.format("%s-%s-%s", resourceName, partitionName, state);
+ }
+
+ /**
+ * Parse the resource config for the partition weight.
+ * Falls back to the resource's default partition capacity when the partition has no specific
+ * entry; throws IllegalArgumentException when neither is configured or the config is invalid.
+ */
+ private Map<String, Integer> fetchCapacityUsage(String partitionName,
+ ResourceConfig resourceConfig) {
+ Map<String, Map<String, Integer>> capacityMap;
+ try {
+ capacityMap = resourceConfig.getPartitionCapacityMap();
+ } catch (IOException ex) {
+ throw new IllegalArgumentException(
+ "Invalid partition capacity configuration of resource: " + resourceConfig
+ .getResourceName(), ex);
+ }
+
+ Map<String, Integer> partitionCapacity = capacityMap.get(partitionName);
+ if (partitionCapacity == null) {
+ partitionCapacity = capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY);
+ }
+ if (partitionCapacity == null) {
+ throw new IllegalArgumentException(String.format(
+ "The capacity usage of the specified partition %s is not configured in the Resource Config %s. No default partition capacity is configured neither.",
+ partitionName, resourceConfig.getResourceName()));
+ }
+ return partitionCapacity;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index adca7d1..c163e4c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -19,9 +19,100 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import org.apache.helix.HelixException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
- * A placeholder before we have the implementation.
- *
- * This class tracks the global rebalance-related status of a Helix managed cluster.
+ * This class tracks the rebalance-related global cluster status.
*/
-public class ClusterContext { }
+public class ClusterContext {
+ // Multiplier applied to the per-instance average to leave headroom in the estimates.
+ private final static float ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT = 1.1f;
+
+ // This estimation helps to ensure global partition count evenness
+ private final int _estimatedMaxPartitionCount;
+ // This estimation helps to ensure global top state replica count evenness
+ private final int _estimatedMaxTopStateCount;
+ // This estimation helps to ensure per-resource partition count evenness
+ private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
+
+ // map{zoneName : map{resourceName : set(partitionNames)}}
+ private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
+
+ /**
+ * Construct the cluster context based on the current instance status.
+ *
+ * @param replicaSet All the partition replicas that are managed by the rebalancer
+ * @param instanceCount The count of all the active instances that can be used to host partitions.
+ */
+ ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
+ int totalReplicas = 0;
+ int totalTopStateReplicas = 0;
+
+ // Group replicas by resource to derive a per-resource estimate, while accumulating the
+ // global replica and top-state totals for the cluster-wide estimates.
+ for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
+ .collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
+ int replicas = entry.getValue().size();
+ totalReplicas += replicas;
+
+ int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
+ _estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);
+
+ totalTopStateReplicas +=
+ entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
+ }
+
+ _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
+ _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
+ }
+
+ public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
+ return _assignmentForFaultZoneMap;
+ }
+
+ public int getEstimatedMaxPartitionCount() {
+ return _estimatedMaxPartitionCount;
+ }
+
+ // NOTE(review): auto-unboxing throws NullPointerException if the resource was not part of
+ // the replica set given to the constructor -- confirm callers only pass known resources.
+ public int getEstimatedMaxPartitionByResource(String resourceName) {
+ return _estimatedMaxPartitionByResource.get(resourceName);
+ }
+
+ public int getEstimatedMaxTopStateCount() {
+ return _estimatedMaxTopStateCount;
+ }
+
+ // Partitions of the given resource currently recorded in the given fault zone (empty if none).
+ public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
+ return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
+ .getOrDefault(resourceName, Collections.emptySet());
+ }
+
+ /**
+ * Records a partition placement in the given fault zone.
+ *
+ * @throws HelixException if the partition is already recorded in that fault zone
+ */
+ void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
+ if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
+ .computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) {
+ throw new HelixException(String
+ .format("Resource %s already has a replica from partition %s in fault zone %s",
+ resourceName, partition, faultZoneId));
+ }
+ }
+
+ /**
+ * Removes a recorded partition placement from the given fault zone.
+ *
+ * @return true if the partition was recorded there and has been removed, false otherwise
+ */
+ boolean removePartitionFromFaultZone(String faultZoneId, String resourceName, String partition) {
+ return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
+ .getOrDefault(resourceName, Collections.emptySet()).remove(partition);
+ }
+
+ // Replaces the whole fault-zone assignment record, e.g. to restore a snapshot.
+ void setAssignmentForFaultZoneMap(
+ Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
+ _assignmentForFaultZoneMap = assignmentForFaultZoneMap;
+ }
+
+ // ceil(replicaCount / instanceCount * margin): the per-instance average plus headroom.
+ private int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
+ return (int) Math
+ .ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
index 06eebf7..2908939 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java
@@ -19,9 +19,135 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import org.apache.helix.HelixException;
+import org.apache.helix.model.IdealState;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
- * A placeholder before we have the implementation.
- *
* This class wraps the required input for the rebalance algorithm.
*/
-public class ClusterModel { }
+public class ClusterModel {
+ private final ClusterContext _clusterContext;
+ // Map to track all the assignable replications. <Resource Name, Set<Replicas>>
+ private final Map<String, Set<AssignableReplica>> _assignableReplicaMap;
+ // The index to find the replication information with a certain state. <Resource, <Key(resource_partition_state), Replica>>
+ // Note that the identical replicas are deduped in the index.
+ private final Map<String, Map<String, AssignableReplica>> _assignableReplicaIndex;
+ // <instance name, AssignableNode>
+ private final Map<String, AssignableNode> _assignableNodeMap;
+
+ // Records about the previous assignment
+ // <ResourceName, IdealState contains the baseline assignment>
+ private final Map<String, IdealState> _baselineAssignment;
+ // <ResourceName, IdealState contains the best possible assignment>
+ private final Map<String, IdealState> _bestPossibleAssignment;
+
+ /**
+ * @param clusterContext The initialized cluster context.
+ * @param assignableReplicas The replicas to be assigned.
+ * Note that the replicas in this list shall not be included while initializing the context and assignable nodes.
+ * @param assignableNodes The active instances.
+ * @param baselineAssignment The recorded baseline assignment.
+ * @param bestPossibleAssignment The current best possible assignment.
+ */
+ ClusterModel(ClusterContext clusterContext, Set<AssignableReplica> assignableReplicas,
+ Set<AssignableNode> assignableNodes, Map<String, IdealState> baselineAssignment,
+ Map<String, IdealState> bestPossibleAssignment) {
+ _clusterContext = clusterContext;
+
+ // Save all the to be assigned replication
+ _assignableReplicaMap = assignableReplicas.stream()
+ .collect(Collectors.groupingBy(AssignableReplica::getResourceName, Collectors.toSet()));
+
+ // Index all the replicas to be assigned. Dedup the replica if two instances have the same resource/partition/state
+ _assignableReplicaIndex = assignableReplicas.stream().collect(Collectors
+ .groupingBy(AssignableReplica::getResourceName, Collectors
+ .toMap(AssignableReplica::toString, replica -> replica,
+ (oldValue, newValue) -> oldValue)));
+
+ _assignableNodeMap = assignableNodes.stream()
+ .collect(Collectors.toMap(AssignableNode::getInstanceName, node -> node));
+
+ _baselineAssignment = baselineAssignment;
+ _bestPossibleAssignment = bestPossibleAssignment;
+ }
+
+ public ClusterContext getContext() {
+ return _clusterContext;
+ }
+
+ public Map<String, AssignableNode> getAssignableNodes() {
+ return _assignableNodeMap;
+ }
+
+ public Map<String, Set<AssignableReplica>> getAssignableReplicaMap() {
+ return _assignableReplicaMap;
+ }
+
+ public Map<String, IdealState> getBaseline() {
+ return _baselineAssignment;
+ }
+
+ public Map<String, IdealState> getBestPossibleAssignment() {
+ return _bestPossibleAssignment;
+ }
+
+ /**
+ * Assign the given replica to the specified instance and record the assignment in the cluster model.
+ * The cluster usage information will be updated accordingly.
+ *
+ * @param resourceName The resource name of the replica.
+ * @param partitionName The partition name of the replica.
+ * @param state The state of the replica.
+ * @param instanceName The name of the instance that hosts the replica.
+ */
+ public void assign(String resourceName, String partitionName, String state, String instanceName) {
+ AssignableNode node = locateAssignableNode(instanceName);
+ AssignableReplica replica = locateAssignableReplica(resourceName, partitionName, state);
+
+ node.assign(replica);
+ _clusterContext.addPartitionToFaultZone(node.getFaultZone(), resourceName, partitionName);
+ }
+
+ /**
+ * Revert the proposed assignment from the cluster model.
+ * The cluster usage information will be updated accordingly.
+ *
+ * @param resourceName The resource name of the replica.
+ * @param partitionName The partition name of the replica.
+ * @param state The state of the replica.
+ * @param instanceName The name of the instance that hosted the replica.
+ */
+ public void release(String resourceName, String partitionName, String state,
+ String instanceName) {
+ AssignableNode node = locateAssignableNode(instanceName);
+ AssignableReplica replica = locateAssignableReplica(resourceName, partitionName, state);
+
+ node.release(replica);
+ _clusterContext.removePartitionFromFaultZone(node.getFaultZone(), resourceName, partitionName);
+ }
+
+ // Looks up the node by instance name; throws HelixException if the instance is unknown.
+ private AssignableNode locateAssignableNode(String instanceName) {
+ AssignableNode node = _assignableNodeMap.get(instanceName);
+ if (node == null) {
+ throw new HelixException("Cannot find the instance: " + instanceName);
+ }
+ return node;
+ }
+
+ // Looks up the (deduped) replica by its resource-partition-state key; throws HelixException
+ // if no matching replica was registered with this model.
+ private AssignableReplica locateAssignableReplica(String resourceName, String partitionName,
+ String state) {
+ AssignableReplica sampleReplica =
+ _assignableReplicaIndex.getOrDefault(resourceName, Collections.emptyMap())
+ .get(AssignableReplica.generateReplicaKey(resourceName, partitionName, state));
+ if (sampleReplica == null) {
+ throw new HelixException(String
+ .format("Cannot find the replication with resource name %s, partition name %s, state %s.",
+ resourceName, partitionName, state));
+ }
+ return sampleReplica;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index ae59522..0a40331 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -46,6 +46,8 @@ public class StateModelDefinition extends HelixProperty {
STATE_PRIORITY_LIST
}
+ public static final int TOP_STATE_PRIORITY = 1;
+
/**
* state model's initial state
*/
@@ -98,7 +100,7 @@ public class StateModelDefinition extends HelixProperty {
_stateTransitionTable = new HashMap<>();
_statesCountMap = new HashMap<>();
if (_statesPriorityList != null) {
- int priority = 1;
+ int priority = TOP_STATE_PRIORITY;
for (String state : _statesPriorityList) {
Map<String, String> metaData = record.getMapField(state + ".meta");
if (metaData != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
new file mode 100644
index 0000000..0e2b43a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -0,0 +1,176 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfig;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractTestClusterModel {
+ protected String _testInstanceId;
+ protected List<String> _resourceNames;
+ protected List<String> _partitionNames;
+ protected Map<String, Integer> _capacityDataMap;
+ protected Map<String, List<String>> _disabledPartitionsMap;
+ protected List<String> _testInstanceTags;
+ protected String _testFaultZoneId;
+
+ @BeforeClass
+ public void initialize() {
+ _testInstanceId = "testInstanceId";
+ _resourceNames = new ArrayList<>();
+ _resourceNames.add("Resource1");
+ _resourceNames.add("Resource2");
+ _partitionNames = new ArrayList<>();
+ _partitionNames.add("Partition1");
+ _partitionNames.add("Partition2");
+ _partitionNames.add("Partition3");
+ _partitionNames.add("Partition4");
+ _capacityDataMap = new HashMap<>();
+ _capacityDataMap.put("item1", 20);
+ _capacityDataMap.put("item2", 40);
+ _capacityDataMap.put("item3", 30);
+ List<String> disabledPartitions = new ArrayList<>();
+ disabledPartitions.add("TestPartition");
+ _disabledPartitionsMap = new HashMap<>();
+ _disabledPartitionsMap.put("TestResource", disabledPartitions);
+ _testInstanceTags = new ArrayList<>();
+ _testInstanceTags.add("TestTag");
+ _testFaultZoneId = "testZone";
+ }
+
+ protected ResourceControllerDataProvider setupClusterDataCache() throws IOException {
+ ResourceControllerDataProvider testCache = Mockito.mock(ResourceControllerDataProvider.class);
+
+ // 1. Set up the default instance information with capacity configuration.
+ InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceId");
+ testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
+ testInstanceConfig.addTag(_testInstanceTags.get(0));
+ testInstanceConfig.setInstanceEnabledForPartition("TestResource", "TestPartition", false);
+ testInstanceConfig.setInstanceEnabled(true);
+ testInstanceConfig.setZoneId(_testFaultZoneId);
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+ instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+ when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+ // 2. Set up the basic cluster configuration.
+ ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+ testClusterConfig.setMaxPartitionsPerInstance(5);
+ testClusterConfig.setDisabledInstances(Collections.emptyMap());
+ testClusterConfig.setTopologyAwareEnabled(false);
+ when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+
+ // 3. Mock the live instance node for the default instance.
+ LiveInstance testLiveInstance = new LiveInstance(_testInstanceId);
+ testLiveInstance.setSessionId("testSessionId");
+ Map<String, LiveInstance> liveInstanceMap = new HashMap<>();
+ liveInstanceMap.put(_testInstanceId, testLiveInstance);
+ when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
+
+ // 4. Mock two resources, each with 2 partitions on the default instance.
+ // The instance will have the following partitions assigned
+ // Resource 1:
+ // partition 1 - MASTER
+ // partition 2 - SLAVE
+ // Resource 2:
+ // partition 3 - MASTER
+ // partition 4 - SLAVE
+ CurrentState testCurrentStateResource1 = Mockito.mock(CurrentState.class);
+ Map<String, String> partitionStateMap1 = new HashMap<>();
+ partitionStateMap1.put(_partitionNames.get(0), "MASTER");
+ partitionStateMap1.put(_partitionNames.get(1), "SLAVE");
+ when(testCurrentStateResource1.getResourceName()).thenReturn(_resourceNames.get(0));
+ when(testCurrentStateResource1.getPartitionStateMap()).thenReturn(partitionStateMap1);
+ when(testCurrentStateResource1.getStateModelDefRef()).thenReturn("MasterSlave");
+ when(testCurrentStateResource1.getState(_partitionNames.get(0))).thenReturn("MASTER");
+ when(testCurrentStateResource1.getState(_partitionNames.get(1))).thenReturn("SLAVE");
+ CurrentState testCurrentStateResource2 = Mockito.mock(CurrentState.class);
+ Map<String, String> partitionStateMap2 = new HashMap<>();
+ partitionStateMap2.put(_partitionNames.get(2), "MASTER");
+ partitionStateMap2.put(_partitionNames.get(3), "SLAVE");
+ when(testCurrentStateResource2.getResourceName()).thenReturn(_resourceNames.get(1));
+ when(testCurrentStateResource2.getPartitionStateMap()).thenReturn(partitionStateMap2);
+ when(testCurrentStateResource2.getStateModelDefRef()).thenReturn("MasterSlave");
+ when(testCurrentStateResource2.getState(_partitionNames.get(2))).thenReturn("MASTER");
+ when(testCurrentStateResource2.getState(_partitionNames.get(3))).thenReturn("SLAVE");
+ Map<String, CurrentState> currentStatemap = new HashMap<>();
+ currentStatemap.put(_resourceNames.get(0), testCurrentStateResource1);
+ currentStatemap.put(_resourceNames.get(1), testCurrentStateResource2);
+ when(testCache.getCurrentState(_testInstanceId, "testSessionId")).thenReturn(currentStatemap);
+
+ // 5. Set up the resource config for the two resources with the partition weight.
+ Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
+ capacityDataMapResource1.put("item1", 3);
+ capacityDataMapResource1.put("item2", 6);
+ ResourceConfig testResourceConfigResource1 = new ResourceConfig("Resource1");
+ testResourceConfigResource1.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
+ when(testCache.getResourceConfig("Resource1")).thenReturn(testResourceConfigResource1);
+ Map<String, Integer> capacityDataMapResource2 = new HashMap<>();
+ capacityDataMapResource2.put("item1", 5);
+ capacityDataMapResource2.put("item2", 10);
+ ResourceConfig testResourceConfigResource2 = new ResourceConfig("Resource2");
+ testResourceConfigResource2.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource2));
+ when(testCache.getResourceConfig("Resource2")).thenReturn(testResourceConfigResource2);
+
+ // 6. Define mock state model
+ for (BuiltInStateModelDefinitions bsmd : BuiltInStateModelDefinitions.values()) {
+ when(testCache.getStateModelDef(bsmd.name())).thenReturn(bsmd.getStateModelDefinition());
+ }
+
+ return testCache;
+ }
+
+ /**
+ * Generate the replica objects according to the provider information.
+ */
+ protected Set<AssignableReplica> generateReplicas(ResourceControllerDataProvider dataProvider) {
+ // Create assignable replica based on the current state.
+ Map<String, CurrentState> currentStatemap =
+ dataProvider.getCurrentState(_testInstanceId, "testSessionId");
+ Set<AssignableReplica> assignmentSet = new HashSet<>();
+ for (CurrentState cs : currentStatemap.values()) {
+ ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
+ // Construct one AssignableReplica for each partition in the current state.
+ cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet.add(
+ new AssignableReplica(resourceConfig, entry.getKey(), entry.getValue(),
+ entry.getValue().equals("MASTER") ? 1 : 2)));
+ }
+ return assignmentSet;
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
new file mode 100644
index 0000000..d7fcce9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -0,0 +1,203 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.when;
+
+public class TestAssignableNode extends AbstractTestClusterModel {
+ @BeforeClass
+ public void initialize() {
+ super.initialize();
+ }
+
+ @Test
+ public void testNormalUsage() throws IOException {
+ // Test 1 - initialize based on the data cache and check with the expected result
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+ Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+
+ Set<String> expectedAssignmentSet1 = new HashSet<>(_partitionNames.subList(0, 2));
+ Set<String> expectedAssignmentSet2 = new HashSet<>(_partitionNames.subList(2, 4));
+ Map<String, Set<String>> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("Resource1", expectedAssignmentSet1);
+ expectedAssignment.put("Resource2", expectedAssignmentSet2);
+ Map<String, Integer> expectedCapacityMap = new HashMap<>();
+ expectedCapacityMap.put("item1", 4);
+ expectedCapacityMap.put("item2", 8);
+ expectedCapacityMap.put("item3", 30);
+
+ AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
+ Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
+ Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4);
+ Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
+ Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+ Assert.assertEquals(assignableNode.getMaxPartition(), 5);
+ Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
+ Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
+ Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
+ Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+
+ // Test 2 - release assignment from the AssignableNode
+ AssignableReplica removingReplica =
+ new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
+ _partitionNames.get(2), "MASTER", 1);
+ expectedAssignment.get(_resourceNames.get(1)).remove(_partitionNames.get(2));
+ expectedCapacityMap.put("item1", 9);
+ expectedCapacityMap.put("item2", 18);
+
+ assignableNode.release(removingReplica);
+
+ Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
+ Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 3);
+ Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 11.0 / 20.0, 0.005);
+ Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+ Assert.assertEquals(assignableNode.getMaxPartition(), 5);
+ Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
+ Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
+ Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
+ Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+
+ // Test 3 - add assignment to the AssignableNode
+ AssignableReplica addingReplica =
+ new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
+ _partitionNames.get(2), "SLAVE", 2);
+ expectedAssignment.get(_resourceNames.get(1)).add(_partitionNames.get(2));
+ expectedCapacityMap.put("item1", 4);
+ expectedCapacityMap.put("item2", 8);
+
+ assignableNode.assign(addingReplica);
+
+ Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
+ Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4);
+ Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
+ Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+ Assert.assertEquals(assignableNode.getMaxPartition(), 5);
+ Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
+ Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
+ Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
+ Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+ }
+
+ @Test
+ public void testReleaseNoPartition() throws IOException {
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+ AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
+ Collections.emptyList());
+ AssignableReplica removingReplica =
+ new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(1)),
+ _partitionNames.get(2) + "non-exist", "MASTER", 1);
+
+ // Release shall pass.
+ assignableNode.release(removingReplica);
+ }
+
+ @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica from partition Partition1 on node testInstanceId")
+ public void testAssignDuplicateReplica() throws IOException {
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+ Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+
+ AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
+ AssignableReplica duplicateReplica =
+ new AssignableReplica(testCache.getResourceConfig(_resourceNames.get(0)),
+ _partitionNames.get(0), "SLAVE", 2);
+ assignableNode.assign(duplicateReplica);
+ }
+
+ @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The domain configuration of node testInstanceId is not complete. Type DOES_NOT_EXIST is not found.")
+ public void testParseFaultZoneNotFound() throws IOException {
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+ ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+ testClusterConfig.setFaultZoneType("DOES_NOT_EXIST");
+ testClusterConfig.setTopologyAwareEnabled(true);
+ testClusterConfig.setTopology("/DOES_NOT_EXIST/");
+ when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+
+ InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+ testInstanceConfig.setDomain("zone=2, instance=testInstance");
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+ instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+ when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+ new AssignableNode(testCache.getClusterConfig(),
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
+ Collections.emptyList());
+ }
+
+ @Test
+ public void testParseFaultZone() throws IOException {
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+ ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+ testClusterConfig.setFaultZoneType("zone");
+ testClusterConfig.setTopologyAwareEnabled(true);
+ testClusterConfig.setTopology("/zone/instance");
+ when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+
+ InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+ testInstanceConfig.setDomain("zone=2, instance=testInstance");
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+ instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+ when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+ AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
+ Collections.emptyList());
+
+ Assert.assertEquals(assignableNode.getFaultZone(), "2/");
+
+ testClusterConfig = new ClusterConfig("testClusterConfigId");
+ testClusterConfig.setFaultZoneType("instance");
+ testClusterConfig.setTopologyAwareEnabled(true);
+ testClusterConfig.setTopology("/zone/instance");
+ when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+
+ testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+ testInstanceConfig.setDomain("zone=2, instance=testInstance");
+ instanceConfigMap = new HashMap<>();
+ instanceConfigMap.put(_testInstanceId, testInstanceConfig);
+ when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+ assignableNode = new AssignableNode(testCache.getClusterConfig(),
+ testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
+ Collections.emptyList());
+
+ Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
new file mode 100644
index 0000000..d069ced
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableReplica.java
@@ -0,0 +1,99 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestAssignableReplica {
+ String resourceName = "Resource";
+ String partitionNamePrefix = "partition";
+ String masterState = "Master";
+ int masterPriority = StateModelDefinition.TOP_STATE_PRIORITY;
+ String slaveState = "Slave";
+ int slavePriority = 2;
+
+ @Test
+ public void testConstructRepliaWithResourceConfig() throws IOException {
+ // Init assignable replica with a basic config object
+ Map<String, Integer> capacityDataMapResource1 = new HashMap<>();
+ capacityDataMapResource1.put("item1", 3);
+ capacityDataMapResource1.put("item2", 6);
+ ResourceConfig testResourceConfigResource = new ResourceConfig(resourceName);
+ testResourceConfigResource.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
+
+ String partitionName = partitionNamePrefix + 1;
+ AssignableReplica replica =
+ new AssignableReplica(testResourceConfigResource, partitionName, masterState,
+ masterPriority);
+ Assert.assertEquals(replica.getResourceName(), resourceName);
+ Assert.assertEquals(replica.getPartitionName(), partitionName);
+ Assert.assertEquals(replica.getReplicaState(), masterState);
+ Assert.assertEquals(replica.getStatePriority(), masterPriority);
+ Assert.assertTrue(replica.isReplicaTopState());
+ Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1);
+ Assert.assertEquals(replica.getResourceInstanceGroupTag(), null);
+ Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), Integer.MAX_VALUE);
+
+ // Modify the config and initialize more replicas.
+ // 1. update capacity
+ Map<String, Integer> capacityDataMapResource2 = new HashMap<>();
+ capacityDataMapResource2.put("item1", 5);
+ capacityDataMapResource2.put("item2", 10);
+ Map<String, Map<String, Integer>> capacityMap =
+ testResourceConfigResource.getPartitionCapacityMap();
+ String partitionName2 = partitionNamePrefix + 2;
+ capacityMap.put(partitionName2, capacityDataMapResource2);
+ testResourceConfigResource.setPartitionCapacityMap(capacityMap);
+ // 2. update instance group tag and max partitions per instance
+ String group = "DEFAULT";
+ int maxPartition = 10;
+ testResourceConfigResource.getRecord()
+ .setSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.toString(), group);
+ testResourceConfigResource.getRecord()
+ .setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(),
+ maxPartition);
+
+ replica = new AssignableReplica(testResourceConfigResource, partitionName, masterState,
+ masterPriority);
+ Assert.assertEquals(replica.getCapacity(), capacityDataMapResource1);
+ Assert.assertEquals(replica.getResourceInstanceGroupTag(), group);
+ Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
+
+ replica = new AssignableReplica(testResourceConfigResource, partitionName2, slaveState,
+ slavePriority);
+ Assert.assertEquals(replica.getResourceName(), resourceName);
+ Assert.assertEquals(replica.getPartitionName(), partitionName2);
+ Assert.assertEquals(replica.getReplicaState(), slaveState);
+ Assert.assertEquals(replica.getStatePriority(), slavePriority);
+ Assert.assertFalse(replica.isReplicaTopState());
+ Assert.assertEquals(replica.getCapacity(), capacityDataMapResource2);
+ Assert.assertEquals(replica.getResourceInstanceGroupTag(), group);
+ Assert.assertEquals(replica.getResourceMaxPartitionsPerInstance(), maxPartition);
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
new file mode 100644
index 0000000..8206f29
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterContext.java
@@ -0,0 +1,90 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TestClusterContext extends AbstractTestClusterModel {
+ @BeforeClass
+ public void initialize() {
+ super.initialize();
+ }
+
+ @Test
+ public void testNormalUsage() throws IOException {
+ // Test 1 - initialize the cluster context based on the data cache.
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+ Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+
+ ClusterContext context = new ClusterContext(assignmentSet, 2);
+
+ // Note that we left some margin for the max estimation.
+ Assert.assertEquals(context.getEstimatedMaxPartitionCount(), 3);
+ Assert.assertEquals(context.getEstimatedMaxTopStateCount(), 2);
+ Assert.assertEquals(context.getAssignmentForFaultZoneMap(), Collections.emptyMap());
+ for (String resourceName : _resourceNames) {
+ Assert.assertEquals(context.getEstimatedMaxPartitionByResource(resourceName), 2);
+ Assert.assertEquals(
+ context.getPartitionsForResourceAndFaultZone(_testFaultZoneId, resourceName),
+ Collections.emptySet());
+ }
+
+ // Assign
+ Map<String, Map<String, Set<String>>> expectedFaultZoneMap = Collections
+ .singletonMap(_testFaultZoneId, assignmentSet.stream().collect(Collectors
+ .groupingBy(AssignableReplica::getResourceName,
+ Collectors.mapping(AssignableReplica::getPartitionName, Collectors.toSet()))));
+
+ assignmentSet.stream().forEach(replica -> context
+ .addPartitionToFaultZone(_testFaultZoneId, replica.getResourceName(),
+ replica.getPartitionName()));
+ Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap);
+
+ // release
+ expectedFaultZoneMap.get(_testFaultZoneId).get(_resourceNames.get(0))
+ .remove(_partitionNames.get(0));
+ Assert.assertTrue(context.removePartitionFromFaultZone(_testFaultZoneId, _resourceNames.get(0),
+ _partitionNames.get(0)));
+
+ Assert.assertEquals(context.getAssignmentForFaultZoneMap(), expectedFaultZoneMap);
+ }
+
+ @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica from partition Partition1 in fault zone testZone")
+ public void testDuplicateAssign() throws IOException {
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+ Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
+ ClusterContext context = new ClusterContext(assignmentSet, 2);
+ context
+ .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0));
+ // Insert again and trigger the error.
+ context
+ .addPartitionToFaultZone(_testFaultZoneId, _resourceNames.get(0), _partitionNames.get(0));
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
new file mode 100644
index 0000000..c07bd98
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -0,0 +1,114 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TestClusterModel extends AbstractTestClusterModel {
+ @BeforeClass
+ public void initialize() {
+ super.initialize();
+ }
+
+ /**
+ * Generate AssignableNodes according to the instances included in the cluster data cache.
+ */
+ Set<AssignableNode> generateNodes(ResourceControllerDataProvider testCache) {
+ Set<AssignableNode> nodeSet = new HashSet<>();
+ testCache.getInstanceConfigMap().values().stream().forEach(config -> nodeSet.add(
+ new AssignableNode(testCache.getClusterConfig(),
+ testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(),
+ Collections.emptyList())));
+ return nodeSet;
+ }
+
+ @Test
+ public void testNormalUsage() throws IOException {
+ // Test 1 - initialize the cluster model based on the data cache.
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+ Set<AssignableReplica> assignableReplicas = generateReplicas(testCache);
+ Set<AssignableNode> assignableNodes = generateNodes(testCache);
+
+ ClusterContext context = new ClusterContext(assignableReplicas, 2);
+ ClusterModel clusterModel =
+ new ClusterModel(context, assignableReplicas, assignableNodes, Collections.emptyMap(),
+ Collections.emptyMap());
+
+ Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+ .allMatch(resourceMap -> resourceMap.values().isEmpty()));
+ Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+ .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+
+ // The initialization of the context, node, and replica has been tested separately. So for
+ // the cluster model, focus on testing assignment and release here.
+
+ // Assign
+ AssignableReplica replica = assignableReplicas.iterator().next();
+ AssignableNode assignableNode = assignableNodes.iterator().next();
+ clusterModel
+ .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
+ assignableNode.getInstanceName());
+
+ Assert.assertTrue(
+ clusterModel.getContext().getAssignmentForFaultZoneMap().get(assignableNode.getFaultZone())
+ .get(replica.getResourceName()).contains(replica.getPartitionName()));
+ Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().get(replica.getResourceName())
+ .contains(replica.getPartitionName()));
+
+ // Assigning a replica of a non-existing resource shall fail.
+ try {
+ clusterModel.assign("NOT-EXIST", replica.getPartitionName(), replica.getReplicaState(),
+ assignableNode.getInstanceName());
+ Assert.fail("Assigning a non existing resource partition shall fail.");
+ } catch (HelixException ex) {
+ // expected
+ }
+
+ // Assigning a replica to a non-existing instance shall fail.
+ try {
+ clusterModel
+ .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
+ "NON-EXIST");
+ Assert.fail("Assigning a resource partition to a non existing instance shall fail.");
+ } catch (HelixException ex) {
+ // expected
+ }
+
+ // Release
+ clusterModel
+ .release(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
+ assignableNode.getInstanceName());
+
+ Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
+ .allMatch(resourceMap -> resourceMap.values().stream()
+ .allMatch(partitions -> partitions.isEmpty())));
+ Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
+ .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+ }
+}