You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/12 17:09:41 UTC
[2/5] helix git commit: [HELIX-568] Add new topology aware
(rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is
available at:
https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
deleted file mode 100644
index 959609f..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ /dev/null
@@ -1,753 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-
-public class AutoRebalanceStrategy implements RebalanceStrategy {
- // log4j logger shared by all instances of this strategy
- private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
- // source of the preferred node for each replica; assigned once in the constructor
- private final ReplicaPlacementScheme _placementScheme;
-
- // inputs stored by init()
- private String _resourceName;
- private List<String> _partitions;
- // state name -> number of replicas in that state (summed by countStateReplicas)
- private LinkedHashMap<String, Integer> _states;
- private int _maximumPerNode;
-
- // per-computation working state, rebuilt by computePartitionAssignment
- // node id -> Node, one entry per id in allNodes
- private Map<String, Node> _nodeMap;
- private List<Node> _liveNodesList;
- // replica id -> state name, built by generateStateMap
- private Map<Integer, String> _stateMap;
-
- // replica -> node chosen by the placement scheme
- private Map<Replica, Node> _preferredAssignment;
- // currently hosted replicas that sit on their preferred node
- private Map<Replica, Node> _existingPreferredAssignment;
- // currently hosted replicas that sit on some other node
- private Map<Replica, Node> _existingNonPreferredAssignment;
- // replicas not currently hosted by any node
- private Set<Replica> _orphaned;
-
-  /**
-   * Create a strategy for one resource with an explicit cap on replicas per node. Preferred
-   * locations are supplied by a {@link DefaultPlacementScheme}.
-   * @param resourceName name of the resource being rebalanced
-   * @param partitions names of the resource's partitions
-   * @param states state name mapped to the number of replicas in that state
-   * @param maximumPerNode largest number of replicas a single node may be given
-   */
-  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
-    _placementScheme = new DefaultPlacementScheme();
-    init(resourceName, partitions, states, maximumPerNode);
-  }
-
- /**
- * Create a strategy without a practical per-node limit: delegates to the four-argument
- * constructor with Integer.MAX_VALUE as the maximum.
- * @param resourceName name of the resource being rebalanced
- * @param partitions names of the resource's partitions
- * @param states state name mapped to the number of replicas in that state
- */
- public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states) {
- this(resourceName, partitions, states, Integer.MAX_VALUE);
- }
-
-  /**
-   * Store the rebalancing inputs. The arguments are kept by reference, not copied.
-   * @param resourceName name of the resource being rebalanced
-   * @param partitions names of the resource's partitions
-   * @param states state name mapped to the number of replicas in that state
-   * @param maximumPerNode largest number of replicas a single node may be given
-   */
-  @Override
-  public void init(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
-    this._maximumPerNode = maximumPerNode;
-    this._states = states;
-    this._partitions = partitions;
-    this._resourceName = resourceName;
-  }
-
- /**
- * Compute an assignment of every replica of every partition onto the live nodes.
- * The work is done in phases: size each node's capacity, compute the preferred placement,
- * classify the current placement, then move and assign replicas and write the result.
- * @param liveNodes identifiers of the nodes that may receive replicas
- * @param currentMapping current placement, partition -> (node -> state); the helper methods
- * called here remove entries for nodes that are not in allNodes from this map
- * @param allNodes identifiers of all nodes, live and non-live
- * @return a record named after the resource; it has no fields when liveNodes is empty
- */
- @Override
- public ZNRecord computePartitionAssignment(final List<String> liveNodes,
- final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
- int numReplicas = countStateReplicas();
- ZNRecord znRecord = new ZNRecord(_resourceName);
- if (liveNodes.size() == 0) {
- return znRecord;
- }
- // total replicas = numReplicas * partitions; each live node gets distFloor of them and
- // the first distRemainder live nodes may get one more
- int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
- int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
- _nodeMap = new HashMap<String, Node>();
- _liveNodesList = new ArrayList<Node>();
-
- // every known node starts with zero capacity; the Node constructor marks it not alive
- for (String id : allNodes) {
- Node node = new Node(id);
- node.capacity = 0;
- node.hasCeilingCapacity = false;
- _nodeMap.put(id, node);
- }
- for (int i = 0; i < liveNodes.size(); i++) {
- boolean usingCeiling = false;
- int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
- // NOTE(review): when _maximumPerNode <= 0 this condition is never true, so the remainder
- // is not handed out even though the line above treats that value as "no limit" - confirm
- if (distRemainder > 0 && targetSize < _maximumPerNode) {
- targetSize += 1;
- distRemainder = distRemainder - 1;
- usingCeiling = true;
- }
- // NOTE(review): assumes every live node id is also in allNodes; otherwise node is null
- Node node = _nodeMap.get(liveNodes.get(i));
- node.isAlive = true;
- node.capacity = targetSize;
- node.hasCeilingCapacity = usingCeiling;
- _liveNodesList.add(node);
- }
-
- // compute states for all replica ids
- _stateMap = generateStateMap();
-
- // compute the preferred mapping if all nodes were up
- _preferredAssignment = computePreferredPlacement(allNodes);
-
- // logger.info("preferred mapping:"+ preferredAssignment);
- // from current mapping derive the ones in preferred location
- // this will update the nodes with their current fill status
- _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
-
- // from current mapping derive the ones not in preferred location
- _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
-
- // compute orphaned replicas that are not assigned to any node
- _orphaned = computeOrphaned();
- if (logger.isInfoEnabled()) {
- logger.info("orphan = " + _orphaned);
- }
-
- // the three steps below mutate the Node objects; prepareResult then reads them
- moveNonPreferredReplicasToPreferred();
-
- assignOrphans();
-
- moveExcessReplicas();
-
- prepareResult(znRecord);
- return znRecord;
- }
-
-  /**
-   * Relocate replicas that currently sit on a non-preferred node to their preferred node.
-   * A replica is moved only when its current node holds more than its capacity, the
-   * preferred node holds less than its capacity, and the preferred node can accept it.
-   * Moved replicas are removed from the existing non-preferred assignment.
-   */
-  private void moveNonPreferredReplicasToPreferred() {
-    Iterator<Entry<Replica, Node>> misplaced =
-        _existingNonPreferredAssignment.entrySet().iterator();
-    while (misplaced.hasNext()) {
-      Entry<Replica, Node> placement = misplaced.next();
-      Replica replica = placement.getKey();
-      Node donor = placement.getValue();
-      Node receiver = _preferredAssignment.get(replica);
-
-      // skip unless the donor is overloaded and the receiver has room and accepts the replica
-      if (donor.currentlyAssigned <= donor.capacity
-          || receiver.currentlyAssigned >= receiver.capacity || !receiver.canAdd(replica)) {
-        continue;
-      }
-
-      // hand the replica over: bookkeeping on the donor first, then on the receiver
-      donor.currentlyAssigned = donor.currentlyAssigned - 1;
-      donor.nonPreferred.remove(replica);
-      donor.newReplicas.remove(replica);
-      receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
-      receiver.preferred.add(replica);
-      receiver.newReplicas.add(replica);
-      misplaced.remove();
-    }
-  }
-
-  /**
-   * Place each orphaned replica on a live node. The search starts at a hash-derived index and
-   * walks the live nodes once; if no node has spare capacity for the replica, an attempt is
-   * made to free room via assignOrphanByMakingRoom. Replicas that are placed are removed from
-   * the orphan set; any that remain are logged.
-   */
-  private void assignOrphans() {
-    for (Iterator<Replica> pending = _orphaned.iterator(); pending.hasNext();) {
-      Replica replica = pending.next();
-      final int liveCount = _liveNodesList.size();
-      final int first = computeRandomStartIndex(replica);
-      boolean placed = false;
-      for (int offset = 0; offset < liveCount && !placed; offset++) {
-        Node candidate = _liveNodesList.get((first + offset) % liveCount);
-        if (candidate.capacity > candidate.currentlyAssigned && candidate.canAdd(replica)) {
-          candidate.currentlyAssigned = candidate.currentlyAssigned + 1;
-          candidate.nonPreferred.add(replica);
-          candidate.newReplicas.add(replica);
-          placed = true;
-        }
-      }
-      if (!placed) {
-        // no node had spare room; see whether capacity can be shifted between nodes
-        placed = assignOrphanByMakingRoom(replica);
-      }
-      if (placed) {
-        pending.remove();
-      }
-    }
-    if (logger.isInfoEnabled() && _orphaned.size() > 0) {
-      logger.info("could not assign nodes to partitions: " + _orphaned);
-    }
-  }
-
-  /**
-   * Try to place an orphan by transferring ceiling capacity between two live nodes: a donor
-   * that has ceiling capacity and free room but cannot host the replica, and an acceptor
-   * that is exactly full, has no ceiling capacity, and could host the replica.
-   * @param replica The replica to assign
-   * @return true if a donor/acceptor pair was found and the replica was assigned
-   */
-  private boolean assignOrphanByMakingRoom(Replica replica) {
-    final int first = computeRandomStartIndex(replica);
-    final int liveCount = _liveNodesList.size();
-    Node donor = null;
-    Node acceptor = null;
-    for (int offset = 0; offset < liveCount; offset++) {
-      Node current = _liveNodesList.get((first + offset) % liveCount);
-      if (current.hasCeilingCapacity) {
-        // has room to spare but is not allowed to take this replica
-        if (donor == null && current.capacity > current.currentlyAssigned
-            && !current.canAddIfCapacity(replica)) {
-          donor = current;
-        }
-      } else if (acceptor == null && current.capacity == current.currentlyAssigned
-          && current.canAddIfCapacity(replica)) {
-        // full, but would take the replica if it had one more slot
-        acceptor = current;
-      }
-      if (donor != null && acceptor != null) {
-        // move the spare slot to the acceptor and record the replica there
-        acceptor.steal(donor, replica);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Drain nodes that hold more replicas than their capacity. For each such node its
-   * non-preferred replicas are tried in sorted order, and each is handed to the first live
-   * node (starting at a hash-derived index) that can add it. A warning is logged for a node
-   * that is still over capacity afterwards.
-   */
-  private void moveExcessReplicas() {
-    for (Node donor : _liveNodesList) {
-      if (donor.currentlyAssigned <= donor.capacity) {
-        continue;
-      }
-      Collections.sort(donor.nonPreferred);
-      Iterator<Replica> candidates = donor.nonPreferred.iterator();
-      // stop as soon as the donor is back within capacity or has nothing left to offer
-      while (donor.currentlyAssigned > donor.capacity && candidates.hasNext()) {
-        Replica replica = candidates.next();
-        final int first = computeRandomStartIndex(replica);
-        final int liveCount = _liveNodesList.size();
-        for (int offset = 0; offset < liveCount; offset++) {
-          Node receiver = _liveNodesList.get((first + offset) % liveCount);
-          if (!receiver.canAdd(replica)) {
-            continue;
-          }
-          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
-          receiver.nonPreferred.add(replica);
-          donor.currentlyAssigned = donor.currentlyAssigned - 1;
-          candidates.remove();
-          break;
-        }
-      }
-      if (donor.currentlyAssigned > donor.capacity) {
-        logger.warn("Could not take partitions out of node:" + donor.id);
-      }
-    }
-  }
-
-  /**
-   * Write the outcome of the rebalance into the given record. For every partition the list
-   * field holds the nodes serving it and the map field maps each of those nodes to a state.
-   * @param znRecord the record to fill in
-   */
-  private void prepareResult(ZNRecord znRecord) {
-    // Nodes that newly received a replica are gathered here, apart from the nodes that
-    // already hosted it, so that they end up after them in the final preference list.
-    Map<String, List<String>> addedPreferences = new TreeMap<String, List<String>>();
-    for (String partition : _partitions) {
-      znRecord.setMapField(partition, new TreeMap<String, String>());
-      znRecord.setListField(partition, new ArrayList<String>());
-      addedPreferences.put(partition, new ArrayList<String>());
-    }
-
-    // Rough priority within a preference list: existing preferred, existing non-preferred,
-    // new preferred, new non-preferred. Preferred replicas are therefore visited first.
-    for (Node node : _liveNodesList) {
-      for (Replica replica : node.preferred) {
-        List<String> target = node.newReplicas.contains(replica)
-            ? addedPreferences.get(replica.partition)
-            : znRecord.getListField(replica.partition);
-        target.add(node.id);
-      }
-    }
-    for (Node node : _liveNodesList) {
-      for (Replica replica : node.nonPreferred) {
-        List<String> target = node.newReplicas.contains(replica)
-            ? addedPreferences.get(replica.partition)
-            : znRecord.getListField(replica.partition);
-        target.add(node.id);
-      }
-    }
-    normalizePreferenceLists(znRecord.getListFields(), addedPreferences);
-
-    // The position of a node in the preference list selects its state via the state map.
-    for (String partition : _partitions) {
-      List<String> preferenceList = znRecord.getListField(partition);
-      int position = 0;
-      for (String participant : preferenceList) {
-        String state = _stateMap.get(position++);
-        znRecord.getMapField(partition).put(participant, state);
-      }
-    }
-  }
-
-  /**
-   * Normalize two groups of preference lists against one shared set of per-node state
-   * counts: first the lists reflecting the existing assignment, then the lists of newly
-   * assigned nodes. Each normalized new list is appended to the existing list of the same
-   * partition, so existing replicas stay ahead of new ones.
-   * @param preferenceLists map of (partition --> list of nodes); modified in place
-   * @param newPreferences map of (partition --> list of newly assigned nodes)
-   */
-  private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
-      Map<String, List<String>> newPreferences) {
-    Map<String, Map<String, Integer>> countsByNode = new HashMap<String, Map<String, Integer>>();
-
-    for (List<String> existing : preferenceLists.values()) {
-      normalizePreferenceList(existing, countsByNode);
-    }
-    for (Entry<String, List<String>> added : newPreferences.entrySet()) {
-      List<String> additions = added.getValue();
-      normalizePreferenceList(additions, countsByNode);
-      preferenceLists.get(added.getKey()).addAll(additions);
-    }
-  }
-
-  /**
-   * Reorder one preference list in place. For each replica position, the node that so far
-   * holds the fewest replicas of that position's state is picked from the nodes not yet
-   * chosen, and its count for that state is incremented.
-   * @param preferenceList list of node names; replaced with the reordered selection
-   * @param nodeReplicaCounts map of (node --> state --> count), updated as nodes are picked
-   */
-  private void normalizePreferenceList(List<String> preferenceList,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    final int positions = Math.min(countStateReplicas(), preferenceList.size());
-    // LinkedHashSet keeps the incoming order, which decides ties between equal counts
-    Set<String> remaining = new LinkedHashSet<String>(preferenceList);
-    List<String> reordered = new ArrayList<String>();
-
-    for (int position = 0; position < positions; position++) {
-      String state = _stateMap.get(position);
-      String chosen = getMinimumNodeForReplica(state, remaining, nodeReplicaCounts);
-      reordered.add(chosen);
-      remaining.remove(chosen);
-      Map<String, Integer> stateCounts = nodeReplicaCounts.get(chosen);
-      stateCounts.put(state, stateCounts.get(state) + 1);
-    }
-
-    preferenceList.clear();
-    preferenceList.addAll(reordered);
-  }
-
-  /**
-   * Find the node that currently holds the fewest replicas in the given state. Ties go to
-   * the node encountered first in the iteration order of the set.
-   * @param state the state
-   * @param nodes nodes to check
-   * @param nodeReplicaCounts current assignment of replicas
-   * @return the node with the lowest count, or null if no node was supplied
-   */
-  private String getMinimumNodeForReplica(String state, Set<String> nodes,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    String best = null;
-    int fewest = Integer.MAX_VALUE;
-    for (String candidate : nodes) {
-      int hosted = getReplicaCountForNode(state, candidate, nodeReplicaCounts);
-      if (hosted >= fewest) {
-        continue;
-      }
-      fewest = hosted;
-      best = candidate;
-    }
-    return best;
-  }
-
-  /**
-   * Look up how many replicas in the given state are assigned to a node, creating a zero
-   * entry for the node and/or state when none exists yet.
-   * @param state the state to assign
-   * @param node the node to check
-   * @param nodeReplicaCounts a map of node to state and counts; missing entries are added
-   * @return the number of currently assigned replicas in the given state
-   */
-  private int getReplicaCountForNode(String state, String node,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    if (!nodeReplicaCounts.containsKey(node)) {
-      nodeReplicaCounts.put(node, new HashMap<String, Integer>());
-    }
-    Map<String, Integer> countsForNode = nodeReplicaCounts.get(node);
-    if (!countsForNode.containsKey(state)) {
-      countsForNode.put(state, 0);
-    }
-    return countsForNode.get(state);
-  }
-
-  /**
-   * Compute the subset of the current mapping where replicas are not mapped according to
-   * their preferred assignment. Each (partition, node) pair of the current mapping whose node
-   * does not already hold a preferred replica of that partition is matched to the first
-   * replica id that is preferred elsewhere and not yet accounted for. That replica is added
-   * to the node's non-preferred list.
-   * <p>
-   * Side effect: entries of currentMapping for nodes unknown to this computation are removed.
-   * @param currentMapping Current mapping of replicas to nodes, partition -> (node -> state)
-   * @return The current assignments that do not conform to the preferred assignment
-   * @throws IllegalArgumentException if currentMapping names a partition that has no
-   *           preferred assignment
-   */
-  private Map<Replica, Node> computeExistingNonPreferredPlacement(
-      Map<String, Map<String, String>> currentMapping) {
-    Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
-    int count = countStateReplicas();
-    for (String partition : currentMapping.keySet()) {
-      Map<String, String> nodeStateMap = currentMapping.get(partition);
-      // drop nodes that are not part of this computation
-      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
-      for (String nodeId : nodeStateMap.keySet()) {
-        Node node = _nodeMap.get(nodeId);
-        // a node that already holds a preferred replica of this partition is accounted for
-        boolean skip = false;
-        for (Replica replica : node.preferred) {
-          if (replica.partition.equals(partition)) {
-            skip = true;
-            break;
-          }
-        }
-        if (skip) {
-          continue;
-        }
-        // check if its in one of the preferred position
-        for (int replicaId = 0; replicaId < count; replicaId++) {
-          Replica replica = new Replica(partition, replicaId);
-          if (!_preferredAssignment.containsKey(replica)) {
-
-            logger.info("partitions: " + _partitions);
-            logger.info("currentMapping.keySet: " + currentMapping.keySet());
-            throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions");
-          }
-
-          // compare node ids by value; reference comparison of Strings only works by accident
-          if (!_preferredAssignment.get(replica).id.equals(node.id)
-              && !_existingPreferredAssignment.containsKey(replica)
-              && !existingNonPreferredAssignment.containsKey(replica)) {
-            existingNonPreferredAssignment.put(replica, node);
-            node.nonPreferred.add(replica);
-
-            break;
-          }
-        }
-      }
-    }
-    return existingNonPreferredAssignment;
-  }
-
-  /**
-   * Derive the live-node index at which the search for a host of this replica starts. The
-   * index is the replica's hash code, with the sign bit cleared, modulo the number of live
-   * nodes, so it is stable for a given replica and node count.
-   * @param replica The replica to assign
-   * @return The starting node index to try
-   */
-  private int computeRandomStartIndex(final Replica replica) {
-    final int nonNegativeHash = replica.hashCode() & 0x7FFFFFFF;
-    return nonNegativeHash % _liveNodesList.size();
-  }
-
-  /**
-   * Determine which replicas of the preferred assignment are hosted nowhere: all preferred
-   * replicas minus those found on a preferred node and those found on a non-preferred node.
-   * @return Unassigned replicas, in sorted order
-   */
-  private Set<Replica> computeOrphaned() {
-    Set<Replica> unassigned = new TreeSet<Replica>(_preferredAssignment.keySet());
-    unassigned.removeAll(_existingPreferredAssignment.keySet());
-    unassigned.removeAll(_existingNonPreferredAssignment.keySet());
-    return unassigned;
-  }
-
-  /**
-   * Determine the replicas already assigned to their preferred nodes. For every
-   * (partition, node) pair in the current mapping, the first replica id of that partition
-   * whose preferred node is this node and that has not been matched yet is recorded and
-   * added to the node's preferred list.
-   * <p>
-   * Side effects: entries of currentMapping for nodes unknown to this computation are
-   * removed, and currentlyAssigned is incremented on the node of every remaining pair,
-   * whether or not the pair matches a preferred replica.
-   * @param currentMapping Current assignment of replicas to nodes, partition -> (node -> state)
-   * @return Assignments that conform to the preferred placement
-   */
-  private Map<Replica, Node> computeExistingPreferredPlacement(
-      final Map<String, Map<String, String>> currentMapping) {
-    Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
-    int count = countStateReplicas();
-    for (String partition : currentMapping.keySet()) {
-      Map<String, String> nodeStateMap = currentMapping.get(partition);
-      // drop nodes that are not part of this computation
-      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
-      for (String nodeId : nodeStateMap.keySet()) {
-        Node node = _nodeMap.get(nodeId);
-        node.currentlyAssigned = node.currentlyAssigned + 1;
-        // check if its in one of the preferred position
-        for (int replicaId = 0; replicaId < count; replicaId++) {
-          Replica replica = new Replica(partition, replicaId);
-          // compare node ids by value; reference comparison of Strings only works by accident
-          if (_preferredAssignment.containsKey(replica)
-              && !existingPreferredAssignment.containsKey(replica)
-              && _preferredAssignment.get(replica).id.equals(node.id)) {
-            existingPreferredAssignment.put(replica, node);
-            node.preferred.add(replica);
-            break;
-          }
-        }
-      }
-    }
-
-    return existingPreferredAssignment;
-  }
-
-  /**
-   * Ask the placement scheme for the preferred node of every replica of every partition.
-   * Partitions are numbered by their position in the partition list.
-   * @param allNodes Identifiers to all nodes, live and non-live
-   * @return Preferred assignment of replicas
-   */
-  private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
-    final int replicasPerPartition = countStateReplicas();
-    Map<Replica, Node> preferred = new HashMap<Replica, Node>();
-    int partitionId = 0;
-    for (String partition : _partitions) {
-      for (int replicaId = 0; replicaId < replicasPerPartition; replicaId++) {
-        String nodeName = _placementScheme.getLocation(partitionId, replicaId,
-            _partitions.size(), replicasPerPartition, allNodes);
-        preferred.put(new Replica(partition, replicaId), _nodeMap.get(nodeName));
-      }
-      partitionId++;
-    }
-    return preferred;
-  }
-
-  /**
-   * Add up the replica counts of all states.
-   * @return the number of replicas per partition across all states
-   */
-  private int countStateReplicas() {
-    int sum = 0;
-    Iterator<Integer> counts = _states.values().iterator();
-    while (counts.hasNext()) {
-      sum += counts.next();
-    }
-    return sum;
-  }
-
-  /**
-   * Number the replicas of a partition consecutively from 0, following the iteration order
-   * of the state map, and record which state each replica id belongs to.
-   * @return Map: replica id -> state name
-   */
-  private Map<Integer, String> generateStateMap() {
-    Map<Integer, String> stateByReplicaId = new HashMap<Integer, String>();
-    int nextReplicaId = 0;
-    for (Entry<String, Integer> stateCount : _states.entrySet()) {
-      int remaining = stateCount.getValue();
-      while (remaining-- > 0) {
-        stateByReplicaId.put(nextReplicaId++, stateCount.getKey());
-      }
-    }
-    return stateByReplicaId;
-  }
-
-  /**
-   * A Node can host replicas. It tracks its capacity, how many replicas are assigned to it,
-   * and which replicas it holds, so it can decide whether it may receive another one.
-   */
-  class Node {
-    public int currentlyAssigned = 0;
-    public int capacity;
-    public boolean hasCeilingCapacity;
-    private final String id;
-    boolean isAlive = false;
-    private final List<Replica> preferred = new ArrayList<Replica>();
-    private final List<Replica> nonPreferred = new ArrayList<Replica>();
-    private final Set<Replica> newReplicas = new TreeSet<Replica>();
-
-    public Node(String id) {
-      this.id = id;
-    }
-
-    /**
-     * Check if this replica can be legally added to this node
-     * @param replica The replica to test
-     * @return true if the node is below capacity and passes canAddIfCapacity
-     */
-    public boolean canAdd(Replica replica) {
-      return currentlyAssigned < capacity && canAddIfCapacity(replica);
-    }
-
-    /**
-     * Check if this replica could be added, ignoring capacity: the node must be alive and
-     * must not already hold any replica of the same partition.
-     * @param replica The replica to test
-     * @return true if the assignment can be made, false otherwise
-     */
-    public boolean canAddIfCapacity(Replica replica) {
-      return isAlive && !holdsSamePartition(preferred, replica)
-          && !holdsSamePartition(nonPreferred, replica);
-    }
-
-    /** Tell whether any replica in the list belongs to the same partition as the given one. */
-    private boolean holdsSamePartition(List<Replica> held, Replica replica) {
-      for (Replica r : held) {
-        if (r.partition.equals(replica.partition)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /**
-     * Receive a replica by taking one unit of capacity, and the ceiling flag, from another
-     * Node. The replica is recorded as a new, non-preferred replica of this node.
-     * @param donor The node that gives up capacity
-     * @param replica The replica to receive
-     */
-    public void steal(Node donor, Replica replica) {
-      donor.capacity--;
-      donor.hasCeilingCapacity = false;
-      capacity++;
-      hasCeilingCapacity = true;
-      currentlyAssigned++;
-      nonPreferred.add(replica);
-      newReplicas.add(replica);
-    }
-
-    @Override
-    public String toString() {
-      return "##########\nname=" + id + "\npreferred:" + preferred.size() + "\nnonpreferred:"
-          + nonPreferred.size();
-    }
-  }
-
-  /**
-   * A Replica identifies one copy of a partition by the partition name and a
-   * partition-relative replica id. Equality, hashing, ordering and the string form are all
-   * based on the text "partition|replicaId".
-   */
-  class Replica implements Comparable<Replica> {
-    private String partition;
-    private int replicaId; // this is a partition-relative id
-    private String format;
-
-    public Replica(String partition, int replicaId) {
-      this.partition = partition;
-      this.replicaId = replicaId;
-      this.format = partition + "|" + replicaId;
-    }
-
-    @Override
-    public String toString() {
-      return format;
-    }
-
-    @Override
-    public boolean equals(Object that) {
-      return that instanceof Replica && format.equals(((Replica) that).format);
-    }
-
-    @Override
-    public int hashCode() {
-      return format.hashCode();
-    }
-
-    /** Order replicas by their string form; a null argument yields -1. */
-    @Override
-    public int compareTo(Replica that) {
-      if (that == null) {
-        return -1;
-      }
-      return format.compareTo(that.format);
-    }
-  }
-
-  /**
-   * Strategy for deciding which node a replica would prefer to be served by.
-   */
-  public interface ReplicaPlacementScheme {
-    /**
-     * Initialize global state
-     * @param manager The instance to which this placement is associated
-     */
-    void init(final HelixManager manager);
-
-    /**
-     * Pick the preferred node for one replica of one partition.
-     * @param partitionId The current partition
-     * @param replicaId The current replica with respect to the current partition
-     * @param numPartitions The total number of partitions
-     * @param numReplicas The total number of replicas per partition
-     * @param nodeNames A list of identifiers of all nodes, live and non-live
-     * @return The name of the node that would prefer to serve this replica
-     */
-    String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
-        final List<String> nodeNames);
-  }
-
-  /**
-   * Default placement: spreads replicas over the node list by simple modular arithmetic on
-   * the partition and replica ids, using a different formula depending on whether there are
-   * more nodes than partitions, equally many, or fewer.
-   */
-  public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
-    @Override
-    public void init(final HelixManager manager) {
-      // do nothing since this is independent of the manager
-    }
-
-    @Override
-    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
-        final List<String> nodeNames) {
-      final int nodeCount = nodeNames.size();
-      if (nodeCount < numPartitions) {
-        // fewer nodes than partitions: step one node per replica from the partition's slot
-        return nodeNames.get((partitionId + replicaId) % nodeCount);
-      }
-      // otherwise lay replicas out in partition order, one full round of partitions per replica
-      int index = (partitionId + replicaId * numPartitions) % nodeCount;
-      if (nodeCount == numPartitions) {
-        // equal sizes would map every replica of a partition to the same node; add an offset
-        index = (index + replicaId) % nodeCount;
-      }
-      return nodeNames.get(index);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
deleted file mode 100644
index 4daae82..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.ZNRecord;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Assignment strategy interface that computes the assignment of partition->instance.
- * A strategy is first given its inputs through init and is then asked to compute an
- * assignment for a particular set of nodes.
- */
-public interface RebalanceStrategy {
- /**
- * Perform the necessary initialization for the rebalance strategy object.
- * @param resourceName name of the resource to rebalance
- * @param partitions names of the partitions of the resource
- * @param states state name mapped to the number of replicas that should be in that state
- * @param maximumPerNode limit on the number of replicas placed on a single node
- */
- void init(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states, int maximumPerNode);
-
- /**
- * Compute the preference lists and (optional partition-state mapping) for the given resource.
- *
- * @param liveNodes identifiers of the nodes that are currently live
- * @param currentMapping current placement, partition -> (node -> state)
- * @param allNodes identifiers of all nodes, live and non-live
- * @return record holding the computed assignment for the resource
- */
- ZNRecord computePartitionAssignment(final List<String> liveNodes,
- final Map<String, Map<String, String>> currentMapping, final List<String> allNodes);
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index e97ac9b..73f2cbb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -52,6 +52,7 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
@@ -607,6 +608,13 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
+ public void addResource(String clusterName, String resourceName, int partitions,
+ String stateModelRef, String rebalancerMode, String rebalanceStrategy) {
+ // delegate to the full overload with bucketSize 0 and maxPartitionsPerInstance -1
+ addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode,
+ rebalanceStrategy, 0, -1);
+ }
+
+ @Override
public void addResource(String clusterName, String resourceName, IdealState idealstate) {
String stateModelRef = idealstate.getStateModelDefRef();
String stateModelDefPath =
@@ -629,14 +637,21 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void addResource(String clusterName, String resourceName, int partitions,
String stateModelRef, String rebalancerMode, int bucketSize) {
- addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize,
- -1);
+ addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, -1);
}
@Override
public void addResource(String clusterName, String resourceName, int partitions,
String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance) {
+ // no strategy supplied by the caller: pass RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY
+ // to the overload that takes an explicit rebalance strategy
+ addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode,
+ RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY, bucketSize, maxPartitionsPerInstance);
+ }
+
+ @Override
+ public void addResource(String clusterName, String resourceName, int partitions,
+ String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize,
+ int maxPartitionsPerInstance) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("cluster " + clusterName + " is not setup yet");
}
@@ -647,6 +662,7 @@ public class ZKHelixAdmin implements HelixAdmin {
RebalanceMode mode =
idealState.rebalanceModeFromString(rebalancerMode, RebalanceMode.SEMI_AUTO);
idealState.setRebalanceMode(mode);
+ idealState.setRebalanceStrategy(rebalanceStrategy);
idealState.setReplicas("" + 0);
idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
if (maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) {
@@ -1014,8 +1030,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public ZNRecord update(ZNRecord currentData) {
ClusterConstraints constraints =
- currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(
- currentData);
+ currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(currentData);
constraints.addConstraintItem(constraintId, constraintItem);
return constraints.getRecord();
@@ -1153,6 +1168,26 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
+ public void setInstanceZoneId(String clusterName, String instanceName, String zoneId) {
+   // Reads the participant's InstanceConfig, sets its zone id and writes it back.
+   // Throws HelixException when the cluster or the participant is not set up, or when the
+   // participant's config cannot be read.
+   if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
+     throw new HelixException("cluster " + clusterName + " is not setup yet");
+   }
+
+   if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
+     throw new HelixException("cluster " + clusterName + " instance " + instanceName
+         + " is not setup yet");
+   }
+   HelixDataAccessor accessor =
+       new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+   Builder keyBuilder = accessor.keyBuilder();
+
+   PropertyKey configKey = keyBuilder.instanceConfig(instanceName);
+   InstanceConfig config = accessor.getProperty(configKey);
+   if (config == null) {
+     // The config can be missing even after the setup check (e.g. it was removed in
+     // between); report it explicitly instead of failing with a NullPointerException.
+     throw new HelixException("cluster " + clusterName + " instance " + instanceName
+         + " config does not exist");
+   }
+   config.setZoneId(zoneId);
+   accessor.setProperty(configKey, config);
+ }
+
+ @Override
public void close() {
if (_zkClient != null) {
_zkClient.close();
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
new file mode 100644
index 0000000..25a16d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -0,0 +1,92 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * Cluster configurations
+ */
+public class ClusterConfig extends HelixProperty {
+  /**
+   * Names of the configurable cluster-level fields
+   */
+  public enum ClusterConfigProperty {
+    HELIX_DISABLE_PIPELINE_TRIGGERS,
+    TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance"
+    FAULT_ZONE_TYPE // the topology type at which isolation is applied when Helix places replicas of the same partition
+  }
+
+  /**
+   * Create a configuration for the given cluster
+   *
+   * @param cluster the cluster identifier
+   */
+  public ClusterConfig(String cluster) {
+    super(cluster);
+  }
+
+  /**
+   * Wrap an existing record
+   *
+   * @param record a ZNRecord corresponding to a cluster configuration
+   */
+  public ClusterConfig(ZNRecord record) {
+    super(record);
+  }
+
+  /**
+   * Whether pipeline triggers are disabled for this cluster, as read from the
+   * HELIX_DISABLE_PIPELINE_TRIGGERS boolean field.
+   *
+   * @return the value of the field, with false passed as the default
+   */
+  public Boolean isPipelineTriggersDisabled() {
+    String field = ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString();
+    return _record.getBooleanField(field, false);
+  }
+
+  /**
+   * Two cluster configs are considered equal when their ids are equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ClusterConfig)) {
+      return false;
+    }
+    ClusterConfig other = (ClusterConfig) obj;
+    return this.getId().equals(other.getId());
+  }
+
+  /**
+   * Hash on the id only, consistent with {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+
+  /**
+   * Get the name of this cluster
+   *
+   * @return the id of the underlying record
+   */
+  public String getClusterName() {
+    return _record.getId();
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7c4cf54..55d4734 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -167,7 +167,7 @@ public class IdealState extends HelixProperty {
/**
* Specify the strategy for Helix to use to compute the partition-instance assignment,
- * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy}
+ * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy}
*
* @param rebalanceStrategy
* @return
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index eb1c652..ecf2900 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -39,10 +39,14 @@ public class InstanceConfig extends HelixProperty {
public enum InstanceConfigProperty {
HELIX_HOST,
HELIX_PORT,
+ HELIX_ZONE_ID,
HELIX_ENABLED,
HELIX_DISABLED_PARTITION,
- TAG_LIST
+ TAG_LIST,
+ INSTANCE_WEIGHT,
+ DOMAIN
}
+ public static final int WEIGHT_NOT_SET = -1;
private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName());
@@ -94,6 +98,50 @@ public class InstanceConfig extends HelixProperty {
_record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
}
+ /**
+  * Read the HELIX_ZONE_ID simple field of this instance's record.
+  * @return the stored zone id
+  */
+ public String getZoneId() {
+   final String key = InstanceConfigProperty.HELIX_ZONE_ID.name();
+   return _record.getSimpleField(key);
+ }
+
+ /**
+  * Store the given value in the HELIX_ZONE_ID simple field of this instance's record.
+  * @param zoneId the zone id to store
+  */
+ public void setZoneId(String zoneId) {
+   final String key = InstanceConfigProperty.HELIX_ZONE_ID.name();
+   _record.setSimpleField(key, zoneId);
+ }
+
+ /**
+  * Domain represents a hierarchy identifier for an instance.
+  * @return the value of the DOMAIN simple field of this instance's record
+  */
+ public String getDomain() {
+   final String key = InstanceConfigProperty.DOMAIN.name();
+   return _record.getSimpleField(key);
+ }
+
+ /**
+  * Domain represents a hierarchy identifier for an instance.
+  * Example: "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001".
+  * @param domain the domain string to store in the DOMAIN simple field
+  */
+ public void setDomain(String domain) {
+   final String key = InstanceConfigProperty.DOMAIN.name();
+   _record.setSimpleField(key, domain);
+ }
+
+ /**
+  * Get the weight stored in the INSTANCE_WEIGHT simple field.
+  * @return the parsed weight, or {@link #WEIGHT_NOT_SET} when the field is absent or is not
+  *         a valid integer
+  */
+ public int getWeight() {
+   String w = _record.getSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name());
+   if (w == null) {
+     return WEIGHT_NOT_SET;
+   }
+   try {
+     return Integer.parseInt(w);
+   } catch (NumberFormatException e) {
+     // A malformed value is still treated as "not set", but it is no longer swallowed
+     // silently so that a bad config can be diagnosed.
+     _logger.warn("Invalid instance weight '" + w + "', treating it as not set", e);
+     return WEIGHT_NOT_SET;
+   }
+ }
+
+ /**
+  * Store the instance weight in the INSTANCE_WEIGHT simple field.
+  * @param weight the weight to store; must be positive
+  * @throws IllegalArgumentException if weight is zero or negative
+  */
+ public void setWeight(int weight) {
+   if (weight < 1) {
+     throw new IllegalArgumentException("Instance weight can not be equal or less than 0!");
+   }
+   final String key = InstanceConfigProperty.INSTANCE_WEIGHT.name();
+   _record.setSimpleField(key, Integer.toString(weight));
+ }
+
/**
* Get arbitrary tags associated with the instance
* @return a list of tags
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index 623357f..ac96768 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -32,8 +32,8 @@ import java.util.TreeSet;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
@@ -126,7 +126,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
List<String> allNodes =
Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
Collections.sort(allNodes);
- ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+ ZNRecord record =
+ strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache);
Map<String, List<String>> preferenceLists = record.getListFields();
// Convert to an assignment keyed on participant
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 9d411bb..08ccbdc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -362,6 +362,12 @@ public class ClusterSetup {
}
public void addResourceToCluster(String clusterName, String resourceName, int numResources,
+ String stateModelRef, String rebalancerMode, String rebalanceStrategy) {
+ _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode,
+ rebalanceStrategy);
+ }
+
+ public void addResourceToCluster(String clusterName, String resourceName, int numResources,
String stateModelRef, String rebalancerMode, int bucketSize) {
_admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode,
bucketSize);
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
new file mode 100644
index 0000000..a4e38a1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
@@ -0,0 +1,766 @@
+package org.apache.helix.controller.Strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.Mocks.MockAccessor;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.AutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TestAutoRebalanceStrategy {
+ private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
+
+ /**
+  * Sanity test for a basic Master-Slave model: 10 partitions, 12 live nodes out of 20,
+  * at most 5 replicas per node, 10 random rebalance tasks.
+  */
+ @Test
+ public void simpleMasterSlaveTest() {
+   final int iterations = 10;
+   final int partitionCount = 10;
+   final int liveNodeCount = 12;
+   final int totalNodeCount = 20;
+   final int maxReplicasPerNode = 5;
+
+   // states are listed in order of preference, with the replica count for each state
+   final String[] stateNames = new String[] { "MASTER", "SLAVE" };
+   final int[] stateCounts = new int[] { 1, 2 };
+
+   runTest("BasicMasterSlave", iterations, partitionCount, liveNodeCount, totalNodeCount,
+       maxReplicasPerNode, stateNames, stateCounts);
+ }
+
+ /**
+  * Build the partitions, nodes and state model for a test and run the random rebalance tasks.
+  * @param name Name of the test state model
+  * @param numIterations Number of rebalance tasks to run
+  * @param numPartitions Number of partitions for the resource
+  * @param numLiveNodes Number of live nodes in the cluster
+  * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to
+  *          numLiveNodes
+  * @param maxPerNode Maximum number of replicas a node can serve
+  * @param stateNames States ordered by preference
+  * @param stateCounts Number of replicas that should be in each state
+  */
+ private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes,
+     int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) {
+   List<String> partitionNames = new ArrayList<String>();
+   for (int p = 0; p < numPartitions; p++) {
+     partitionNames.add("p_" + p);
+   }
+
+   // every node is named n_<index>; the first numLiveNodes of them start out live
+   List<String> live = new ArrayList<String>();
+   List<String> everyNode = new ArrayList<String>();
+   for (int n = 0; n < numTotalNodes; n++) {
+     String nodeName = "n_" + n;
+     everyNode.add(nodeName);
+     if (n < numLiveNodes) {
+       live.add(nodeName);
+     }
+   }
+
+   // the run starts with no partition assigned anywhere
+   Map<String, Map<String, String>> emptyMapping = new TreeMap<String, Map<String, String>>();
+
+   // pair each state with its count, up to the shorter of the two arrays
+   LinkedHashMap<String, Integer> stateToCount = new LinkedHashMap<String, Integer>();
+   for (int s = 0; s < Math.min(stateNames.length, stateCounts.length); s++) {
+     stateToCount.put(stateNames[s], stateCounts[s]);
+   }
+
+   StateModelDefinition modelDef = getIncompleteStateModelDef(name, stateNames[0], stateToCount);
+
+   AutoRebalanceTester tester =
+       new AutoRebalanceTester(partitionNames, stateToCount, live, emptyMapping, everyNode,
+           maxPerNode, modelDef);
+   tester.runRepeatedly(numIterations);
+ }
+
+ /**
+  * Build a StateModelDefinition that has states and upper bounds but no transitions. The auto
+  * rebalancer doesn't take transitions into account when computing mappings, so this is
+  * acceptable.
+  * @param modelName name to give the model
+  * @param initialState initial state for all nodes
+  * @param states ordered map of state to count
+  * @return incomplete StateModelDefinition for rebalancing
+  */
+ private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
+     LinkedHashMap<String, Integer> states) {
+   StateModelDefinition.Builder modelBuilder = new StateModelDefinition.Builder(modelName);
+   modelBuilder.initialState(initialState);
+   // the second argument to addState counts down from states.size() in map order
+   int remaining = states.size();
+   for (Entry<String, Integer> stateEntry : states.entrySet()) {
+     modelBuilder.addState(stateEntry.getKey(), remaining);
+     modelBuilder.upperBound(stateEntry.getKey(), stateEntry.getValue());
+     remaining--;
+   }
+   return modelBuilder.build();
+ }
+
+ class AutoRebalanceTester {
+ private static final double P_KILL = 0.45;
+ private static final double P_ADD = 0.1;
+ private static final double P_RESURRECT = 0.45;
+ private static final String RESOURCE_NAME = "resource";
+
+ private List<String> _partitions;
+ private LinkedHashMap<String, Integer> _states;
+ private List<String> _liveNodes;
+ private Set<String> _liveSet;
+ private Set<String> _removedSet;
+ private Set<String> _nonLiveSet;
+ private Map<String, Map<String, String>> _currentMapping;
+ private List<String> _allNodes;
+ private int _maxPerNode;
+ private StateModelDefinition _stateModelDef;
+ private Random _random;
+
+ public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
+     List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
+     List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
+   _partitions = partitions;
+   _states = states;
+   _liveNodes = liveNodes;
+   _currentMapping = currentMapping;
+   _allNodes = allNodes;
+   _maxPerNode = maxPerNode;
+   _stateModelDef = stateModelDef;
+   _random = new Random();
+
+   // sorted view of the live nodes; nothing has been removed yet
+   _liveSet = new TreeSet<String>(_liveNodes);
+   _removedSet = new TreeSet<String>();
+
+   // every known node that is not live at construction time
+   _nonLiveSet = new TreeSet<String>();
+   for (String candidate : allNodes) {
+     boolean live = _liveSet.contains(candidate);
+     if (!live) {
+       _nonLiveSet.add(candidate);
+     }
+   }
+ }
+
+ /**
+  * Compute the initial assignment, then run a sequence of randomly chosen tasks, logging and
+  * verifying the result after each one.
+  * @param numIterations
+  *          Number of random tasks to run in sequence
+  */
+ public void runRepeatedly(int numIterations) {
+   logger.info("~~~~ Initial State ~~~~~");
+   RebalanceStrategy initialStrategy =
+       new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
+   ZNRecord initialResult =
+       initialStrategy.computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+   _currentMapping = getMapping(initialResult.getListFields());
+   logger.info(_currentMapping);
+   getRunResult(_currentMapping, initialResult.getListFields());
+
+   for (int iteration = 0; iteration < numIterations; iteration++) {
+     logger.info("~~~~ Iteration " + iteration + " ~~~~~");
+     ZNRecord outcome = runOnceRandomly();
+     if (outcome == null) {
+       // the chosen task could not be run; keep the previous mapping
+       continue;
+     }
+     final Map<String, List<String>> preferenceLists = outcome.getListFields();
+     final Map<String, Map<String, String>> stateMapping = getMapping(preferenceLists);
+     logger.info(stateMapping);
+     logger.info(preferenceLists);
+     getRunResult(stateMapping, preferenceLists);
+     _currentMapping = stateMapping;
+   }
+ }
+
+ /**
+  * Convert per-partition preference lists into a (partition -> (node -> state)) mapping by
+  * running ConstraintBasedAssignment.computeAutoBestStateForPartition for each partition
+  * against a cache populated with the current live nodes.
+  */
+ private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
+   final Map<String, Map<String, String>> stateByPartition =
+       new HashMap<String, Map<String, String>>();
+
+   // register a LiveInstance for every live node through a mock accessor, then load the cache
+   ClusterDataCache dataCache = new ClusterDataCache();
+   MockAccessor mockAccessor = new MockAccessor();
+   Builder keys = mockAccessor.keyBuilder();
+   for (String liveNode : _liveNodes) {
+     LiveInstance instance = new LiveInstance(liveNode);
+     instance.setSessionId("testSession");
+     mockAccessor.setProperty(keys.liveInstance(liveNode), instance);
+   }
+   dataCache.refresh(mockAccessor);
+
+   for (String partition : _partitions) {
+     Set<String> disabled = Collections.emptySet();
+     Map<String, String> bestStates =
+         ConstraintBasedAssignment.computeAutoBestStateForPartition(dataCache, _stateModelDef,
+             listResult.get(partition), _currentMapping.get(partition), disabled, true);
+     stateByPartition.put(partition, bestStates);
+   }
+   return stateByPartition;
+ }
+
+ /**
+  * Log statistics for an assignment and then verify its correctness
+  * @param mapFields
+  *          The map-map assignment generated by the rebalancer
+  * @param listFields
+  *          The map-list assignment generated by the rebalancer
+  */
+ public void getRunResult(final Map<String, Map<String, String>> mapFields,
+     final Map<String, List<String>> listFields) {
+   logger.info("***** Statistics *****");
+   // statistics first, so they are logged even when verification fails afterwards
+   this.dumpStatistics(mapFields);
+   this.verifyCorrectness(mapFields, listFields);
+ }
+
+ /**
+  * Log the total, average and standard deviation of partitions per node, and the same three
+  * figures for every state
+  * @param mapFields
+  *          The map-map assignment generated by the rebalancer
+  */
+ public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
+   Map<String, Integer> perNode = getPartitionBucketsForNode(mapFields);
+   logger.info("Total number of nodes: " + _liveNodes.size());
+   logger.info("Nodes: " + _liveNodes);
+   logger.info("Total number of partitions: " + getSum(perNode.values()));
+   double meanPartitions = getAverage(perNode.values());
+   logger.info("Average number of partitions per node: " + meanPartitions);
+   logger.info("Standard deviation of partitions: " + getStdev(perNode.values(), meanPartitions));
+
+   // Statistics about each state
+   Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
+   for (String state : _states.keySet()) {
+     // number of replicas in this state on each node, 0 when the node has none
+     Map<String, Integer> countByNode = new TreeMap<String, Integer>();
+     for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) {
+       Map<String, Integer> stateCounts = nodeStates.getValue();
+       int count = 0;
+       if (stateCounts.containsKey(state)) {
+         count = stateCounts.get(state);
+       }
+       countByNode.put(nodeStates.getKey(), count);
+     }
+     logger.info("Total number of state " + state + ": " + getSum(countByNode.values()));
+     double meanStates = getAverage(countByNode.values());
+     logger.info("Average number of state " + state + " per node: " + meanStates);
+     logger.info("Standard deviation of state " + state + " per node: "
+         + getStdev(countByNode.values(), meanStates));
+   }
+ }
+
+ /**
+  * Run a set of correctness checks, failing the test on the first violated constraint:
+  * max replicas per node, only live nodes assigned, per-state replica counts, and node
+  * uniqueness within each preference list.
+  * @param mapFields
+  *          The map-map assignment generated by the rebalancer
+  * @param listFields
+  *          The map-list assignment generated by the rebalancer
+  */
+ public void verifyCorrectness(final Map<String, Map<String, String>> mapFields,
+     final Map<String, List<String>> listFields) {
+   final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+
+   // TestNG assertions are used instead of the Java assert keyword so that the checks
+   // still run when the JVM is started without -ea.
+   boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
+   Assert.assertTrue(maxConstraintMet, "Max per node constraint: FAIL");
+   logger.info("Max per node constraint: PASS");
+
+   boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
+   Assert.assertTrue(liveConstraintMet, "Only live nodes have partitions constraint: FAIL");
+   logger.info("Only live nodes have partitions constraint: PASS");
+
+   boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
+   Assert.assertTrue(stateAssignmentPossible, "State replica constraint: FAIL");
+   logger.info("State replica constraint: PASS");
+
+   boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
+   Assert.assertTrue(nodesUniqueForPartitions, "Node uniqueness per partition constraint: FAIL");
+   logger.info("Node uniqueness per partition constraint: PASS");
+ }
+
+ /**
+  * Check that no node holds more than _maxPerNode partitions, logging the first offender.
+  */
+ private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
+   for (Entry<String, Integer> nodeCount : partitionsPerNode.entrySet()) {
+     int held = nodeCount.getValue();
+     if (held <= _maxPerNode) {
+       continue;
+     }
+     logger.error("ERROR: Node " + nodeCount.getKey() + " has " + held
+         + " partitions despite a maximum of " + _maxPerNode);
+     return false;
+   }
+   return true;
+ }
+
+ private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) {
+ for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) {
+ boolean isLive = _liveSet.contains(nodeState.getKey());
+ boolean isEmpty = nodeState.getValue() == 0;
+ if (!isLive && !isEmpty) {
+ logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has "
+ + nodeState.getValue() + " replicas!");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+  * Check that, for every partition, no state other than DROPPED has more replicas than the
+  * count configured for it in _states.
+  */
+ private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) {
+   final String dropped = HelixDefinedState.DROPPED.toString();
+   for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+     // tally how many nodes hold each state for this partition
+     final Map<String, Integer> tally = new TreeMap<String, Integer>();
+     for (String state : partitionEntry.getValue().values()) {
+       Integer seen = tally.get(state);
+       tally.put(state, seen == null ? 1 : seen + 1);
+     }
+     for (Entry<String, Integer> stateTally : tally.entrySet()) {
+       String state = stateTally.getKey();
+       if (state.equals(dropped)) {
+         continue;
+       }
+       int count = stateTally.getValue();
+       int maximumCount = _states.get(state);
+       if (count > maximumCount) {
+         logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey()
+             + " has " + count + " replicas when " + maximumCount + " is allowed!");
+         return false;
+       }
+     }
+   }
+   return true;
+ }
+
+ private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) {
+ for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) {
+ Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
+ int numUniques = nodeSet.size();
+ int total = partitionEntry.getValue().size();
+ if (numUniques < total) {
+ logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total
+ + " nodes, but only " + numUniques + " are unique!");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+  * Arithmetic mean of the values, or -1.0 for an empty collection.
+  */
+ private double getAverage(final Collection<Integer> values) {
+   if (values.size() == 0) {
+     return -1.0;
+   }
+   double total = 0.0;
+   for (Integer v : values) {
+     total += v;
+   }
+   return total / values.size();
+ }
+
+ /**
+  * Sum of the values; 0 for an empty collection.
+  */
+ private int getSum(final Collection<Integer> values) {
+   int total = 0;
+   for (Integer v : values) {
+     total = total + v.intValue();
+   }
+   return total;
+ }
+
+ /**
+  * Population standard deviation of the values around the supplied mean, or -1.0 for an
+  * empty collection.
+  */
+ private double getStdev(final Collection<Integer> values, double mean) {
+   if (values.size() == 0) {
+     return -1.0;
+   }
+   double squaredDeviations = 0.0;
+   for (Integer v : values) {
+     double deviation = mean - v;
+     squaredDeviations += Math.pow(deviation, 2.0);
+   }
+   double variance = squaredDeviations / values.size();
+   return Math.pow(variance, 0.5);
+ }
+
+ /**
+  * Count, per node, the replicas it holds in any state other than DROPPED. Every live node
+  * appears in the result, with 0 when it holds nothing.
+  */
+ private Map<String, Integer> getPartitionBucketsForNode(
+     final Map<String, Map<String, String>> assignment) {
+   Map<String, Integer> counts = new TreeMap<String, Integer>();
+   for (String liveNode : _liveNodes) {
+     counts.put(liveNode, 0);
+   }
+   final String dropped = HelixDefinedState.DROPPED.toString();
+   for (Map<String, String> nodeMap : assignment.values()) {
+     for (Entry<String, String> nodeState : nodeMap.entrySet()) {
+       if (nodeState.getValue().equals(dropped)) {
+         continue;
+       }
+       // add 1 for every occurrence of a node
+       String node = nodeState.getKey();
+       if (counts.containsKey(node)) {
+         counts.put(node, counts.get(node) + 1);
+       } else {
+         counts.put(node, 1);
+       }
+     }
+   }
+   return counts;
+ }
+
+ private Map<String, Map<String, Integer>> getStateBucketsForNode(
+ final Map<String, Map<String, String>> assignment) {
+ Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
+ for (String n : _liveNodes) {
+ result.put(n, new TreeMap<String, Integer>());
+ }
+ for (Map<String, String> nodeStateMap : assignment.values()) {
+ for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
+ if (!result.containsKey(nodeState.getKey())) {
+ result.put(nodeState.getKey(), new TreeMap<String, Integer>());
+ }
+ Map<String, Integer> stateMap = result.get(nodeState.getKey());
+ if (!stateMap.containsKey(nodeState.getValue())) {
+ stateMap.put(nodeState.getValue(), 1);
+ } else {
+ stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+  * Draw a random number and, according to P_KILL, P_ADD and P_RESURRECT, kill, add, or
+  * resurrect a single node
+  * @return the ZNRecord returned by the chosen task, or null if no task was run
+  */
+ public ZNRecord runOnceRandomly() {
+   final double roll = _random.nextDouble();
+   if (roll < P_KILL) {
+     return removeSingleNode(null);
+   }
+   if (roll < P_KILL + P_ADD) {
+     return addSingleNode(null);
+   }
+   if (roll < P_KILL + P_ADD + P_RESURRECT) {
+     return resurrectSingleNode(null);
+   }
+   return null;
+ }
+
+ /**
+ * Run rebalancer trying to add a never-live node
+ * @param node
+ * Optional String to add
+ * @return ZNRecord result returned by the rebalancer
+ */
+ public ZNRecord addSingleNode(String node) {
+ logger.info("=================== add node =================");
+ if (_nonLiveSet.size() == 0) {
+ logger.warn("Cannot add node because there are no nodes left to add.");
+ return null;
+ }
+
+ // Get a random never-live node
+ if (node == null || !_nonLiveSet.contains(node)) {
+ node = getRandomSetElement(_nonLiveSet);
+ }
+ logger.info("Adding " + node);
+ _liveNodes.add(node);
+ _liveSet.add(node);
+ _nonLiveSet.remove(node);
+
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
+ computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+ }
+
+ /**
+ * Run rebalancer trying to remove a live node
+ * @param node
+ * Optional String to remove
+ * @return ZNRecord result returned by the rebalancer
+ */
+ public ZNRecord removeSingleNode(String node) {
+ logger.info("=================== remove node =================");
+ if (_liveSet.size() == 0) {
+ logger.warn("Cannot remove node because there are no nodes left to remove.");
+ return null;
+ }
+
+ // Get a random never-live node
+ if (node == null || !_liveSet.contains(node)) {
+ node = getRandomSetElement(_liveSet);
+ }
+ logger.info("Removing " + node);
+ _removedSet.add(node);
+ _liveNodes.remove(node);
+ _liveSet.remove(node);
+
+ // the rebalancer expects that the current mapping doesn't contain deleted
+ // nodes
+ for (Map<String, String> nodeMap : _currentMapping.values()) {
+ if (nodeMap.containsKey(node)) {
+ nodeMap.remove(node);
+ }
+ }
+
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+ .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+ }
+
+ /**
+  * Run rebalancer trying to add back a removed node
+  * @param node
+  *          Optional node to resurrect; a random removed node is used when this is null or
+  *          is not in the removed set
+  * @return ZNRecord result returned by the rebalancer, or null if no node has been removed
+  */
+ public ZNRecord resurrectSingleNode(String node) {
+   logger.info("=================== resurrect node =================");
+   if (_removedSet.size() == 0) {
+     // the message names the operation that was actually attempted
+     logger.warn("Cannot resurrect node because there are no nodes left to resurrect.");
+     return null;
+   }
+
+   // Get a random removed node
+   if (node == null || !_removedSet.contains(node)) {
+     node = getRandomSetElement(_removedSet);
+   }
+   logger.info("Resurrecting " + node);
+   _removedSet.remove(node);
+   _liveNodes.add(node);
+   _liveSet.add(node);
+
+   return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+       .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+ }
+
+ private <T> T getRandomSetElement(Set<T> source) {
+ int element = _random.nextInt(source.size());
+ int i = 0;
+ for (T node : source) {
+ if (i == element) {
+ return node;
+ }
+ i++;
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
+ * lists should prefer nodes in the current mapping at all times, but when all nodes are in the
+ * current mapping, then it should distribute states as evenly as possible.
+ * <p>
+ * The test uses 3 partitions, a MasterSlave state model, a replica count of 2 and nodes n0..n2,
+ * and calls computePartitionAssignment once per phase:
+ * <ol>
+ * <li>only n0 is known and live, current mapping is empty: every preference list has exactly one
+ * entry</li>
+ * <li>n1 is added, n0 is MASTER everywhere in the current mapping: every preference list is
+ * [n0, n1]</li>
+ * <li>n1 is recorded as SLAVE everywhere: the heads of the preference lists cover 2 distinct
+ * nodes</li>
+ * <li>n2 is added and the states of PARTITIONS[1] are swapped: n2 is used by at least one
+ * partition, and only in the second position</li>
+ * <li>the mapping is replaced by one that spreads masters and slaves over all 3 nodes: heads and
+ * second entries each cover 3 distinct nodes</li>
+ * <li>n0 is removed from the live nodes and from the mapping: every participant in the current
+ * mapping stays in its preference list, and a participant not in the mapping is never the
+ * head</li>
+ * <li>the mapping is replaced by a full 2-node layout on n1 and n2: the heads of the preference
+ * lists cover 2 distinct nodes</li>
+ * </ol>
+ * The state count is recomputed only when the number of live nodes changes (phases 2, 4 and 6).
+ */
+ @Test
+ public void testOrphansNotPreferred() {
+ final String RESOURCE_NAME = "resource";
+ final String[] PARTITIONS = {
+ "resource_0", "resource_1", "resource_2"
+ };
+ final StateModelDefinition STATE_MODEL =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ final int REPLICA_COUNT = 2;
+ final String[] NODES = {
+ "n0", "n1", "n2"
+ };
+
+ // initial state, one node, no mapping
+ List<String> allNodes = Lists.newArrayList(NODES[0]);
+ List<String> liveNodes = Lists.newArrayList(NODES[0]);
+ Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+ for (String partition : PARTITIONS) {
+ currentMapping.put(partition, new HashMap<String, String>());
+ }
+
+ // make sure that when the first node joins, a single replica is assigned fairly
+ List<String> partitions = ImmutableList.copyOf(PARTITIONS);
+ LinkedHashMap<String, Integer> stateCount =
+ AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ ZNRecord znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ Map<String, List<String>> preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ // only one node is live, so each preference list must hold exactly one entry
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+ }
+
+ // now assign a replica to the first node in the current mapping, and add a second node
+ allNodes.add(NODES[1]);
+ liveNodes.add(NODES[1]);
+ stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).put(NODES[0], "MASTER");
+ }
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
+ + partition);
+ Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
+ + partition);
+ }
+
+ // now set the current mapping to reflect this update and make sure that it distributes masters
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).put(NODES[1], "SLAVE");
+ }
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ Set<String> firstNodes = Sets.newHashSet();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ }
+ Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+
+ // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
+ // new node is never the most preferred
+ allNodes.add(NODES[2]);
+ liveNodes.add(NODES[2]);
+ stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+ // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
+ currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ boolean newNodeUsed = false;
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ if (preferenceList.contains(NODES[2])) {
+ newNodeUsed = true;
+ Assert.assertEquals(preferenceList.get(1), NODES[2],
+ "newly added node not at preference list tail for " + partition);
+ }
+ }
+ Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
+
+ // now remap this to take the new node into account, should go back to balancing masters, slaves
+ // evenly across all nodes
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ firstNodes.clear();
+ Set<String> secondNodes = Sets.newHashSet();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ secondNodes.add(preferenceList.get(1));
+ }
+ Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
+ Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
+
+ // remove n0 (index 0) from the live nodes only; allNodes still lists it
+ liveNodes.remove(0);
+ stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+ // remove all references of n0 from the mapping, keep everything else in a legal state
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ Map<String, String> stateMap = currentMapping.get(partition);
+ for (String participant : stateMap.keySet()) {
+ Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
+ + partition);
+ }
+ for (String participant : preferenceList) {
+ if (!stateMap.containsKey(participant)) {
+ Assert.assertNotSame(preferenceList.get(0), participant,
+ "newly moved replica should not be master for " + partition);
+ }
+ }
+ }
+
+ // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
+ currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ firstNodes.clear();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ }
+ Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+ }
+}