Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/12 17:09:41 UTC

[2/5] helix git commit: [HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy
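
For readers skimming the patch, a minimal, illustrative sketch (not part of this commit) of how the topology-related configuration introduced below might be populated. The cluster name, instance name, fault zone type, and domain string are hypothetical, and persisting the records (for example through a config accessor or HelixAdmin) is omitted:

    import org.apache.helix.model.ClusterConfig;
    import org.apache.helix.model.InstanceConfig;

    public class TopologyConfigSketch {
      public static void main(String[] args) {
        // Hypothetical cluster-level topology definition using the new ClusterConfig keys.
        ClusterConfig clusterConfig = new ClusterConfig("myCluster");
        clusterConfig.getRecord().setSimpleField(
            ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/zone/rack/host/instance");
        clusterConfig.getRecord().setSimpleField(
            ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "zone");

        // Hypothetical per-instance placement information using the new InstanceConfig fields.
        InstanceConfig instanceConfig = new InstanceConfig("instance001");
        instanceConfig.setDomain(
            "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001");
      }
    }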

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
deleted file mode 100644
index 959609f..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ /dev/null
@@ -1,753 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-
-public class AutoRebalanceStrategy implements RebalanceStrategy {
-  private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
-  private final ReplicaPlacementScheme _placementScheme;
-
-  private String _resourceName;
-  private List<String> _partitions;
-  private LinkedHashMap<String, Integer> _states;
-  private int _maximumPerNode;
-
-  private Map<String, Node> _nodeMap;
-  private List<Node> _liveNodesList;
-  private Map<Integer, String> _stateMap;
-
-  private Map<Replica, Node> _preferredAssignment;
-  private Map<Replica, Node> _existingPreferredAssignment;
-  private Map<Replica, Node> _existingNonPreferredAssignment;
-  private Set<Replica> _orphaned;
-
-  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
-    init(resourceName, partitions, states, maximumPerNode);
-    _placementScheme = new DefaultPlacementScheme();
-  }
-
-  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states) {
-    this(resourceName, partitions, states, Integer.MAX_VALUE);
-  }
-
-  @Override
-  public void init(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
-    _resourceName = resourceName;
-    _partitions = partitions;
-    _states = states;
-    _maximumPerNode = maximumPerNode;
-  }
-
-  @Override
-  public ZNRecord computePartitionAssignment(final List<String> liveNodes,
-      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
-    int numReplicas = countStateReplicas();
-    ZNRecord znRecord = new ZNRecord(_resourceName);
-    if (liveNodes.size() == 0) {
-      return znRecord;
-    }
-    int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
-    int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
-    _nodeMap = new HashMap<String, Node>();
-    _liveNodesList = new ArrayList<Node>();
-
-    for (String id : allNodes) {
-      Node node = new Node(id);
-      node.capacity = 0;
-      node.hasCeilingCapacity = false;
-      _nodeMap.put(id, node);
-    }
-    for (int i = 0; i < liveNodes.size(); i++) {
-      boolean usingCeiling = false;
-      int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
-      if (distRemainder > 0 && targetSize < _maximumPerNode) {
-        targetSize += 1;
-        distRemainder = distRemainder - 1;
-        usingCeiling = true;
-      }
-      Node node = _nodeMap.get(liveNodes.get(i));
-      node.isAlive = true;
-      node.capacity = targetSize;
-      node.hasCeilingCapacity = usingCeiling;
-      _liveNodesList.add(node);
-    }
-
-    // compute states for all replica ids
-    _stateMap = generateStateMap();
-
-    // compute the preferred mapping if all nodes were up
-    _preferredAssignment = computePreferredPlacement(allNodes);
-
-    // logger.info("preferred mapping:"+ preferredAssignment);
-    // from current mapping derive the ones in preferred location
-    // this will update the nodes with their current fill status
-    _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
-
-    // from current mapping derive the ones not in preferred location
-    _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
-
-    // compute orphaned replicas that are not assigned to any node
-    _orphaned = computeOrphaned();
-    if (logger.isInfoEnabled()) {
-      logger.info("orphan = " + _orphaned);
-    }
-
-    moveNonPreferredReplicasToPreferred();
-
-    assignOrphans();
-
-    moveExcessReplicas();
-
-    prepareResult(znRecord);
-    return znRecord;
-  }
-
-  /**
-   * Move a replica assigned to a non-preferred node to its preferred node when the current
-   * node is over capacity and the preferred node is under capacity.
-   */
-  private void moveNonPreferredReplicasToPreferred() {
-    // iterate through non preferred and see if we can move them to the
-    // preferred location if the donor has more than it should and stealer has
-    // enough capacity
-    Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<Replica, Node> entry = iterator.next();
-      Replica replica = entry.getKey();
-      Node donor = entry.getValue();
-      Node receiver = _preferredAssignment.get(replica);
-      if (donor.capacity < donor.currentlyAssigned
-          && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
-        donor.currentlyAssigned = donor.currentlyAssigned - 1;
-        receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
-        donor.nonPreferred.remove(replica);
-        receiver.preferred.add(replica);
-        donor.newReplicas.remove(replica);
-        receiver.newReplicas.add(replica);
-        iterator.remove();
-      }
-    }
-  }
-
-  /**
-   * Slot in orphaned partitions randomly so as to maintain even load on live nodes.
-   */
-  private void assignOrphans() {
-    // now iterate over nodes and remaining orphaned partitions and assign
-    // partitions randomly
-    // Better to iterate over orphaned partitions first
-    Iterator<Replica> it = _orphaned.iterator();
-    while (it.hasNext()) {
-      Replica replica = it.next();
-      boolean added = false;
-      int startIndex = computeRandomStartIndex(replica);
-      for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
-        Node receiver = _liveNodesList.get(index % _liveNodesList.size());
-        if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
-          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
-          receiver.nonPreferred.add(replica);
-          receiver.newReplicas.add(replica);
-          added = true;
-          break;
-        }
-      }
-      if (!added) {
-        // try adding the replica by making room for it
-        added = assignOrphanByMakingRoom(replica);
-      }
-      if (added) {
-        it.remove();
-      }
-    }
-    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
-      logger.info("could not assign nodes to partitions: " + _orphaned);
-    }
-  }
-
-  /**
-   * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it
-   * @param replica The replica to assign
-   * @return true if the assignment succeeded, false otherwise
-   */
-  private boolean assignOrphanByMakingRoom(Replica replica) {
-    Node capacityDonor = null;
-    Node capacityAcceptor = null;
-    int startIndex = computeRandomStartIndex(replica);
-    for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
-      Node current = _liveNodesList.get(index % _liveNodesList.size());
-      if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned
-          && !current.canAddIfCapacity(replica) && capacityDonor == null) {
-        // this node has space but cannot accept the replica
-        capacityDonor = current;
-      } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned
-          && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
-        // this node would be able to accept the replica if it has ceiling capacity
-        capacityAcceptor = current;
-      }
-      if (capacityDonor != null && capacityAcceptor != null) {
-        break;
-      }
-    }
-    if (capacityDonor != null && capacityAcceptor != null) {
-      // transfer ceiling capacity and add the node
-      capacityAcceptor.steal(capacityDonor, replica);
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Move replicas from too-full nodes to nodes that can accept the replicas
-   */
-  private void moveExcessReplicas() {
-    // iterate over nodes and move extra load
-    Iterator<Replica> it;
-    for (Node donor : _liveNodesList) {
-      if (donor.capacity < donor.currentlyAssigned) {
-        Collections.sort(donor.nonPreferred);
-        it = donor.nonPreferred.iterator();
-        while (it.hasNext()) {
-          Replica replica = it.next();
-          int startIndex = computeRandomStartIndex(replica);
-          for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
-            Node receiver = _liveNodesList.get(index % _liveNodesList.size());
-            if (receiver.canAdd(replica)) {
-              receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
-              receiver.nonPreferred.add(replica);
-              donor.currentlyAssigned = donor.currentlyAssigned - 1;
-              it.remove();
-              break;
-            }
-          }
-          if (donor.capacity >= donor.currentlyAssigned) {
-            break;
-          }
-        }
-        if (donor.capacity < donor.currentlyAssigned) {
-          logger.warn("Could not take partitions out of node:" + donor.id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Update a ZNRecord with the results of the rebalancing.
-   * @param znRecord
-   */
-  private void prepareResult(ZNRecord znRecord) {
-    // The map fields are keyed on partition name to a pair of node and state, i.e. it
-    // indicates that the partition with given state is served by that node
-    //
-    // The list fields are also keyed on partition and list all the nodes serving that partition.
-    // This is useful to verify that there is no node serving multiple replicas of the same
-    // partition.
-    Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
-    for (String partition : _partitions) {
-      znRecord.setMapField(partition, new TreeMap<String, String>());
-      znRecord.setListField(partition, new ArrayList<String>());
-      newPreferences.put(partition, new ArrayList<String>());
-    }
-
-    // for preference lists, the rough priority that we want is:
-    // [existing preferred, existing non-preferred, non-existing preferred, non-existing
-    // non-preferred]
-    for (Node node : _liveNodesList) {
-      for (Replica replica : node.preferred) {
-        if (node.newReplicas.contains(replica)) {
-          newPreferences.get(replica.partition).add(node.id);
-        } else {
-          znRecord.getListField(replica.partition).add(node.id);
-        }
-      }
-    }
-    for (Node node : _liveNodesList) {
-      for (Replica replica : node.nonPreferred) {
-        if (node.newReplicas.contains(replica)) {
-          newPreferences.get(replica.partition).add(node.id);
-        } else {
-          znRecord.getListField(replica.partition).add(node.id);
-        }
-      }
-    }
-    normalizePreferenceLists(znRecord.getListFields(), newPreferences);
-
-    // generate preference maps based on the preference lists
-    for (String partition : _partitions) {
-      List<String> preferenceList = znRecord.getListField(partition);
-      int i = 0;
-      for (String participant : preferenceList) {
-        znRecord.getMapField(partition).put(participant, _stateMap.get(i));
-        i++;
-      }
-    }
-  }
-
-  /**
-   * Adjust preference lists to reduce the number of same replicas on an instance. This will
-   * separately normalize two sets of preference lists, and then append the results of the second
-   * set to those of the first. This basically ensures that existing replicas are automatically
-   * preferred.
-   * @param preferenceLists map of (partition --> list of nodes)
-   * @param newPreferences map containing node preferences not consistent with the current
-   *          assignment
-   */
-  private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
-      Map<String, List<String>> newPreferences) {
-
-    Map<String, Map<String, Integer>> nodeReplicaCounts =
-        new HashMap<String, Map<String, Integer>>();
-    for (String partition : preferenceLists.keySet()) {
-      normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
-    }
-    for (String partition : newPreferences.keySet()) {
-      normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
-      preferenceLists.get(partition).addAll(newPreferences.get(partition));
-    }
-  }
-
-  /**
-   * Adjust a single preference list for replica assignment imbalance
-   * @param preferenceList list of node names
-   * @param nodeReplicaCounts map of (node --> state --> count)
-   */
-  private void normalizePreferenceList(List<String> preferenceList,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    List<String> newPreferenceList = new ArrayList<String>();
-    int replicas = Math.min(countStateReplicas(), preferenceList.size());
-
-    // make this a LinkedHashSet to preserve iteration order
-    Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
-    for (int i = 0; i < replicas; i++) {
-      String state = _stateMap.get(i);
-      String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
-      newPreferenceList.add(node);
-      notAssigned.remove(node);
-      Map<String, Integer> counts = nodeReplicaCounts.get(node);
-      counts.put(state, counts.get(state) + 1);
-    }
-    preferenceList.clear();
-    preferenceList.addAll(newPreferenceList);
-  }
-
-  /**
-   * Get the node which hosts the fewest of a given replica
-   * @param state the state
-   * @param nodes nodes to check
-   * @param nodeReplicaCounts current assignment of replicas
-   * @return the node most willing to accept the replica
-   */
-  private String getMinimumNodeForReplica(String state, Set<String> nodes,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    String minimalNode = null;
-    int minimalCount = Integer.MAX_VALUE;
-    for (String node : nodes) {
-      int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
-      if (count < minimalCount) {
-        minimalCount = count;
-        minimalNode = node;
-      }
-    }
-    return minimalNode;
-  }
-
-  /**
-   * Safe check for the number of replicas of a given id assigned to a node
-   * @param state the state to assign
-   * @param node the node to check
-   * @param nodeReplicaCounts a map of node to replica id and counts
-   * @return the number of currently assigned replicas of the given id
-   */
-  private int getReplicaCountForNode(String state, String node,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    if (!nodeReplicaCounts.containsKey(node)) {
-      Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
-      replicaCounts.put(state, 0);
-      nodeReplicaCounts.put(node, replicaCounts);
-      return 0;
-    }
-    Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
-    if (!replicaCounts.containsKey(state)) {
-      replicaCounts.put(state, 0);
-      return 0;
-    }
-    return replicaCounts.get(state);
-  }
-
-  /**
-   * Compute the subset of the current mapping where replicas are not mapped according to their
-   * preferred assignment.
-   * @param currentMapping Current mapping of replicas to nodes
-   * @return The current assignments that do not conform to the preferred assignment
-   */
-  private Map<Replica, Node> computeExistingNonPreferredPlacement(
-      Map<String, Map<String, String>> currentMapping) {
-    Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
-    int count = countStateReplicas();
-    for (String partition : currentMapping.keySet()) {
-      Map<String, String> nodeStateMap = currentMapping.get(partition);
-      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
-      for (String nodeId : nodeStateMap.keySet()) {
-        Node node = _nodeMap.get(nodeId);
-        boolean skip = false;
-        for (Replica replica : node.preferred) {
-          if (replica.partition.equals(partition)) {
-            skip = true;
-            break;
-          }
-        }
-        if (skip) {
-          continue;
-        }
-        // check if it is in one of the preferred positions
-        for (int replicaId = 0; replicaId < count; replicaId++) {
-          Replica replica = new Replica(partition, replicaId);
-          if (!_preferredAssignment.containsKey(replica)) {
-
-            logger.info("partitions: " + _partitions);
-            logger.info("currentMapping.keySet: " + currentMapping.keySet());
-            throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions");
-          }
-
-          if (_preferredAssignment.get(replica).id != node.id
-              && !_existingPreferredAssignment.containsKey(replica)
-              && !existingNonPreferredAssignment.containsKey(replica)) {
-            existingNonPreferredAssignment.put(replica, node);
-            node.nonPreferred.add(replica);
-
-            break;
-          }
-        }
-      }
-    }
-    return existingNonPreferredAssignment;
-  }
-
-  /**
-   * Get a live node index to try first for a replica so that each possible start index is
-   * roughly uniformly assigned.
-   * @param replica The replica to assign
-   * @return The starting node index to try
-   */
-  private int computeRandomStartIndex(final Replica replica) {
-    return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
-  }
-
-  /**
-   * Get a set of replicas not currently assigned to any node
-   * @return Unassigned replicas
-   */
-  private Set<Replica> computeOrphaned() {
-    Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
-    for (Replica r : _existingPreferredAssignment.keySet()) {
-      if (orphanedPartitions.contains(r)) {
-        orphanedPartitions.remove(r);
-      }
-    }
-    for (Replica r : _existingNonPreferredAssignment.keySet()) {
-      if (orphanedPartitions.contains(r)) {
-        orphanedPartitions.remove(r);
-      }
-    }
-
-    return orphanedPartitions;
-  }
-
-  /**
-   * Determine the replicas already assigned to their preferred nodes
-   * @param currentMapping Current assignment of replicas to nodes
-   * @return Assignments that conform to the preferred placement
-   */
-  private Map<Replica, Node> computeExistingPreferredPlacement(
-      final Map<String, Map<String, String>> currentMapping) {
-    Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
-    int count = countStateReplicas();
-    for (String partition : currentMapping.keySet()) {
-      Map<String, String> nodeStateMap = currentMapping.get(partition);
-      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
-      for (String nodeId : nodeStateMap.keySet()) {
-        Node node = _nodeMap.get(nodeId);
-        node.currentlyAssigned = node.currentlyAssigned + 1;
-        // check if it is in one of the preferred positions
-        for (int replicaId = 0; replicaId < count; replicaId++) {
-          Replica replica = new Replica(partition, replicaId);
-          if (_preferredAssignment.containsKey(replica)
-              && !existingPreferredAssignment.containsKey(replica)
-              && _preferredAssignment.get(replica).id == node.id) {
-            existingPreferredAssignment.put(replica, node);
-            node.preferred.add(replica);
-            break;
-          }
-        }
-      }
-    }
-
-    return existingPreferredAssignment;
-  }
-
-  /**
-   * Given a predefined set of all possible nodes, compute an assignment of replicas to
-   * nodes that evenly assigns all replicas to nodes.
-   * @param allNodes Identifiers to all nodes, live and non-live
-   * @return Preferred assignment of replicas
-   */
-  private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
-    Map<Replica, Node> preferredMapping;
-    preferredMapping = new HashMap<Replica, Node>();
-    int partitionId = 0;
-    int numReplicas = countStateReplicas();
-    int count = countStateReplicas();
-    for (String partition : _partitions) {
-      for (int replicaId = 0; replicaId < count; replicaId++) {
-        Replica replica = new Replica(partition, replicaId);
-        String nodeName =
-            _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
-                allNodes);
-        preferredMapping.put(replica, _nodeMap.get(nodeName));
-      }
-      partitionId = partitionId + 1;
-    }
-    return preferredMapping;
-  }
-
-  /**
-   * Counts the total number of replicas given a state-count mapping
-   * @return
-   */
-  private int countStateReplicas() {
-    int total = 0;
-    for (Integer count : _states.values()) {
-      total += count;
-    }
-    return total;
-  }
-
-  /**
-   * Compute a map of replica ids to state names
-   * @return Map: replica id -> state name
-   */
-  private Map<Integer, String> generateStateMap() {
-    int replicaId = 0;
-    Map<Integer, String> stateMap = new HashMap<Integer, String>();
-    for (String state : _states.keySet()) {
-      Integer count = _states.get(state);
-      for (int i = 0; i < count; i++) {
-        stateMap.put(replicaId, state);
-        replicaId++;
-      }
-    }
-    return stateMap;
-  }
-
-  /**
-   * A Node is an entity that can serve replicas. It has a capacity and knowledge
-   * of replicas assigned to it, so it can decide if it can receive additional replicas.
-   */
-  class Node {
-    public int currentlyAssigned;
-    public int capacity;
-    public boolean hasCeilingCapacity;
-    private final String id;
-    boolean isAlive;
-    private final List<Replica> preferred;
-    private final List<Replica> nonPreferred;
-    private final Set<Replica> newReplicas;
-
-    public Node(String id) {
-      preferred = new ArrayList<Replica>();
-      nonPreferred = new ArrayList<Replica>();
-      newReplicas = new TreeSet<Replica>();
-      currentlyAssigned = 0;
-      isAlive = false;
-      this.id = id;
-    }
-
-    /**
-     * Check if this replica can be legally added to this node
-     * @param replica The replica to test
-     * @return true if the assignment can be made, false otherwise
-     */
-    public boolean canAdd(Replica replica) {
-      if (currentlyAssigned >= capacity) {
-        return false;
-      }
-      return canAddIfCapacity(replica);
-    }
-
-    /**
-     * Check if this replica can be legally added to this node, provided that it has enough
-     * capacity.
-     * @param replica The replica to test
-     * @return true if the assignment can be made, false otherwise
-     */
-    public boolean canAddIfCapacity(Replica replica) {
-      if (!isAlive) {
-        return false;
-      }
-      for (Replica r : preferred) {
-        if (r.partition.equals(replica.partition)) {
-          return false;
-        }
-      }
-      for (Replica r : nonPreferred) {
-        if (r.partition.equals(replica.partition)) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    /**
-     * Receive a replica by stealing capacity from another Node
-     * @param donor The node that has excess capacity
-     * @param replica The replica to receive
-     */
-    public void steal(Node donor, Replica replica) {
-      donor.hasCeilingCapacity = false;
-      donor.capacity--;
-      hasCeilingCapacity = true;
-      capacity++;
-      currentlyAssigned++;
-      nonPreferred.add(replica);
-      newReplicas.add(replica);
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
-          .append("\nnonpreferred:").append(nonPreferred.size());
-      return sb.toString();
-    }
-  }
-
-  /**
-   * A Replica is a combination of a partition of the resource, the state the replica is in
-   * and an identifier signifying a specific replica of a given partition and state.
-   */
-  class Replica implements Comparable<Replica> {
-    private String partition;
-    private int replicaId; // this is a partition-relative id
-    private String format;
-
-    public Replica(String partition, int replicaId) {
-      this.partition = partition;
-      this.replicaId = replicaId;
-      this.format = this.partition + "|" + this.replicaId;
-    }
-
-    @Override
-    public String toString() {
-      return format;
-    }
-
-    @Override
-    public boolean equals(Object that) {
-      if (that instanceof Replica) {
-        return this.format.equals(((Replica) that).format);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return this.format.hashCode();
-    }
-
-    @Override
-    public int compareTo(Replica that) {
-      if (that instanceof Replica) {
-        return this.format.compareTo(that.format);
-      }
-      return -1;
-    }
-  }
-
-  /**
-   * Interface for providing a custom approach to computing a replica's affinity to a node.
-   */
-  public interface ReplicaPlacementScheme {
-    /**
-     * Initialize global state
-     * @param manager The instance to which this placement is associated
-     */
-    public void init(final HelixManager manager);
-
-    /**
-     * Given properties of this replica, determine the node it would prefer to be served by
-     * @param partitionId The current partition
-     * @param replicaId The current replica with respect to the current partition
-     * @param numPartitions The total number of partitions
-     * @param numReplicas The total number of replicas per partition
-     * @param nodeNames A list of identifiers of all nodes, live and non-live
-     * @return The name of the node that would prefer to serve this replica
-     */
-    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
-        final List<String> nodeNames);
-  }
-
-  /**
-   * Compute preferred placements based on a default strategy that assigns replicas to nodes as
-   * evenly as possible while avoiding placing two replicas of the same partition on any node.
-   */
-  public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
-    @Override
-    public void init(final HelixManager manager) {
-      // do nothing since this is independent of the manager
-    }
-
-    @Override
-    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
-        final List<String> nodeNames) {
-      int index;
-      if (nodeNames.size() > numPartitions) {
-        // assign replicas in partition order in case there are more nodes than partitions
-        index = (partitionId + replicaId * numPartitions) % nodeNames.size();
-      } else if (nodeNames.size() == numPartitions) {
-        // need a replica offset in case the sizes of these sets are the same
-        index =
-            ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
-                % nodeNames.size();
-      } else {
-        // in all other cases, assigning a replica at a time for each partition is reasonable
-        index = (partitionId + replicaId) % nodeNames.size();
-      }
-      return nodeNames.get(index);
-    }
-  }
-}
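
The DefaultPlacementScheme deleted above (and re-added by this commit under org.apache.helix.controller.rebalancer.strategy, as the import changes below show) derives a node index from the partition id, replica id, partition count, and node count. A small standalone sketch of that index arithmetic, with made-up node names and counts, for anyone who wants to see the distribution it produces (the unused numReplicas parameter is dropped here):

    import java.util.Arrays;
    import java.util.List;

    public class DefaultPlacementSketch {
      // Mirrors the index arithmetic of DefaultPlacementScheme.getLocation() shown above.
      static String location(int partitionId, int replicaId, int numPartitions, List<String> nodeNames) {
        int n = nodeNames.size();
        int index;
        if (n > numPartitions) {
          // more nodes than partitions: spread replicas in partition order
          index = (partitionId + replicaId * numPartitions) % n;
        } else if (n == numPartitions) {
          // equal sizes: add a replica offset so replicas of a partition land on distinct nodes
          index = ((partitionId + replicaId * numPartitions) % n + replicaId) % n;
        } else {
          // fewer nodes than partitions: assign one replica at a time per partition
          index = (partitionId + replicaId) % n;
        }
        return nodeNames.get(index);
      }

      public static void main(String[] args) {
        List<String> nodes = Arrays.asList("n_0", "n_1", "n_2"); // made-up node names
        int numPartitions = 4, numReplicas = 2;                  // made-up sizes
        for (int p = 0; p < numPartitions; p++) {
          for (int r = 0; r < numReplicas; r++) {
            System.out.println("p_" + p + " replica " + r + " -> " + location(p, r, numPartitions, nodes));
          }
        }
      }
    }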

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
deleted file mode 100644
index 4daae82..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.ZNRecord;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Assignment strategy interface that computes the assignment of partition->instance.
- */
-public interface RebalanceStrategy {
-  /**
-   * Perform the necessary initialization for the rebalance strategy object.
-   * @param resourceName
-   * @param partitions
-   * @param states
-   * @param maximumPerNode
-   */
-  void init(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode);
-
-  /**
-   * Compute the preference lists and (optionally) the partition-state mapping for the given resource.
-   *
-   * @param liveNodes
-   * @param currentMapping
-   * @param allNodes
-   * @return
-   */
-  ZNRecord computePartitionAssignment(final List<String> liveNodes,
-      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes);
-}
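
The interface deleted above moves to org.apache.helix.controller.rebalancer.strategy (the import changes below reflect that, and the relocated version also takes a cluster-data argument in computePartitionAssignment). Purely for illustration, a trivial round-robin implementation of the pre-move interface shown above; the class and its placement logic are hypothetical:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.helix.ZNRecord;
    import org.apache.helix.controller.strategy.RebalanceStrategy;

    public class RoundRobinStrategy implements RebalanceStrategy {
      private String _resourceName;
      private List<String> _partitions;
      private LinkedHashMap<String, Integer> _states;

      @Override
      public void init(String resourceName, final List<String> partitions,
          final LinkedHashMap<String, Integer> states, int maximumPerNode) {
        _resourceName = resourceName;
        _partitions = partitions;
        _states = states;
      }

      @Override
      public ZNRecord computePartitionAssignment(final List<String> liveNodes,
          final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
        ZNRecord record = new ZNRecord(_resourceName);
        int replicas = 0;
        for (int count : _states.values()) {
          replicas += count; // total replicas per partition, derived from the state counts
        }
        int offset = 0;
        for (String partition : _partitions) {
          List<String> preferenceList = new ArrayList<String>();
          for (int r = 0; r < replicas && r < liveNodes.size(); r++) {
            preferenceList.add(liveNodes.get((offset + r) % liveNodes.size()));
          }
          record.setListField(partition, preferenceList);
          offset++;
        }
        return record;
      }
    }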

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index e97ac9b..73f2cbb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -52,6 +52,7 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
@@ -607,6 +608,13 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void addResource(String clusterName, String resourceName, int partitions,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy) {
+    addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode,
+        rebalanceStrategy, 0, -1);
+  }
+
+  @Override
   public void addResource(String clusterName, String resourceName, IdealState idealstate) {
     String stateModelRef = idealstate.getStateModelDefRef();
     String stateModelDefPath =
@@ -629,14 +637,21 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void addResource(String clusterName, String resourceName, int partitions,
       String stateModelRef, String rebalancerMode, int bucketSize) {
-    addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize,
-        -1);
+    addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, -1);
 
   }
 
   @Override
   public void addResource(String clusterName, String resourceName, int partitions,
       String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance) {
+    addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode,
+        RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY, bucketSize, maxPartitionsPerInstance);
+  }
+
+  @Override
+  public void addResource(String clusterName, String resourceName, int partitions,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize,
+      int maxPartitionsPerInstance) {
     if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
       throw new HelixException("cluster " + clusterName + " is not setup yet");
     }
@@ -647,6 +662,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     RebalanceMode mode =
         idealState.rebalanceModeFromString(rebalancerMode, RebalanceMode.SEMI_AUTO);
     idealState.setRebalanceMode(mode);
+    idealState.setRebalanceStrategy(rebalanceStrategy);
     idealState.setReplicas("" + 0);
     idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
     if (maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) {
@@ -1014,8 +1030,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       @Override
       public ZNRecord update(ZNRecord currentData) {
         ClusterConstraints constraints =
-            currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(
-                currentData);
+            currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(currentData);
 
         constraints.addConstraintItem(constraintId, constraintItem);
         return constraints.getRecord();
@@ -1153,6 +1168,26 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void setInstanceZoneId(String clusterName, String instanceName, String zoneId) {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
+      throw new HelixException("cluster " + clusterName + " instance " + instanceName
+          + " is not setup yet");
+    }
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    PropertyKey configKey = keyBuilder.instanceConfig(instanceName);
+    InstanceConfig config = accessor.getProperty(configKey);
+    config.setZoneId(zoneId);
+    accessor.setProperty(configKey, config);
+  }
+
+  @Override
   public void close() {
     if (_zkClient != null) {
       _zkClient.close();
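
A brief usage sketch for the new admin methods added in this hunk; the ZooKeeper address, cluster, resource, and instance names are placeholders, and the cluster is assumed to already be set up with the MasterSlave state model:

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.IdealState.RebalanceMode;

    public class AdminSketch {
      public static void main(String[] args) {
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // placeholder ZK address

        // New overload: add a resource with an explicit rebalance strategy.
        admin.addResource("myCluster", "myDB", 8, "MasterSlave",
            RebalanceMode.FULL_AUTO.name(), RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY);

        // New method: record which zone a participant runs in.
        admin.setInstanceZoneId("myCluster", "localhost_12913", "zone-1");

        admin.close();
      }
    }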

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
new file mode 100644
index 0000000..25a16d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -0,0 +1,92 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * Cluster configurations
+ */
+public class ClusterConfig extends HelixProperty {
+  /**
+   * Configurable characteristics of a cluster
+   */
+  public enum ClusterConfigProperty {
+    HELIX_DISABLE_PIPELINE_TRIGGERS,
+    TOPOLOGY,  // cluster topology definition, for example, "/zone/rack/host/instance"
+    FAULT_ZONE_TYPE // the topology level at which isolation is applied when Helix places replicas of the same partition.
+  }
+
+  /**
+   * Instantiate for a specific cluster
+   *
+   * @param cluster the cluster identifier
+   */
+  public ClusterConfig(String cluster) {
+    super(cluster);
+  }
+
+  /**
+   * Instantiate with a pre-populated record
+   *
+   * @param record a ZNRecord corresponding to a cluster configuration
+   */
+  public ClusterConfig(ZNRecord record) {
+    super(record);
+  }
+
+  /**
+   * Whether controller pipeline triggers are disabled for this cluster.
+   *
+   * @return true if pipeline triggers are disabled, false otherwise
+   */
+  public Boolean isPipelineTriggersDisabled() {
+    return _record
+        .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ClusterConfig) {
+      ClusterConfig that = (ClusterConfig) obj;
+
+      if (this.getId().equals(that.getId())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+
+  /**
+   * Get the name of this cluster
+   *
+   * @return the cluster name
+   */
+  public String getClusterName() {
+    return _record.getId();
+  }
+}
+
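
A small sketch of how the new ClusterConfig wrapper reads its fields; the record here is built by hand purely for illustration, whereas in practice it would come from the cluster's configuration node:

    import org.apache.helix.ZNRecord;
    import org.apache.helix.model.ClusterConfig;

    public class ClusterConfigSketch {
      public static void main(String[] args) {
        ZNRecord record = new ZNRecord("myCluster"); // hand-built record, for illustration only
        record.setSimpleField(
            ClusterConfig.ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.name(), "true");

        ClusterConfig config = new ClusterConfig(record);
        System.out.println(config.getClusterName());             // prints: myCluster
        System.out.println(config.isPipelineTriggersDisabled()); // prints: true
      }
    }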

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7c4cf54..55d4734 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -167,7 +167,7 @@ public class IdealState extends HelixProperty {
 
   /**
    * Specify the strategy for Helix to use to compute the partition-instance assignment,
-   * i.e., the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy}
+   * i.e., the custom rebalance strategy that implements {@link org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy}
    *
    * @param rebalanceStrategy
    * @return
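
As the updated Javadoc reference indicates, a resource can select a strategy by setting the fully qualified class name of a RebalanceStrategy implementation on its IdealState. A minimal sketch; the resource name and strategy class below are hypothetical:

    import org.apache.helix.model.IdealState;
    import org.apache.helix.model.IdealState.RebalanceMode;

    public class IdealStateSketch {
      public static void main(String[] args) {
        IdealState idealState = new IdealState("myDB"); // hypothetical resource
        idealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
        idealState.setReplicas("3");
        // Hypothetical custom strategy class implementing the relocated RebalanceStrategy interface.
        idealState.setRebalanceStrategy("com.example.helix.MyTopologyAwareStrategy");
      }
    }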

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index eb1c652..ecf2900 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -39,10 +39,14 @@ public class InstanceConfig extends HelixProperty {
   public enum InstanceConfigProperty {
     HELIX_HOST,
     HELIX_PORT,
+    HELIX_ZONE_ID,
     HELIX_ENABLED,
     HELIX_DISABLED_PARTITION,
-    TAG_LIST
+    TAG_LIST,
+    INSTANCE_WEIGHT,
+    DOMAIN
   }
+  public static final int WEIGHT_NOT_SET = -1;
 
   private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName());
 
@@ -94,6 +98,50 @@ public class InstanceConfig extends HelixProperty {
     _record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
   }
 
+  public String getZoneId() {
+    return _record.getSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name());
+  }
+
+  public void setZoneId(String zoneId) {
+    _record.setSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name(), zoneId);
+  }
+
+  /**
+   * Domain represents a hierarchy identifier for an instance.
+   * @return the domain identifier of this instance
+   */
+  public String getDomain() {
+    return _record.getSimpleField(InstanceConfigProperty.DOMAIN.name());
+  }
+
+  /**
+   * Domain represents a hierarchy identifier for an instance.
+   * Example:  "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001".
+   * @param domain the domain identifier to set
+   */
+  public void setDomain(String domain) {
+    _record.setSimpleField(InstanceConfigProperty.DOMAIN.name(), domain);
+  }
+
+  public int getWeight() {
+    String w = _record.getSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name());
+    if (w != null) {
+      try {
+        int weight = Integer.valueOf(w);
+        return weight;
+      } catch (NumberFormatException e) {
+      }
+    }
+    return WEIGHT_NOT_SET;
+  }
+
+  public void setWeight(int weight) {
+    if (weight <= 0) {
+      throw new IllegalArgumentException("Instance weight cannot be less than or equal to 0!");
+    }
+    _record.setSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name(), String.valueOf(weight));
+  }
+
   /**
    * Get arbitrary tags associated with the instance
    * @return a list of tags
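
A short sketch of the new per-instance fields added in this hunk; the instance name, zone, and weight values are made up:

    import org.apache.helix.model.InstanceConfig;

    public class InstanceConfigSketch {
      public static void main(String[] args) {
        InstanceConfig config = new InstanceConfig("localhost_12913"); // placeholder instance
        config.setZoneId("zone-1");
        config.setWeight(1000);

        System.out.println(config.getZoneId()); // prints: zone-1
        System.out.println(config.getWeight()); // prints: 1000
        // getWeight() falls back to InstanceConfig.WEIGHT_NOT_SET (-1) when no weight is stored.
      }
    }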

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index 623357f..ac96768 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -32,8 +32,8 @@ import java.util.TreeSet;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
@@ -126,7 +126,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
     List<String> allNodes =
         Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
     Collections.sort(allNodes);
-    ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+    ZNRecord record =
+        strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache);
     Map<String, List<String>> preferenceLists = record.getListFields();
 
     // Convert to an assignment keyed on participant
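
The updated call above reflects the relocated strategy's new signature, which takes the full node list, the live node list, the current mapping, and the cluster data cache. A direct invocation might look like the following sketch; the resource, partitions, and nodes are made up, and the cache argument is passed as null, as the new unit test below also does:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.helix.ZNRecord;
    import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
    import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;

    public class StrategyCallSketch {
      public static void main(String[] args) {
        List<String> partitions = Arrays.asList("myDB_0", "myDB_1", "myDB_2");
        LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
        states.put("MASTER", 1);
        states.put("SLAVE", 2);

        List<String> allNodes = Arrays.asList("node_0", "node_1", "node_2", "node_3");
        List<String> liveNodes = Arrays.asList("node_0", "node_1", "node_2");
        Map<String, Map<String, String>> currentMapping =
            new HashMap<String, Map<String, String>>(); // empty: no existing assignment

        RebalanceStrategy strategy =
            new AutoRebalanceStrategy("myDB", partitions, states, Integer.MAX_VALUE);
        ZNRecord assignment =
            strategy.computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
        System.out.println(assignment.getListFields());
      }
    }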

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 9d411bb..08ccbdc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -362,6 +362,12 @@ public class ClusterSetup {
   }
 
   public void addResourceToCluster(String clusterName, String resourceName, int numResources,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy) {
+    _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode,
+        rebalanceStrategy);
+  }
+
+  public void addResourceToCluster(String clusterName, String resourceName, int numResources,
       String stateModelRef, String rebalancerMode, int bucketSize) {
     _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode,
         bucketSize);
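
The matching ClusterSetup convenience method added above can be used the same way; a brief sketch with placeholder names, assuming the cluster's participants are added elsewhere and the MasterSlave state model is available:

    import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
    import org.apache.helix.model.IdealState.RebalanceMode;
    import org.apache.helix.tools.ClusterSetup;

    public class SetupSketch {
      public static void main(String[] args) {
        ClusterSetup setup = new ClusterSetup("localhost:2181"); // placeholder ZK address
        setup.addCluster("myCluster", false);
        // New overload: pass the rebalance strategy along with the rebalancer mode.
        setup.addResourceToCluster("myCluster", "myDB", 8, "MasterSlave",
            RebalanceMode.FULL_AUTO.name(), RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY);
      }
    }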

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
new file mode 100644
index 0000000..a4e38a1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
@@ -0,0 +1,766 @@
+package org.apache.helix.controller.Strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.Mocks.MockAccessor;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.AutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TestAutoRebalanceStrategy {
+  private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
+
+  /**
+   * Sanity test for a basic Master-Slave model
+   */
+  @Test
+  public void simpleMasterSlaveTest() {
+    final int NUM_ITERATIONS = 10;
+    final int NUM_PARTITIONS = 10;
+    final int NUM_LIVE_NODES = 12;
+    final int NUM_TOTAL_NODES = 20;
+    final int MAX_PER_NODE = 5;
+
+    final String[] STATE_NAMES = {
+        "MASTER", "SLAVE"
+    };
+    final int[] STATE_COUNTS = {
+        1, 2
+    };
+
+    runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES,
+        MAX_PER_NODE, STATE_NAMES, STATE_COUNTS);
+  }
+
+  /**
+   * Run a test for an arbitrary state model.
+   * @param name Name of the test state model
+   * @param numIterations Number of rebalance tasks to run
+   * @param numPartitions Number of partitions for the resource
+   * @param numLiveNodes Number of live nodes in the cluster
+   * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to
+   *          numLiveNodes
+   * @param maxPerNode Maximum number of replicas a node can serve
+   * @param stateNames States ordered by preference
+   * @param stateCounts Number of replicas that should be in each state
+   */
+  private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes,
+      int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) {
+    List<String> partitions = new ArrayList<String>();
+    for (int i = 0; i < numPartitions; i++) {
+      partitions.add("p_" + i);
+    }
+
+    List<String> liveNodes = new ArrayList<String>();
+    List<String> allNodes = new ArrayList<String>();
+    for (int i = 0; i < numTotalNodes; i++) {
+      allNodes.add("n_" + i);
+      if (i < numLiveNodes) {
+        liveNodes.add("n_" + i);
+      }
+    }
+
+    Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
+
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) {
+      states.put(stateNames[i], stateCounts[i]);
+    }
+
+    StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
+
+    new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
+        stateModelDef).runRepeatedly(numIterations);
+  }
+
+  /**
+   * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions
+   * into account when computing mappings, so this is acceptable.
+   * @param modelName name to give the model
+   * @param initialState initial state for all nodes
+   * @param states ordered map of state to count
+   * @return incomplete StateModelDefinition for rebalancing
+   */
+  private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
+      LinkedHashMap<String, Integer> states) {
+    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
+    builder.initialState(initialState);
+    int i = states.size();
+    for (String state : states.keySet()) {
+      builder.addState(state, i);
+      builder.upperBound(state, states.get(state));
+      i--;
+    }
+    return builder.build();
+  }
+
+  class AutoRebalanceTester {
+    private static final double P_KILL = 0.45;
+    private static final double P_ADD = 0.1;
+    private static final double P_RESURRECT = 0.45;
+    private static final String RESOURCE_NAME = "resource";
+
+    private List<String> _partitions;
+    private LinkedHashMap<String, Integer> _states;
+    private List<String> _liveNodes;
+    private Set<String> _liveSet;
+    private Set<String> _removedSet;
+    private Set<String> _nonLiveSet;
+    private Map<String, Map<String, String>> _currentMapping;
+    private List<String> _allNodes;
+    private int _maxPerNode;
+    private StateModelDefinition _stateModelDef;
+    private Random _random;
+
+    public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
+        List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
+        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
+      _partitions = partitions;
+      _states = states;
+      _liveNodes = liveNodes;
+      _liveSet = new TreeSet<String>();
+      for (String node : _liveNodes) {
+        _liveSet.add(node);
+      }
+      _removedSet = new TreeSet<String>();
+      _nonLiveSet = new TreeSet<String>();
+      _currentMapping = currentMapping;
+      _allNodes = allNodes;
+      for (String node : allNodes) {
+        if (!_liveSet.contains(node)) {
+          _nonLiveSet.add(node);
+        }
+      }
+      _maxPerNode = maxPerNode;
+      _stateModelDef = stateModelDef;
+      _random = new Random();
+    }
+
+    /**
+     * Repeatedly randomly select a task to run and report the result
+     * @param numIterations
+     *          Number of random tasks to run in sequence
+     */
+    public void runRepeatedly(int numIterations) {
+      logger.info("~~~~ Initial State ~~~~~");
+      RebalanceStrategy strategy =
+          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
+      ZNRecord initialResult =
+          strategy.computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+      _currentMapping = getMapping(initialResult.getListFields());
+      logger.info(_currentMapping);
+      getRunResult(_currentMapping, initialResult.getListFields());
+      for (int i = 0; i < numIterations; i++) {
+        logger.info("~~~~ Iteration " + i + " ~~~~~");
+        ZNRecord znRecord = runOnceRandomly();
+        if (znRecord != null) {
+          final Map<String, List<String>> listResult = znRecord.getListFields();
+          final Map<String, Map<String, String>> mapResult = getMapping(listResult);
+          logger.info(mapResult);
+          logger.info(listResult);
+          getRunResult(mapResult, listResult);
+          _currentMapping = mapResult;
+        }
+      }
+    }
+
+    private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
+      final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>();
+      ClusterDataCache cache = new ClusterDataCache();
+      MockAccessor accessor = new MockAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      for (String node : _liveNodes) {
+        LiveInstance liveInstance = new LiveInstance(node);
+        liveInstance.setSessionId("testSession");
+        accessor.setProperty(keyBuilder.liveInstance(node), liveInstance);
+      }
+      cache.refresh(accessor);
+      for (String partition : _partitions) {
+        List<String> preferenceList = listResult.get(partition);
+        Map<String, String> currentStateMap = _currentMapping.get(partition);
+        Set<String> disabled = Collections.emptySet();
+        Map<String, String> assignment =
+            ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef,
+                preferenceList, currentStateMap, disabled, true);
+        mapResult.put(partition, assignment);
+      }
+      return mapResult;
+    }
+
+    /**
+     * Output various statistics and correctness check results
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     * @param listFields
+     *          The map-list assignment generated by the rebalancer
+     */
+    public void getRunResult(final Map<String, Map<String, String>> mapFields,
+        final Map<String, List<String>> listFields) {
+      logger.info("***** Statistics *****");
+      dumpStatistics(mapFields);
+      verifyCorrectness(mapFields, listFields);
+    }
+
+    /**
+     * Output statistics about the assignment
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     */
+    public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
+      Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+      int nodeCount = _liveNodes.size();
+      logger.info("Total number of nodes: " + nodeCount);
+      logger.info("Nodes: " + _liveNodes);
+      int sumPartitions = getSum(partitionsPerNode.values());
+      logger.info("Total number of partitions: " + sumPartitions);
+      double averagePartitions = getAverage(partitionsPerNode.values());
+      logger.info("Average number of partitions per node: " + averagePartitions);
+      double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions);
+      logger.info("Standard deviation of partitions: " + stdevPartitions);
+
+      // Statistics about each state
+      Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
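+      // For each state, report the total count, the per-node average, and the standard deviation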
+      for (String state : _states.keySet()) {
+        Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>();
+        for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) {
+          Map<String, Integer> stateCounts = nodeStates.getValue();
+          if (stateCounts.containsKey(state)) {
+            nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state));
+          } else {
+            nodeStateCounts.put(nodeStates.getKey(), 0);
+          }
+        }
+        int sumStates = getSum(nodeStateCounts.values());
+        logger.info("Total number of state " + state + ": " + sumStates);
+        double averageStates = getAverage(nodeStateCounts.values());
+        logger.info("Average number of state " + state + " per node: " + averageStates);
+        double stdevStates = getStdev(nodeStateCounts.values(), averageStates);
+        logger.info("Standard deviation of state " + state + " per node: " + stdevStates);
+      }
+    }
+
+    /**
+     * Run a set of correctness tests, reporting success or failure
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     * @param listFields
+     *          The map-list assignment generated by the rebalancer
+     */
+    public void verifyCorrectness(final Map<String, Map<String, String>> mapFields,
+        final Map<String, List<String>> listFields) {
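+      // Note: these checks use Java assert statements, so failures only surface when assertions are enabled (-ea)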
+      final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+      boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
+      assert maxConstraintMet : "Max per node constraint: FAIL";
+      logger.info("Max per node constraint: PASS");
+
+      boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
+      assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL";
+      logger.info("Only live nodes have partitions constraint: PASS");
+
+      boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
+      assert stateAssignmentPossible : "State replica constraint: FAIL";
+      logger.info("State replica constraint: PASS");
+
+      boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
+      assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL";
+      logger.info("Node uniqueness per partition constraint: PASS");
+    }
+
+    private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
+      for (String node : partitionsPerNode.keySet()) {
+        Integer value = partitionsPerNode.get(node);
+        if (value > _maxPerNode) {
+          logger.error("ERROR: Node " + node + " has " + value
+              + " partitions despite a maximum of " + _maxPerNode);
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) {
+      for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) {
+        boolean isLive = _liveSet.contains(nodeState.getKey());
+        boolean isEmpty = nodeState.getValue() == 0;
+        if (!isLive && !isEmpty) {
+          logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has "
+              + nodeState.getValue() + " replicas!");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) {
+      for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+        final Map<String, String> nodeMap = partitionEntry.getValue();
+        final Map<String, Integer> stateCounts = new TreeMap<String, Integer>();
+        for (String state : nodeMap.values()) {
+          if (!stateCounts.containsKey(state)) {
+            stateCounts.put(state, 1);
+          } else {
+            stateCounts.put(state, stateCounts.get(state) + 1);
+          }
+        }
+        for (String state : stateCounts.keySet()) {
+          if (state.equals(HelixDefinedState.DROPPED.toString())) {
+            continue;
+          }
+          int count = stateCounts.get(state);
+          int maximumCount = _states.get(state);
+          if (count > maximumCount) {
+            logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey()
+                + " has " + count + " replicas when " + maximumCount + " is allowed!");
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) {
+      for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) {
+        Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
+        int numUniques = nodeSet.size();
+        int total = partitionEntry.getValue().size();
+        if (numUniques < total) {
+          logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total
+              + " nodes, but only " + numUniques + " are unique!");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private double getAverage(final Collection<Integer> values) {
+      double sum = 0.0;
+      for (Integer value : values) {
+        sum += value;
+      }
+      if (values.size() != 0) {
+        return sum / values.size();
+      } else {
+        return -1.0;
+      }
+    }
+
+    private int getSum(final Collection<Integer> values) {
+      int sum = 0;
+      for (Integer value : values) {
+        sum += value;
+      }
+      return sum;
+    }
+
+    private double getStdev(final Collection<Integer> values, double mean) {
+      double sum = 0.0;
+      for (Integer value : values) {
+        double deviation = mean - value;
+        sum += Math.pow(deviation, 2.0);
+      }
+      if (values.size() != 0) {
+        sum /= values.size();
+        return Math.pow(sum, 0.5);
+      } else {
+        return -1.0;
+      }
+    }
+
+    private Map<String, Integer> getPartitionBucketsForNode(
+        final Map<String, Map<String, String>> assignment) {
+      Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>();
+      for (String node : _liveNodes) {
+        partitionsPerNode.put(node, 0);
+      }
+      for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+        final Map<String, String> nodeMap = partitionEntry.getValue();
+        for (String node : nodeMap.keySet()) {
+          String state = nodeMap.get(node);
+          if (state.equals(HelixDefinedState.DROPPED.toString())) {
+            continue;
+          }
+          // add 1 for every occurrence of a node
+          if (!partitionsPerNode.containsKey(node)) {
+            partitionsPerNode.put(node, 1);
+          } else {
+            partitionsPerNode.put(node, partitionsPerNode.get(node) + 1);
+          }
+        }
+      }
+      return partitionsPerNode;
+    }
+
+    private Map<String, Map<String, Integer>> getStateBucketsForNode(
+        final Map<String, Map<String, String>> assignment) {
+      Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
+      for (String n : _liveNodes) {
+        result.put(n, new TreeMap<String, Integer>());
+      }
+      for (Map<String, String> nodeStateMap : assignment.values()) {
+        for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
+          if (!result.containsKey(nodeState.getKey())) {
+            result.put(nodeState.getKey(), new TreeMap<String, Integer>());
+          }
+          Map<String, Integer> stateMap = result.get(nodeState.getKey());
+          if (!stateMap.containsKey(nodeState.getValue())) {
+            stateMap.put(nodeState.getValue(), 1);
+          } else {
+            stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1);
+          }
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Randomly choose between killing, adding, or resurrecting a single node
+     * @return (Partition -> (Node -> State)) ZNRecord from the rebalancer, or null if no operation ran
+     */
+    public ZNRecord runOnceRandomly() {
+      double choose = _random.nextDouble();
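+      // Compare a single uniform draw against cumulative thresholds so the operations are
+      // chosen with probabilities P_KILL, P_ADD, and P_RESURRECT respectively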
+      ZNRecord result = null;
+      if (choose < P_KILL) {
+        result = removeSingleNode(null);
+      } else if (choose < P_KILL + P_ADD) {
+        result = addSingleNode(null);
+      } else if (choose < P_KILL + P_ADD + P_RESURRECT) {
+        result = resurrectSingleNode(null);
+      }
+      return result;
+    }
+
+    /**
+     * Run rebalancer trying to add a never-live node
+     * @param node
+     *          Optional node to add; a random never-live node is chosen if this is null or not never-live
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord addSingleNode(String node) {
+      logger.info("=================== add node =================");
+      if (_nonLiveSet.size() == 0) {
+        logger.warn("Cannot add node because there are no nodes left to add.");
+        return null;
+      }
+
+      // Get a random never-live node
+      if (node == null || !_nonLiveSet.contains(node)) {
+        node = getRandomSetElement(_nonLiveSet);
+      }
+      logger.info("Adding " + node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+      _nonLiveSet.remove(node);
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
+          computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    /**
+     * Run rebalancer trying to remove a live node
+     * @param node
+     *          Optional node to remove; a random live node is chosen if this is null or not live
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord removeSingleNode(String node) {
+      logger.info("=================== remove node =================");
+      if (_liveSet.size() == 0) {
+        logger.warn("Cannot remove node because there are no nodes left to remove.");
+        return null;
+      }
+
+      // Get a random live node
+      if (node == null || !_liveSet.contains(node)) {
+        node = getRandomSetElement(_liveSet);
+      }
+      logger.info("Removing " + node);
+      _removedSet.add(node);
+      _liveNodes.remove(node);
+      _liveSet.remove(node);
+
+      // the rebalancer expects that the current mapping doesn't contain deleted
+      // nodes
+      for (Map<String, String> nodeMap : _currentMapping.values()) {
+        nodeMap.remove(node);
+      }
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    /**
+     * Run rebalancer trying to add back a removed node
+     * @param node
+     *          Optional node to resurrect; a random removed node is chosen if this is null or not previously removed
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord resurrectSingleNode(String node) {
+      logger.info("=================== resurrect node =================");
+      if (_removedSet.size() == 0) {
+        logger.warn("Cannot remove node because there are no nodes left to resurrect.");
+        return null;
+      }
+
+      // Get a random removed node
+      if (node == null || !_removedSet.contains(node)) {
+        node = getRandomSetElement(_removedSet);
+      }
+      logger.info("Resurrecting " + node);
+      _removedSet.remove(node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    private <T> T getRandomSetElement(Set<T> source) {
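+      // Sets have no indexed access, so walk the set until a randomly chosen index is reached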
+      int element = _random.nextInt(source.size());
+      int i = 0;
+      for (T node : source) {
+        if (i == element) {
+          return node;
+        }
+        i++;
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
+   * lists should prefer nodes in the current mapping at all times; once all nodes are in the
+   * current mapping, states should be distributed as evenly as possible.
+   */
+  @Test
+  public void testOrphansNotPreferred() {
+    final String RESOURCE_NAME = "resource";
+    final String[] PARTITIONS = {
+        "resource_0", "resource_1", "resource_2"
+    };
+    final StateModelDefinition STATE_MODEL =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    final int REPLICA_COUNT = 2;
+    final String[] NODES = {
+        "n0", "n1", "n2"
+    };
+
+    // initial state, one node, no mapping
+    List<String> allNodes = Lists.newArrayList(NODES[0]);
+    List<String> liveNodes = Lists.newArrayList(NODES[0]);
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    for (String partition : PARTITIONS) {
+      currentMapping.put(partition, new HashMap<String, String>());
+    }
+
+    // make sure that when the first node joins, a single replica is assigned fairly
+    List<String> partitions = ImmutableList.copyOf(PARTITIONS);
+    LinkedHashMap<String, Integer> stateCount =
+        AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    ZNRecord znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    Map<String, List<String>> preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      // with one live node, each partition gets a single replica (which will be the MASTER)
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+    }
+
+    // now assign a replica to the first node in the current mapping, and add a second node
+    allNodes.add(NODES[1]);
+    liveNodes.add(NODES[1]);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).put(NODES[0], "MASTER");
+    }
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
+          + partition);
+      Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
+          + partition);
+    }
+
+    // now set the current mapping to reflect this update and make sure that it distributes masters
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).put(NODES[1], "SLAVE");
+    }
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    Set<String> firstNodes = Sets.newHashSet();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+    }
+    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+
+    // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
+    // new node is never the most preferred
+    allNodes.add(NODES[2]);
+    liveNodes.add(NODES[2]);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+    // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
+    currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    boolean newNodeUsed = false;
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      if (preferenceList.contains(NODES[2])) {
+        newNodeUsed = true;
+        Assert.assertEquals(preferenceList.get(1), NODES[2],
+            "newly added node not at preference list tail for " + partition);
+      }
+    }
+    Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
+
+    // now remap this to take the new node into account, should go back to balancing masters, slaves
+    // evenly across all nodes
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    firstNodes.clear();
+    Set<String> secondNodes = Sets.newHashSet();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+      secondNodes.add(preferenceList.get(1));
+    }
+    Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
+    Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
+
+    // remove a node now, but use the current mapping with everything balanced just prior
+    liveNodes.remove(0);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+    // remove all references of n0 from the mapping, keep everything else in a legal state
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      Map<String, String> stateMap = currentMapping.get(partition);
+      for (String participant : stateMap.keySet()) {
+        Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
+            + partition);
+      }
+      for (String participant : preferenceList) {
+        if (!stateMap.containsKey(participant)) {
+          Assert.assertNotSame(preferenceList.get(0), participant,
+              "newly moved replica should not be master for " + partition);
+        }
+      }
+    }
+
+    // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
+    currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    firstNodes.clear();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+    }
+    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+  }
+}