Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/12 17:09:40 UTC

[1/5] helix git commit: [HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x f5ac8f8b9 -> 7147ec874


http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
deleted file mode 100644
index adc92d6..0000000
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ /dev/null
@@ -1,765 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.Mocks.MockAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.rebalancer.AutoRebalancer;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class TestAutoRebalanceStrategy {
-  private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
-
-  /**
-   * Sanity test for a basic Master-Slave model
-   */
-  @Test
-  public void simpleMasterSlaveTest() {
-    final int NUM_ITERATIONS = 10;
-    final int NUM_PARTITIONS = 10;
-    final int NUM_LIVE_NODES = 12;
-    final int NUM_TOTAL_NODES = 20;
-    final int MAX_PER_NODE = 5;
-
-    final String[] STATE_NAMES = {
-        "MASTER", "SLAVE"
-    };
-    final int[] STATE_COUNTS = {
-        1, 2
-    };
-
-    runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES,
-        MAX_PER_NODE, STATE_NAMES, STATE_COUNTS);
-  }
-
-  /**
-   * Run a test for an arbitrary state model.
-   * @param name Name of the test state model
-   * @param numIterations Number of rebalance tasks to run
-   * @param numPartitions Number of partitions for the resource
-   * @param numLiveNodes Number of live nodes in the cluster
-   * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to
-   *          numLiveNodes
-   * @param maxPerNode Maximum number of replicas a node can serve
-   * @param stateNames States ordered by preference
-   * @param stateCounts Number of replicas that should be in each state
-   */
-  private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes,
-      int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) {
-    List<String> partitions = new ArrayList<String>();
-    for (int i = 0; i < numPartitions; i++) {
-      partitions.add("p_" + i);
-    }
-
-    List<String> liveNodes = new ArrayList<String>();
-    List<String> allNodes = new ArrayList<String>();
-    for (int i = 0; i < numTotalNodes; i++) {
-      allNodes.add("n_" + i);
-      if (i < numLiveNodes) {
-        liveNodes.add("n_" + i);
-      }
-    }
-
-    Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
-
-    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
-    for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) {
-      states.put(stateNames[i], stateCounts[i]);
-    }
-
-    StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
-
-    new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
-        stateModelDef).runRepeatedly(numIterations);
-  }
-
-  /**
-   * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions
-   * into account when computing mappings, so this is acceptable.
-   * @param modelName name to give the model
-   * @param initialState initial state for all nodes
-   * @param states ordered map of state to count
-   * @return incomplete StateModelDefinition for rebalancing
-   */
-  private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
-      LinkedHashMap<String, Integer> states) {
-    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
-    builder.initialState(initialState);
-    int i = states.size();
-    for (String state : states.keySet()) {
-      builder.addState(state, i);
-      builder.upperBound(state, states.get(state));
-      i--;
-    }
-    return builder.build();
-  }
-
-  class AutoRebalanceTester {
-    private static final double P_KILL = 0.45;
-    private static final double P_ADD = 0.1;
-    private static final double P_RESURRECT = 0.45;
-    private static final String RESOURCE_NAME = "resource";
-
-    private List<String> _partitions;
-    private LinkedHashMap<String, Integer> _states;
-    private List<String> _liveNodes;
-    private Set<String> _liveSet;
-    private Set<String> _removedSet;
-    private Set<String> _nonLiveSet;
-    private Map<String, Map<String, String>> _currentMapping;
-    private List<String> _allNodes;
-    private int _maxPerNode;
-    private StateModelDefinition _stateModelDef;
-    private Random _random;
-
-    public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
-        List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
-        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
-      _partitions = partitions;
-      _states = states;
-      _liveNodes = liveNodes;
-      _liveSet = new TreeSet<String>();
-      for (String node : _liveNodes) {
-        _liveSet.add(node);
-      }
-      _removedSet = new TreeSet<String>();
-      _nonLiveSet = new TreeSet<String>();
-      _currentMapping = currentMapping;
-      _allNodes = allNodes;
-      for (String node : allNodes) {
-        if (!_liveSet.contains(node)) {
-          _nonLiveSet.add(node);
-        }
-      }
-      _maxPerNode = maxPerNode;
-      _stateModelDef = stateModelDef;
-      _random = new Random();
-    }
-
-    /**
-     * Repeatedly select a random task to run and report the result
-     * @param numIterations
-     *          Number of random tasks to run in sequence
-     */
-    public void runRepeatedly(int numIterations) {
-      logger.info("~~~~ Initial State ~~~~~");
-      RebalanceStrategy strategy =
-          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
-      ZNRecord initialResult =
-          strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
-      _currentMapping = getMapping(initialResult.getListFields());
-      logger.info(_currentMapping);
-      getRunResult(_currentMapping, initialResult.getListFields());
-      for (int i = 0; i < numIterations; i++) {
-        logger.info("~~~~ Iteration " + i + " ~~~~~");
-        ZNRecord znRecord = runOnceRandomly();
-        if (znRecord != null) {
-          final Map<String, List<String>> listResult = znRecord.getListFields();
-          final Map<String, Map<String, String>> mapResult = getMapping(listResult);
-          logger.info(mapResult);
-          logger.info(listResult);
-          getRunResult(mapResult, listResult);
-          _currentMapping = mapResult;
-        }
-      }
-    }
-
-    private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
-      final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>();
-      ClusterDataCache cache = new ClusterDataCache();
-      MockAccessor accessor = new MockAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-      for (String node : _liveNodes) {
-        LiveInstance liveInstance = new LiveInstance(node);
-        liveInstance.setSessionId("testSession");
-        accessor.setProperty(keyBuilder.liveInstance(node), liveInstance);
-      }
-      cache.refresh(accessor);
-      for (String partition : _partitions) {
-        List<String> preferenceList = listResult.get(partition);
-        Map<String, String> currentStateMap = _currentMapping.get(partition);
-        Set<String> disabled = Collections.emptySet();
-        Map<String, String> assignment =
-            ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef,
-                preferenceList, currentStateMap, disabled, true);
-        mapResult.put(partition, assignment);
-      }
-      return mapResult;
-    }
-
-    /**
-     * Output various statistics and correctness check results
-     * @param mapFields
-     *          The map-map assignment generated by the rebalancer
-     * @param listFields
-     *          The map-list assignment generated by the rebalancer
-     */
-    public void getRunResult(final Map<String, Map<String, String>> mapFields,
-        final Map<String, List<String>> listFields) {
-      logger.info("***** Statistics *****");
-      dumpStatistics(mapFields);
-      verifyCorrectness(mapFields, listFields);
-    }
-
-    /**
-     * Output statistics about the assignment
-     * @param mapFields
-     *          The map-map assignment generated by the rebalancer
-     */
-    public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
-      Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
-      int nodeCount = _liveNodes.size();
-      logger.info("Total number of nodes: " + nodeCount);
-      logger.info("Nodes: " + _liveNodes);
-      int sumPartitions = getSum(partitionsPerNode.values());
-      logger.info("Total number of partitions: " + sumPartitions);
-      double averagePartitions = getAverage(partitionsPerNode.values());
-      logger.info("Average number of partitions per node: " + averagePartitions);
-      double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions);
-      logger.info("Standard deviation of partitions: " + stdevPartitions);
-
-      // Statistics about each state
-      Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
-      for (String state : _states.keySet()) {
-        Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>();
-        for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) {
-          Map<String, Integer> stateCounts = nodeStates.getValue();
-          if (stateCounts.containsKey(state)) {
-            nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state));
-          } else {
-            nodeStateCounts.put(nodeStates.getKey(), 0);
-          }
-        }
-        int sumStates = getSum(nodeStateCounts.values());
-        logger.info("Total number of state " + state + ": " + sumStates);
-        double averageStates = getAverage(nodeStateCounts.values());
-        logger.info("Average number of state " + state + " per node: " + averageStates);
-        double stdevStates = getStdev(nodeStateCounts.values(), averageStates);
-        logger.info("Standard deviation of state " + state + " per node: " + stdevStates);
-      }
-    }
-
-    /**
-     * Run a set of correctness tests, reporting success or failure
-     * @param mapFields
-     *          The map-map assignment generated by the rebalancer
-     * @param listFields
-     *          The map-list assignment generated by the rebalancer
-     */
-    public void verifyCorrectness(final Map<String, Map<String, String>> mapFields,
-        final Map<String, List<String>> listFields) {
-      final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
-      boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
-      assert maxConstraintMet : "Max per node constraint: FAIL";
-      logger.info("Max per node constraint: PASS");
-
-      boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
-      assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL";
-      logger.info("Only live nodes have partitions constraint: PASS");
-
-      boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
-      assert stateAssignmentPossible : "State replica constraint: FAIL";
-      logger.info("State replica constraint: PASS");
-
-      boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
-      assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL";
-      logger.info("Node uniqueness per partition constraint: PASS");
-    }
-
-    private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
-      for (String node : partitionsPerNode.keySet()) {
-        Integer value = partitionsPerNode.get(node);
-        if (value > _maxPerNode) {
-          logger.error("ERROR: Node " + node + " has " + value
-              + " partitions despite a maximum of " + _maxPerNode);
-          return false;
-        }
-      }
-      return true;
-    }
-
-    private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) {
-      for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) {
-        boolean isLive = _liveSet.contains(nodeState.getKey());
-        boolean isEmpty = nodeState.getValue() == 0;
-        if (!isLive && !isEmpty) {
-          logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has "
-              + nodeState.getValue() + " replicas!");
-          return false;
-        }
-      }
-      return true;
-    }
-
-    private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) {
-      for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
-        final Map<String, String> nodeMap = partitionEntry.getValue();
-        final Map<String, Integer> stateCounts = new TreeMap<String, Integer>();
-        for (String state : nodeMap.values()) {
-          if (!stateCounts.containsKey(state)) {
-            stateCounts.put(state, 1);
-          } else {
-            stateCounts.put(state, stateCounts.get(state) + 1);
-          }
-        }
-        for (String state : stateCounts.keySet()) {
-          if (state.equals(HelixDefinedState.DROPPED.toString())) {
-            continue;
-          }
-          int count = stateCounts.get(state);
-          int maximumCount = _states.get(state);
-          if (count > maximumCount) {
-            logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey()
-                + " has " + count + " replicas when " + maximumCount + " is allowed!");
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) {
-      for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) {
-        Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
-        int numUniques = nodeSet.size();
-        int total = partitionEntry.getValue().size();
-        if (numUniques < total) {
-          logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total
-              + " nodes, but only " + numUniques + " are unique!");
-          return false;
-        }
-      }
-      return true;
-    }
-
-    private double getAverage(final Collection<Integer> values) {
-      double sum = 0.0;
-      for (Integer value : values) {
-        sum += value;
-      }
-      if (values.size() != 0) {
-        return sum / values.size();
-      } else {
-        return -1.0;
-      }
-    }
-
-    private int getSum(final Collection<Integer> values) {
-      int sum = 0;
-      for (Integer value : values) {
-        sum += value;
-      }
-      return sum;
-    }
-
-    private double getStdev(final Collection<Integer> values, double mean) {
-      double sum = 0.0;
-      for (Integer value : values) {
-        double deviation = mean - value;
-        sum += Math.pow(deviation, 2.0);
-      }
-      if (values.size() != 0) {
-        sum /= values.size();
-        return Math.pow(sum, 0.5);
-      } else {
-        return -1.0;
-      }
-    }
-
-    private Map<String, Integer> getPartitionBucketsForNode(
-        final Map<String, Map<String, String>> assignment) {
-      Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>();
-      for (String node : _liveNodes) {
-        partitionsPerNode.put(node, 0);
-      }
-      for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
-        final Map<String, String> nodeMap = partitionEntry.getValue();
-        for (String node : nodeMap.keySet()) {
-          String state = nodeMap.get(node);
-          if (state.equals(HelixDefinedState.DROPPED.toString())) {
-            continue;
-          }
-          // add 1 for every occurrence of a node
-          if (!partitionsPerNode.containsKey(node)) {
-            partitionsPerNode.put(node, 1);
-          } else {
-            partitionsPerNode.put(node, partitionsPerNode.get(node) + 1);
-          }
-        }
-      }
-      return partitionsPerNode;
-    }
-
-    private Map<String, Map<String, Integer>> getStateBucketsForNode(
-        final Map<String, Map<String, String>> assignment) {
-      Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
-      for (String n : _liveNodes) {
-        result.put(n, new TreeMap<String, Integer>());
-      }
-      for (Map<String, String> nodeStateMap : assignment.values()) {
-        for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
-          if (!result.containsKey(nodeState.getKey())) {
-            result.put(nodeState.getKey(), new TreeMap<String, Integer>());
-          }
-          Map<String, Integer> stateMap = result.get(nodeState.getKey());
-          if (!stateMap.containsKey(nodeState.getValue())) {
-            stateMap.put(nodeState.getValue(), 1);
-          } else {
-            stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1);
-          }
-        }
-      }
-      return result;
-    }
-
-    /**
-     * Randomly choose between killing, adding, or resurrecting a single node
-     * @return (Partition -> (Node -> State)) ZNRecord
-     */
-    public ZNRecord runOnceRandomly() {
-      double choose = _random.nextDouble();
-      ZNRecord result = null;
-      if (choose < P_KILL) {
-        result = removeSingleNode(null);
-      } else if (choose < P_KILL + P_ADD) {
-        result = addSingleNode(null);
-      } else if (choose < P_KILL + P_ADD + P_RESURRECT) {
-        result = resurrectSingleNode(null);
-      }
-      return result;
-    }
-
-    /**
-     * Run rebalancer trying to add a never-live node
-     * @param node
-     *          Optional String to add
-     * @return ZNRecord result returned by the rebalancer
-     */
-    public ZNRecord addSingleNode(String node) {
-      logger.info("=================== add node =================");
-      if (_nonLiveSet.size() == 0) {
-        logger.warn("Cannot add node because there are no nodes left to add.");
-        return null;
-      }
-
-      // Get a random never-live node
-      if (node == null || !_nonLiveSet.contains(node)) {
-        node = getRandomSetElement(_nonLiveSet);
-      }
-      logger.info("Adding " + node);
-      _liveNodes.add(node);
-      _liveSet.add(node);
-      _nonLiveSet.remove(node);
-
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
-          computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
-    }
-
-    /**
-     * Run rebalancer trying to remove a live node
-     * @param node
-     *          Optional String to remove
-     * @return ZNRecord result returned by the rebalancer
-     */
-    public ZNRecord removeSingleNode(String node) {
-      logger.info("=================== remove node =================");
-      if (_liveSet.size() == 0) {
-        logger.warn("Cannot remove node because there are no nodes left to remove.");
-        return null;
-      }
-
-      // Get a random live node
-      if (node == null || !_liveSet.contains(node)) {
-        node = getRandomSetElement(_liveSet);
-      }
-      logger.info("Removing " + node);
-      _removedSet.add(node);
-      _liveNodes.remove(node);
-      _liveSet.remove(node);
-
-      // the rebalancer expects that the current mapping doesn't contain deleted
-      // nodes
-      for (Map<String, String> nodeMap : _currentMapping.values()) {
-        if (nodeMap.containsKey(node)) {
-          nodeMap.remove(node);
-        }
-      }
-
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
-          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
-    }
-
-    /**
-     * Run rebalancer trying to add back a removed node
-     * @param node
-     *          Optional String to resurrect
-     * @return ZNRecord result returned by the rebalancer
-     */
-    public ZNRecord resurrectSingleNode(String node) {
-      logger.info("=================== resurrect node =================");
-      if (_removedSet.size() == 0) {
-        logger.warn("Cannot remove node because there are no nodes left to resurrect.");
-        return null;
-      }
-
-      // Get a random removed node
-      if (node == null || !_removedSet.contains(node)) {
-        node = getRandomSetElement(_removedSet);
-      }
-      logger.info("Resurrecting " + node);
-      _removedSet.remove(node);
-      _liveNodes.add(node);
-      _liveSet.add(node);
-
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
-          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
-    }
-
-    private <T> T getRandomSetElement(Set<T> source) {
-      int element = _random.nextInt(source.size());
-      int i = 0;
-      for (T node : source) {
-        if (i == element) {
-          return node;
-        }
-        i++;
-      }
-      return null;
-    }
-  }
-
-  /**
-   * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
-   * lists should always prefer nodes already in the current mapping; once every node appears in the
-   * current mapping, states should be distributed as evenly as possible.
-   */
-  @Test
-  public void testOrphansNotPreferred() {
-    final String RESOURCE_NAME = "resource";
-    final String[] PARTITIONS = {
-        "resource_0", "resource_1", "resource_2"
-    };
-    final StateModelDefinition STATE_MODEL =
-        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
-    final int REPLICA_COUNT = 2;
-    final String[] NODES = {
-        "n0", "n1", "n2"
-    };
-
-    // initial state, one node, no mapping
-    List<String> allNodes = Lists.newArrayList(NODES[0]);
-    List<String> liveNodes = Lists.newArrayList(NODES[0]);
-    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
-    for (String partition : PARTITIONS) {
-      currentMapping.put(partition, new HashMap<String, String>());
-    }
-
-    // make sure that when the first node joins, a single replica is assigned fairly
-    List<String> partitions = ImmutableList.copyOf(PARTITIONS);
-    LinkedHashMap<String, Integer> stateCount =
-        AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-    ZNRecord znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    Map<String, List<String>> preferenceLists = znRecord.getListFields();
-    for (String partition : currentMapping.keySet()) {
-      // make sure these are all MASTER
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
-    }
-
-    // now assign a replica to the first node in the current mapping, and add a second node
-    allNodes.add(NODES[1]);
-    liveNodes.add(NODES[1]);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).put(NODES[0], "MASTER");
-    }
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
-          + partition);
-      Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
-          + partition);
-    }
-
-    // now set the current mapping to reflect this update and make sure that it distributes masters
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).put(NODES[1], "SLAVE");
-    }
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    Set<String> firstNodes = Sets.newHashSet();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      firstNodes.add(preferenceList.get(0));
-    }
-    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
-
-    // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
-    // new node is never the most preferred
-    allNodes.add(NODES[2]);
-    liveNodes.add(NODES[2]);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-
-    // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
-    currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
-    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    boolean newNodeUsed = false;
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      if (preferenceList.contains(NODES[2])) {
-        newNodeUsed = true;
-        Assert.assertEquals(preferenceList.get(1), NODES[2],
-            "newly added node not at preference list tail for " + partition);
-      }
-    }
-    Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
-
-    // now remap this to take the new node into account, should go back to balancing masters, slaves
-    // evenly across all nodes
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).clear();
-    }
-    currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
-    currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
-    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
-    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
-    currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
-    currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    firstNodes.clear();
-    Set<String> secondNodes = Sets.newHashSet();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      firstNodes.add(preferenceList.get(0));
-      secondNodes.add(preferenceList.get(1));
-    }
-    Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
-    Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
-
-    // remove a node now, but use the current mapping with everything balanced just prior
-    liveNodes.remove(0);
-    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-
-    // remove all references of n0 from the mapping, keep everything else in a legal state
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).clear();
-    }
-    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
-    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
-    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
-    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      Map<String, String> stateMap = currentMapping.get(partition);
-      for (String participant : stateMap.keySet()) {
-        Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
-            + partition);
-      }
-      for (String participant : preferenceList) {
-        if (!stateMap.containsKey(participant)) {
-          Assert.assertNotSame(preferenceList.get(0), participant,
-              "newly moved replica should not be master for " + partition);
-        }
-      }
-    }
-
-    // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
-    for (String partition : PARTITIONS) {
-      currentMapping.get(partition).clear();
-    }
-    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
-    currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
-    currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
-    currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
-    currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
-    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
-    znRecord =
-        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
-            .computePartitionAssignment(liveNodes, currentMapping, allNodes);
-    preferenceLists = znRecord.getListFields();
-    firstNodes.clear();
-    for (String partition : currentMapping.keySet()) {
-      List<String> preferenceList = preferenceLists.get(partition);
-      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
-      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
-      firstNodes.add(preferenceList.get(0));
-    }
-    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
new file mode 100644
index 0000000..5169edd
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
@@ -0,0 +1,172 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestTopology {
+  private static Logger logger = Logger.getLogger(TestTopology.class);
+
+  @Test
+  public void testCreateClusterTopology() {
+    ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster");
+
+    String topology = "/Rack/Sub-Rack/Host/Instance";
+    clusterConfig.getRecord().getSimpleFields()
+        .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), topology);
+    clusterConfig.getRecord().getSimpleFields()
+        .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Sub-Rack");
+
+    List<String> allNodes = new ArrayList<String>();
+    List<String> liveNodes = new ArrayList<String>();
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+
+    Map<String, Integer> nodeToWeightMap = new HashMap<String, Integer>();
+
+    for (int i = 0; i < 100; i++) {
+      String instance = "localhost_" + i;
+      InstanceConfig config = new InstanceConfig(instance);
+      String rack_id = "rack_" + i/25;
+      String sub_rack_id = "subrack-" + i/5;
+
+      String domain =
+          String.format("Rack=%s, Sub-Rack=%s, Host=%s", rack_id, sub_rack_id, instance);
+      config.setDomain(domain);
+      config.setHostName(instance);
+      config.setPort("9000");
+      allNodes.add(instance);
+
+      int weight = 0;
+      if (i % 10 != 0) {
+        liveNodes.add(instance);
+        weight = 1000;
+        if (i % 3 == 0) {
+          // set an explicit, non-default instance weight.
+          weight = (i+1) * 100;
+          config.setWeight(weight);
+        }
+      }
+
+      instanceConfigMap.put(instance, config);
+
+      if (!nodeToWeightMap.containsKey(rack_id)) {
+        nodeToWeightMap.put(rack_id, 0);
+      }
+      nodeToWeightMap.put(rack_id, nodeToWeightMap.get(rack_id) + weight);
+      if (!nodeToWeightMap.containsKey(sub_rack_id)) {
+        nodeToWeightMap.put(sub_rack_id, 0);
+      }
+      nodeToWeightMap.put(sub_rack_id, nodeToWeightMap.get(sub_rack_id) + weight);
+    }
+
+    Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig);
+
+    Assert.assertTrue(topo.getEndNodeType().equals("Instance"));
+    Assert.assertTrue(topo.getFaultZoneType().equals("Sub-Rack"));
+
+    List<Node> faultZones = topo.getFaultZones();
+    Assert.assertEquals(faultZones.size(), 20);
+
+    Node root = topo.getRootNode();
+
+    Assert.assertEquals(root.getChildrenCount("Rack"), 4);
+    Assert.assertEquals(root.getChildrenCount("Sub-Rack"), 20);
+    Assert.assertEquals(root.getChildrenCount("Host"), 100);
+    Assert.assertEquals(root.getChildrenCount("Instance"), 100);
+
+
+    // validate weights.
+    for (Node rack : root.getChildren()) {
+      Assert.assertEquals(rack.getWeight(), (long)nodeToWeightMap.get(rack.getName()));
+      for (Node subRack : rack.getChildren()) {
+        Assert.assertEquals(subRack.getWeight(), (long)nodeToWeightMap.get(subRack.getName()));
+      }
+    }
+  }
+
+  @Test
+  public void testCreateClusterTopologyWithDefaultTopology() {
+    ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster");
+
+    List<String> allNodes = new ArrayList<String>();
+    List<String> liveNodes = new ArrayList<String>();
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+
+    Map<String, Integer> nodeToWeightMap = new HashMap<String, Integer>();
+
+    for (int i = 0; i < 100; i++) {
+      String instance = "localhost_" + i;
+      InstanceConfig config = new InstanceConfig(instance);
+      String zoneId = "rack_" + i / 10;
+      config.setZoneId(zoneId);
+      config.setHostName(instance);
+      config.setPort("9000");
+      allNodes.add(instance);
+
+      int weight = 0;
+      if (i % 10 != 0) {
+        liveNodes.add(instance);
+        weight = 1000;
+        if (i % 3 == 0) {
+          // set an explicit, non-default instance weight.
+          weight = (i + 1) * 100;
+          config.setWeight(weight);
+        }
+      }
+
+      instanceConfigMap.put(instance, config);
+
+      if (!nodeToWeightMap.containsKey(zoneId)) {
+        nodeToWeightMap.put(zoneId, 0);
+      }
+      nodeToWeightMap.put(zoneId, nodeToWeightMap.get(zoneId) + weight);
+    }
+
+    Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig);
+
+    Assert.assertTrue(topo.getEndNodeType().equals(Topology.Types.INSTANCE.name()));
+    Assert.assertTrue(topo.getFaultZoneType().equals(Topology.Types.ZONE.name()));
+
+    List<Node> faultZones = topo.getFaultZones();
+    Assert.assertEquals(faultZones.size(), 10);
+
+    Node root = topo.getRootNode();
+
+    Assert.assertEquals(root.getChildrenCount(Topology.Types.ZONE.name()), 10);
+    Assert.assertEquals(root.getChildrenCount(topo.getEndNodeType()), 100);
+
+    // validate weights.
+    for (Node rack : root.getChildren()) {
+      Assert.assertEquals(rack.getWeight(), (long) nodeToWeightMap.get(rack.getName()));
+    }
+  }
+}
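
An explicit topology needs two inputs: the cluster-level TOPOLOGY path and a
per-instance DOMAIN string whose keys match that path. A minimal sketch of the
same wiring the test above performs, outside the test harness (imports as in
the diff above; cluster and instance names are illustrative placeholders):

    // Hedged sketch: declare a /Rack/Sub-Rack/Host/Instance topology with
    // Sub-Rack as the fault zone, mirroring testCreateClusterTopology above.
    ClusterConfig clusterConfig = new ClusterConfig("MyCluster");
    clusterConfig.getRecord().getSimpleFields()
        .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/Rack/Sub-Rack/Host/Instance");
    clusterConfig.getRecord().getSimpleFields()
        .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Sub-Rack");

    // As in the test, every level except the end node type ("Instance")
    // appears as a key in the instance's domain string.
    InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
    instanceConfig.setDomain("Rack=rack_0, Sub-Rack=subrack-0, Host=localhost_12918");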

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
new file mode 100644
index 0000000..5c34792
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
@@ -0,0 +1,221 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestCrushAutoRebalance extends ZkIntegrationTestBase {
+  final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  protected ClusterSetup _setupTool = null;
+  List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  Map<String, String> _nodeToZoneMap = new HashMap<String, String>();
+  Map<String, String> _nodeToTagMap = new HashMap<String, String>();
+  List<String> _nodes = new ArrayList<String>();
+  List<String> _allDBs = new ArrayList<String>();
+  int _replica = 3;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+    _setupTool = new ClusterSetup(_gZkClient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      String zone = "zone-" + i % 3;
+      String tag = "tag-" + i % 2;
+      _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+      _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+      _nodeToZoneMap.put(storageNodeName, zone);
+      _nodeToTagMap.put(storageNodeName, tag);
+      _nodes.add(storageNodeName);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+  }
+
+  @DataProvider(name = "rebalanceStrategies")
+  public static String[][] rebalanceStrategies() {
+    return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}};
+  }
+
+  @Test(dataProvider = "rebalanceStrategies")
+  public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass)
+      throws Exception {
+    System.out.println("Test " + rebalanceStrategyName);
+    List<String> testDBs = new ArrayList<String>();
+    String[] testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+        BuiltInStateModelDefinitions.MasterSlave.name(),
+        BuiltInStateModelDefinitions.LeaderStandby.name()
+    };
+    int i = 0;
+    for (String stateModel : testModels) {
+      String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel,
+          RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      testDBs.add(db);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (String db : testDBs) {
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateZoneAndTagIsolation(is, ev);
+    }
+  }
+
+  @Test(dataProvider = "rebalanceStrategies")
+  public void testZoneIsolationWithInstanceTag(
+      String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception {
+    List<String> testDBs = new ArrayList<String>();
+    Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+    int i = 0;
+    for (String tag : tags) {
+      String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++;
+      _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS,
+          BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "",
+          rebalanceStrategyClass);
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      is.setInstanceGroupTag(tag);
+      _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      testDBs.add(db);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+    Assert.assertTrue(result);
+
+    for (String db : testDBs) {
+      IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView ev =
+          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateZoneAndTagIsolation(is, ev);
+    }
+  }
+
+  /**
+   * Validate that the instances for each partition are in different zones and, if an instance group tag is set, are tagged with it.
+   */
+  private void validateZoneAndTagIsolation(IdealState is, ExternalView ev) {
+    int replica = Integer.valueOf(is.getReplicas());
+    String tag = is.getInstanceGroupTag();
+
+    for (String partition : is.getPartitionSet()) {
+      Set<String> assignedZones = new HashSet<String>();
+
+      Set<String> instancesInIs = new HashSet<String>(is.getRecord().getListField(partition));
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      Assert.assertEquals(instancesInEV, instancesInIs);
+      for (String instance : instancesInEV) {
+        assignedZones.add(_nodeToZoneMap.get(instance));
+        if (tag != null) {
+          InstanceConfig config =
+              _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+      Assert.assertEquals(assignedZones.size(), replica);
+    }
+  }
+
+  @Test()
+  public void testAddZone() throws Exception {
+    //TODO
+  }
+
+  @Test()
+  public void testAddNodes() throws Exception {
+    //TODO
+  }
+
+  @Test()
+  public void testNodeFailure() throws Exception {
+    //TODO
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /*
+     * Shutdown order: 1) disconnect the controller, 2) disconnect the participants.
+     */
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _setupTool.deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 917be17..51dd19d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -31,6 +31,7 @@ import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateMod
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.log4j.Logger;
 
@@ -73,14 +74,17 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   public void run() {
     try {
       StateMachineEngine stateMach = getStateMachineEngine();
-      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+      stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.MasterSlave.name(),
+          _msModelFactory);
 
       DummyLeaderStandbyStateModelFactory lsModelFactory =
           new DummyLeaderStandbyStateModelFactory(10);
       DummyOnlineOfflineStateModelFactory ofModelFactory =
           new DummyOnlineOfflineStateModelFactory(10);
-      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
-      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+      stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.LeaderStandby.name(),
+          lsModelFactory);
+      stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(),
+          ofModelFactory);
 
       MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
       stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);


[3/5] helix git commit: [HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy

Posted by lx...@apache.org.
[HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm.
Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7147ec87
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7147ec87
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7147ec87

Branch: refs/heads/helix-0.6.x
Commit: 7147ec874e912f27905c299fefe0d09ca31ebd42
Parents: ea0fbbb
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Jun 16 12:06:34 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/HelixAdmin.java  |  30 +
 .../java/org/apache/helix/HelixConstants.java   |   4 +
 .../main/java/org/apache/helix/PropertyKey.java |   3 +-
 .../controller/rebalancer/AutoRebalancer.java   |  15 +-
 .../helix/controller/rebalancer/Rebalancer.java |   1 -
 .../strategy/AutoRebalanceStrategy.java         | 754 ++++++++++++++++++
 .../strategy/CrushRebalanceStrategy.java        | 174 +++++
 .../rebalancer/strategy/RebalanceStrategy.java  |  57 ++
 .../crushMapping/CRUSHPlacementAlgorithm.java   | 316 ++++++++
 .../strategy/crushMapping/JenkinsHash.java      | 140 ++++
 .../controller/rebalancer/topology/Node.java    | 208 +++++
 .../rebalancer/topology/Topology.java           | 295 +++++++
 .../controller/stages/ClusterDataCache.java     |  14 +-
 .../strategy/AutoRebalanceStrategy.java         | 753 ------------------
 .../controller/strategy/RebalanceStrategy.java  |  52 --
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  43 +-
 .../org/apache/helix/model/ClusterConfig.java   |  92 +++
 .../java/org/apache/helix/model/IdealState.java |   2 +-
 .../org/apache/helix/model/InstanceConfig.java  |  50 +-
 .../task/GenericTaskAssignmentCalculator.java   |   7 +-
 .../org/apache/helix/tools/ClusterSetup.java    |   6 +
 .../Strategy/TestAutoRebalanceStrategy.java     | 766 +++++++++++++++++++
 .../strategy/TestAutoRebalanceStrategy.java     | 765 ------------------
 .../helix/controller/strategy/TestTopology.java | 172 +++++
 .../integration/TestCrushAutoRebalance.java     | 221 ++++++
 .../manager/MockParticipantManager.java         |  10 +-
 26 files changed, 3355 insertions(+), 1595 deletions(-)
----------------------------------------------------------------------
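
For orientation before the file-by-file diffs: the new TestCrushAutoRebalance
integration test drives the strategy end to end through ClusterSetup. A
condensed, hedged sketch of that flow (imports as in the test below; the
ZooKeeper address, cluster, and resource names are placeholders):

    // Hedged sketch: create a FULL_AUTO resource placed by the CRUSH-based
    // strategy, relying on the default topology where the instance's zone id
    // is the fault zone.
    ClusterSetup setupTool = new ClusterSetup("localhost:2181");
    setupTool.addCluster("MyCluster", true);
    setupTool.addInstanceToCluster("MyCluster", "localhost_12918");
    setupTool.getClusterManagementTool()
        .setInstanceZoneId("MyCluster", "localhost_12918", "zone-0");
    setupTool.addResourceToCluster("MyCluster", "MyDB", 20,
        BuiltInStateModelDefinitions.MasterSlave.name(),
        IdealState.RebalanceMode.FULL_AUTO.name(),
        CrushRebalanceStrategy.class.getName());
    setupTool.rebalanceStorageCluster("MyCluster", "MyDB", 3);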


http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index fbfab26..aeacd4b 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -114,6 +114,18 @@ public interface HelixAdmin {
       String stateModelRef, String rebalancerMode);
 
   /**
+   * Add a resource to a cluster
+   * @param clusterName
+   * @param resourceName
+   * @param numPartitions
+   * @param stateModelRef
+   * @param rebalancerMode
+   * @param rebalanceStrategy
+   */
+  void addResource(String clusterName, String resourceName, int numPartitions,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy);
+
+  /**
    * Add a resource to a cluster, using a bucket size > 1
    * @param clusterName
    * @param resourceName
@@ -138,6 +150,22 @@ public interface HelixAdmin {
   void addResource(String clusterName, String resourceName, int numPartitions,
       String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance);
 
+
+  /**
+   * Add a resource to a cluster, using a bucket size > 1
+   * @param clusterName the cluster to add the resource to
+   * @param resourceName the name of the new resource
+   * @param numPartitions the number of partitions
+   * @param stateModelRef the state model definition to use
+   * @param rebalancerMode the rebalance mode, e.g. FULL_AUTO
+   * @param rebalanceStrategy the class name of the rebalance strategy to use
+   * @param bucketSize the bucket size to use
+   * @param maxPartitionsPerInstance the maximum number of partitions per instance
+   */
+  void addResource(String clusterName, String resourceName, int numPartitions,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize,
+      int maxPartitionsPerInstance);
+
   /**
    * Add an instance to a cluster
    * @param clusterName
@@ -411,6 +439,8 @@ public interface HelixAdmin {
    */
   void removeInstanceTag(String clusterName, String instanceName, String tag);
 
+  /**
+   * Set the fault-zone id of an instance, used by topology-aware rebalance strategies
+   * @param clusterName the cluster the instance belongs to
+   * @param instanceName the instance to tag
+   * @param zoneId the fault-zone identifier
+   */
+  void setInstanceZoneId(String clusterName, String instanceName, String zoneId);
+
   /**
    * Release resources
    */

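For illustration, a minimal sketch of how a client might exercise the new overloads
(cluster, resource, and instance names are hypothetical, and the ZooKeeper address is a
placeholder):

    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    // create a FULL_AUTO resource that rebalances with the new CRUSH strategy
    admin.addResource("myCluster", "myResource", 64, "MasterSlave", "FULL_AUTO",
        CrushRebalanceStrategy.class.getName());
    // tag each participant with a fault-zone id so the topology can be built
    admin.setInstanceZoneId("myCluster", "localhost_12918", "zone-1");
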
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/HelixConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 5318fa9..6de0ff1 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -43,6 +43,10 @@ public interface HelixConstants {
     ANY_LIVEINSTANCE
   }
 
+  /**
+   * Replaced by ClusterConfig.ClusterConfigProperty.
+   */
+  @Deprecated
   enum ClusterConfigType {
     HELIX_DISABLE_PIPELINE_TRIGGERS,
     DISABLE_FULL_AUTO // override all resources in the cluster to use SEMI-AUTO instead of FULL-AUTO

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 33355f1..0125902 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -38,6 +38,7 @@ import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
 
 import java.util.Arrays;
 
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Error;
@@ -186,7 +187,7 @@ public class PropertyKey {
      * @return {@link PropertyKey}
      */
     public PropertyKey clusterConfig() {
-      return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, HelixProperty.class,
+      return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, ClusterConfig.class,
           _clusterName, ConfigScopeProperty.CLUSTER.toString(), _clusterName);
     }
 

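Since the key is now typed to ClusterConfig, a controller-side caller can read the cluster
configuration directly; a small sketch, assuming an already-constructed HelixDataAccessor:

    PropertyKey.Builder keyBuilder = new PropertyKey.Builder("myCluster");
    ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
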
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 6682426..ba237b1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -35,8 +35,8 @@ import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.LiveInstance;
@@ -79,8 +79,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
     Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
     String replicas = currentIdealState.getReplicas();
 
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
-    stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
+    LinkedHashMap<String, Integer> stateCountMap =
+        stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
     List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
     List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
     allNodes.removeAll(clusterData.getDisabledInstances());
@@ -129,7 +129,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
 
     String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
-    if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) {
+    if (rebalanceStrategyName == null || rebalanceStrategyName
+        .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
       _rebalanceStrategy =
           new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
     } else {
@@ -152,8 +153,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
       }
     }
 
-    ZNRecord newMapping =
-        _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes);
+    ZNRecord newMapping = _rebalanceStrategy
+        .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("currentMapping: " + currentMapping);

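Because the strategy name is read from the IdealState, switching an existing FULL_AUTO
resource to CRUSH is a small change; a sketch, assuming the usual getter/setter pair on
IdealState:

    IdealState idealState = admin.getResourceIdealState("myCluster", "myResource");
    idealState.setRebalanceStrategy(CrushRebalanceStrategy.class.getName());
    admin.setResourceIdealState("myCluster", "myResource", idealState);
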
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index f5a4ae8..6935378 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -46,5 +46,4 @@ public interface Rebalancer {
    */
   IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
       final CurrentStateOutput currentStateOutput, final ClusterDataCache clusterData);
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
new file mode 100644
index 0000000..868d207
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
@@ -0,0 +1,754 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.log4j.Logger;
+
+public class AutoRebalanceStrategy implements RebalanceStrategy {
+  private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
+  private final ReplicaPlacementScheme _placementScheme;
+
+  private String _resourceName;
+  private List<String> _partitions;
+  private LinkedHashMap<String, Integer> _states;
+  private int _maximumPerNode;
+
+  private Map<String, Node> _nodeMap;
+  private List<Node> _liveNodesList;
+  private Map<Integer, String> _stateMap;
+
+  private Map<Replica, Node> _preferredAssignment;
+  private Map<Replica, Node> _existingPreferredAssignment;
+  private Map<Replica, Node> _existingNonPreferredAssignment;
+  private Set<Replica> _orphaned;
+
+  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    init(resourceName, partitions, states, maximumPerNode);
+    _placementScheme = new DefaultPlacementScheme();
+  }
+
+  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states) {
+    this(resourceName, partitions, states, Integer.MAX_VALUE);
+  }
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    _states = states;
+    _maximumPerNode = maximumPerNode;
+  }
+
+  @Override
+  public ZNRecord computePartitionAssignment(final List<String> allNodes, final List<String> liveNodes,
+      final Map<String, Map<String, String>> currentMapping, ClusterDataCache clusterData) {
+    int numReplicas = countStateReplicas();
+    ZNRecord znRecord = new ZNRecord(_resourceName);
+    if (liveNodes.size() == 0) {
+      return znRecord;
+    }
+    int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
+    int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
+    _nodeMap = new HashMap<String, Node>();
+    _liveNodesList = new ArrayList<Node>();
+
+    for (String id : allNodes) {
+      Node node = new Node(id);
+      node.capacity = 0;
+      node.hasCeilingCapacity = false;
+      _nodeMap.put(id, node);
+    }
+    for (int i = 0; i < liveNodes.size(); i++) {
+      boolean usingCeiling = false;
+      int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
+      if (distRemainder > 0 && targetSize < _maximumPerNode) {
+        targetSize += 1;
+        distRemainder = distRemainder - 1;
+        usingCeiling = true;
+      }
+      Node node = _nodeMap.get(liveNodes.get(i));
+      node.isAlive = true;
+      node.capacity = targetSize;
+      node.hasCeilingCapacity = usingCeiling;
+      _liveNodesList.add(node);
+    }
+
+    // compute states for all replica ids
+    _stateMap = generateStateMap();
+
+    // compute the preferred mapping if all nodes were up
+    _preferredAssignment = computePreferredPlacement(allNodes);
+
+    // logger.info("preferred mapping:"+ preferredAssignment);
+    // from current mapping derive the ones in preferred location
+    // this will update the nodes with their current fill status
+    _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
+
+    // from current mapping derive the ones not in preferred location
+    _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
+
+    // compute orphaned replicas that are not assigned to any node
+    _orphaned = computeOrphaned();
+    if (logger.isInfoEnabled()) {
+      logger.info("orphan = " + _orphaned);
+    }
+
+    moveNonPreferredReplicasToPreferred();
+
+    assignOrphans();
+
+    moveExcessReplicas();
+
+    prepareResult(znRecord);
+    return znRecord;
+  }
+
+  /**
+   * Move replicas assigned to non-preferred nodes if their current node is at capacity
+   * and its preferred node is under capacity.
+   */
+  private void moveNonPreferredReplicasToPreferred() {
+    // iterate through the non-preferred assignments and move a replica to its
+    // preferred node whenever the donor holds more replicas than it should and
+    // the receiver still has capacity
+    Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<Replica, Node> entry = iterator.next();
+      Replica replica = entry.getKey();
+      Node donor = entry.getValue();
+      Node receiver = _preferredAssignment.get(replica);
+      if (donor.capacity < donor.currentlyAssigned
+          && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
+        donor.currentlyAssigned = donor.currentlyAssigned - 1;
+        receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+        donor.nonPreferred.remove(replica);
+        receiver.preferred.add(replica);
+        donor.newReplicas.remove(replica);
+        receiver.newReplicas.add(replica);
+        iterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Slot in orphaned partitions randomly so as to maintain even load on live nodes.
+   */
+  private void assignOrphans() {
+    // now iterate over nodes and remaining orphaned partitions and assign
+    // partitions randomly
+    // Better to iterate over orphaned partitions first
+    Iterator<Replica> it = _orphaned.iterator();
+    while (it.hasNext()) {
+      Replica replica = it.next();
+      boolean added = false;
+      int startIndex = computeRandomStartIndex(replica);
+      for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+        Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+        if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
+          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+          receiver.nonPreferred.add(replica);
+          receiver.newReplicas.add(replica);
+          added = true;
+          break;
+        }
+      }
+      if (!added) {
+        // try adding the replica by making room for it
+        added = assignOrphanByMakingRoom(replica);
+      }
+      if (added) {
+        it.remove();
+      }
+    }
+    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
+      logger.info("could not assign a node for some replicas: " + _orphaned);
+    }
+  }
+
+  /**
+   * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it
+   * @param replica The replica to assign
+   * @return true if the assignment succeeded, false otherwise
+   */
+  private boolean assignOrphanByMakingRoom(Replica replica) {
+    Node capacityDonor = null;
+    Node capacityAcceptor = null;
+    int startIndex = computeRandomStartIndex(replica);
+    for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+      Node current = _liveNodesList.get(index % _liveNodesList.size());
+      if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned
+          && !current.canAddIfCapacity(replica) && capacityDonor == null) {
+        // this node has space but cannot accept the node
+        capacityDonor = current;
+      } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned
+          && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
+        // this node would be able to accept the replica if it has ceiling capacity
+        capacityAcceptor = current;
+      }
+      if (capacityDonor != null && capacityAcceptor != null) {
+        break;
+      }
+    }
+    if (capacityDonor != null && capacityAcceptor != null) {
+      // transfer ceiling capacity and add the node
+      capacityAcceptor.steal(capacityDonor, replica);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Move replicas from too-full nodes to nodes that can accept the replicas
+   */
+  private void moveExcessReplicas() {
+    // iterate over nodes and move extra load
+    Iterator<Replica> it;
+    for (Node donor : _liveNodesList) {
+      if (donor.capacity < donor.currentlyAssigned) {
+        Collections.sort(donor.nonPreferred);
+        it = donor.nonPreferred.iterator();
+        while (it.hasNext()) {
+          Replica replica = it.next();
+          int startIndex = computeRandomStartIndex(replica);
+          for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+            Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+            if (receiver.canAdd(replica)) {
+              receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+              receiver.nonPreferred.add(replica);
+              donor.currentlyAssigned = donor.currentlyAssigned - 1;
+              it.remove();
+              break;
+            }
+          }
+          if (donor.capacity >= donor.currentlyAssigned) {
+            break;
+          }
+        }
+        if (donor.capacity < donor.currentlyAssigned) {
+          logger.warn("Could not take partitions out of node:" + donor.id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Update a ZNRecord with the results of the rebalancing.
+   * @param znRecord the record to populate with the computed assignment
+   */
+  private void prepareResult(ZNRecord znRecord) {
+    // The map fields map each partition name to (node -> state) pairs, i.e. they
+    // indicate which node serves each replica of the partition and in which state
+    //
+    // The list fields are also keyed on partition and list all the nodes serving that partition.
+    // This is useful to verify that there is no node serving multiple replicas of the same
+    // partition.
+    Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
+    for (String partition : _partitions) {
+      znRecord.setMapField(partition, new TreeMap<String, String>());
+      znRecord.setListField(partition, new ArrayList<String>());
+      newPreferences.put(partition, new ArrayList<String>());
+    }
+
+    // for preference lists, the rough priority that we want is:
+    // [existing preferred, existing non-preferred, non-existing preferred, non-existing
+    // non-preferred]
+    for (Node node : _liveNodesList) {
+      for (Replica replica : node.preferred) {
+        if (node.newReplicas.contains(replica)) {
+          newPreferences.get(replica.partition).add(node.id);
+        } else {
+          znRecord.getListField(replica.partition).add(node.id);
+        }
+      }
+    }
+    for (Node node : _liveNodesList) {
+      for (Replica replica : node.nonPreferred) {
+        if (node.newReplicas.contains(replica)) {
+          newPreferences.get(replica.partition).add(node.id);
+        } else {
+          znRecord.getListField(replica.partition).add(node.id);
+        }
+      }
+    }
+    normalizePreferenceLists(znRecord.getListFields(), newPreferences);
+
+    // generate preference maps based on the preference lists
+    for (String partition : _partitions) {
+      List<String> preferenceList = znRecord.getListField(partition);
+      int i = 0;
+      for (String participant : preferenceList) {
+        znRecord.getMapField(partition).put(participant, _stateMap.get(i));
+        i++;
+      }
+    }
+  }
+
+  /**
+   * Adjust preference lists to reduce the number of same replicas on an instance. This will
+   * separately normalize two sets of preference lists, and then append the results of the second
+   * set to those of the first. This basically ensures that existing replicas are automatically
+   * preferred.
+   * @param preferenceLists map of (partition --> list of nodes)
+   * @param newPreferences map containing node preferences not consistent with the current
+   *          assignment
+   */
+  private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
+      Map<String, List<String>> newPreferences) {
+
+    Map<String, Map<String, Integer>> nodeReplicaCounts =
+        new HashMap<String, Map<String, Integer>>();
+    for (String partition : preferenceLists.keySet()) {
+      normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
+    }
+    for (String partition : newPreferences.keySet()) {
+      normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
+      preferenceLists.get(partition).addAll(newPreferences.get(partition));
+    }
+  }
+
+  /**
+   * Adjust a single preference list for replica assignment imbalance
+   * @param preferenceList list of node names
+   * @param nodeReplicaCounts map of (node --> state --> count)
+   */
+  private void normalizePreferenceList(List<String> preferenceList,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    List<String> newPreferenceList = new ArrayList<String>();
+    int replicas = Math.min(countStateReplicas(), preferenceList.size());
+
+    // make this a LinkedHashSet to preserve iteration order
+    Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
+    for (int i = 0; i < replicas; i++) {
+      String state = _stateMap.get(i);
+      String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
+      newPreferenceList.add(node);
+      notAssigned.remove(node);
+      Map<String, Integer> counts = nodeReplicaCounts.get(node);
+      counts.put(state, counts.get(state) + 1);
+    }
+    preferenceList.clear();
+    preferenceList.addAll(newPreferenceList);
+  }
+
+  /**
+   * Get the node which hosts the fewest of a given replica
+   * @param state the state
+   * @param nodes nodes to check
+   * @param nodeReplicaCounts current assignment of replicas
+   * @return the node most willing to accept the replica
+   */
+  private String getMinimumNodeForReplica(String state, Set<String> nodes,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    String minimalNode = null;
+    int minimalCount = Integer.MAX_VALUE;
+    for (String node : nodes) {
+      int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
+      if (count < minimalCount) {
+        minimalCount = count;
+        minimalNode = node;
+      }
+    }
+    return minimalNode;
+  }
+
+  /**
+   * Safe check for the number of replicas in a given state assigned to a node
+   * @param state the state to check
+   * @param node the node to check
+   * @param nodeReplicaCounts a map of node to state to replica count
+   * @return the number of replicas of the given state currently assigned to the node
+   */
+  private int getReplicaCountForNode(String state, String node,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    if (!nodeReplicaCounts.containsKey(node)) {
+      Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
+      replicaCounts.put(state, 0);
+      nodeReplicaCounts.put(node, replicaCounts);
+      return 0;
+    }
+    Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
+    if (!replicaCounts.containsKey(state)) {
+      replicaCounts.put(state, 0);
+      return 0;
+    }
+    return replicaCounts.get(state);
+  }
+
+  /**
+   * Compute the subset of the current mapping where replicas are not mapped according to their
+   * preferred assignment.
+   * @param currentMapping Current mapping of replicas to nodes
+   * @return The current assignments that do not conform to the preferred assignment
+   */
+  private Map<Replica, Node> computeExistingNonPreferredPlacement(
+      Map<String, Map<String, String>> currentMapping) {
+    Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
+    int count = countStateReplicas();
+    for (String partition : currentMapping.keySet()) {
+      Map<String, String> nodeStateMap = currentMapping.get(partition);
+      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+      for (String nodeId : nodeStateMap.keySet()) {
+        Node node = _nodeMap.get(nodeId);
+        boolean skip = false;
+        for (Replica replica : node.preferred) {
+          if (replica.partition.equals(partition)) {
+            skip = true;
+            break;
+          }
+        }
+        if (skip) {
+          continue;
+        }
+        // check if it is in one of the preferred positions
+        for (int replicaId = 0; replicaId < count; replicaId++) {
+          Replica replica = new Replica(partition, replicaId);
+          if (!_preferredAssignment.containsKey(replica)) {
+
+            logger.info("partitions: " + _partitions);
+            logger.info("currentMapping.keySet: " + currentMapping.keySet());
+            throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions");
+          }
+
+          if (_preferredAssignment.get(replica).id != node.id
+              && !_existingPreferredAssignment.containsKey(replica)
+              && !existingNonPreferredAssignment.containsKey(replica)) {
+            existingNonPreferredAssignment.put(replica, node);
+            node.nonPreferred.add(replica);
+
+            break;
+          }
+        }
+      }
+    }
+    return existingNonPreferredAssignment;
+  }
+
+  /**
+   * Get a live node index to try first for a replica so that each possible start index is
+   * roughly uniformly assigned.
+   * @param replica The replica to assign
+   * @return The starting node index to try
+   */
+  private int computeRandomStartIndex(final Replica replica) {
+    return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
+  }
+
+  /**
+   * Get a set of replicas not currently assigned to any node
+   * @return Unassigned replicas
+   */
+  private Set<Replica> computeOrphaned() {
+    Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
+    for (Replica r : _existingPreferredAssignment.keySet()) {
+      if (orphanedPartitions.contains(r)) {
+        orphanedPartitions.remove(r);
+      }
+    }
+    for (Replica r : _existingNonPreferredAssignment.keySet()) {
+      if (orphanedPartitions.contains(r)) {
+        orphanedPartitions.remove(r);
+      }
+    }
+
+    return orphanedPartitions;
+  }
+
+  /**
+   * Determine the replicas already assigned to their preferred nodes
+   * @param currentMapping Current assignment of replicas to nodes
+   * @return Assignments that conform to the preferred placement
+   */
+  private Map<Replica, Node> computeExistingPreferredPlacement(
+      final Map<String, Map<String, String>> currentMapping) {
+    Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
+    int count = countStateReplicas();
+    for (String partition : currentMapping.keySet()) {
+      Map<String, String> nodeStateMap = currentMapping.get(partition);
+      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+      for (String nodeId : nodeStateMap.keySet()) {
+        Node node = _nodeMap.get(nodeId);
+        node.currentlyAssigned = node.currentlyAssigned + 1;
+        // check if it is in one of the preferred positions
+        for (int replicaId = 0; replicaId < count; replicaId++) {
+          Replica replica = new Replica(partition, replicaId);
+          if (_preferredAssignment.containsKey(replica)
+              && !existingPreferredAssignment.containsKey(replica)
+              && _preferredAssignment.get(replica).id == node.id) {
+            existingPreferredAssignment.put(replica, node);
+            node.preferred.add(replica);
+            break;
+          }
+        }
+      }
+    }
+
+    return existingPreferredAssignment;
+  }
+
+  /**
+   * Given a predefined set of all possible nodes, compute an assignment of replicas to
+   * nodes that evenly assigns all replicas to nodes.
+   * @param allNodes Identifiers to all nodes, live and non-live
+   * @return Preferred assignment of replicas
+   */
+  private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
+    Map<Replica, Node> preferredMapping = new HashMap<Replica, Node>();
+    int partitionId = 0;
+    int numReplicas = countStateReplicas();
+    for (String partition : _partitions) {
+      for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
+        Replica replica = new Replica(partition, replicaId);
+        String nodeName =
+            _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
+                allNodes);
+        preferredMapping.put(replica, _nodeMap.get(nodeName));
+      }
+      partitionId = partitionId + 1;
+    }
+    return preferredMapping;
+  }
+
+  /**
+   * Counts the total number of replicas given a state-count mapping
+   * @return the total number of replicas
+   */
+  private int countStateReplicas() {
+    int total = 0;
+    for (Integer count : _states.values()) {
+      total += count;
+    }
+    return total;
+  }
+
+  /**
+   * Compute a map of replica ids to state names
+   * @return Map: replica id -> state name
+   */
+  private Map<Integer, String> generateStateMap() {
+    int replicaId = 0;
+    Map<Integer, String> stateMap = new HashMap<Integer, String>();
+    for (String state : _states.keySet()) {
+      Integer count = _states.get(state);
+      for (int i = 0; i < count; i++) {
+        stateMap.put(replicaId, state);
+        replicaId++;
+      }
+    }
+    return stateMap;
+  }
+
+  /**
+   * A Node is an entity that can serve replicas. It has a capacity and knowledge
+   * of replicas assigned to it, so it can decide if it can receive additional replicas.
+   */
+  class Node {
+    public int currentlyAssigned;
+    public int capacity;
+    public boolean hasCeilingCapacity;
+    private final String id;
+    boolean isAlive;
+    private final List<Replica> preferred;
+    private final List<Replica> nonPreferred;
+    private final Set<Replica> newReplicas;
+
+    public Node(String id) {
+      preferred = new ArrayList<Replica>();
+      nonPreferred = new ArrayList<Replica>();
+      newReplicas = new TreeSet<Replica>();
+      currentlyAssigned = 0;
+      isAlive = false;
+      this.id = id;
+    }
+
+    /**
+     * Check if this replica can be legally added to this node
+     * @param replica The replica to test
+     * @return true if the assignment can be made, false otherwise
+     */
+    public boolean canAdd(Replica replica) {
+      if (currentlyAssigned >= capacity) {
+        return false;
+      }
+      return canAddIfCapacity(replica);
+    }
+
+    /**
+     * Check if this replica can be legally added to this node, provided that it has enough
+     * capacity.
+     * @param replica The replica to test
+     * @return true if the assignment can be made, false otherwise
+     */
+    public boolean canAddIfCapacity(Replica replica) {
+      if (!isAlive) {
+        return false;
+      }
+      for (Replica r : preferred) {
+        if (r.partition.equals(replica.partition)) {
+          return false;
+        }
+      }
+      for (Replica r : nonPreferred) {
+        if (r.partition.equals(replica.partition)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Receive a replica by stealing capacity from another Node
+     * @param donor The node that has excess capacity
+     * @param replica The replica to receive
+     */
+    public void steal(Node donor, Replica replica) {
+      donor.hasCeilingCapacity = false;
+      donor.capacity--;
+      hasCeilingCapacity = true;
+      capacity++;
+      currentlyAssigned++;
+      nonPreferred.add(replica);
+      newReplicas.add(replica);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
+          .append("\nnonpreferred:").append(nonPreferred.size());
+      return sb.toString();
+    }
+  }
+
+  /**
+   * A Replica pairs a partition of the resource with a partition-relative replica id;
+   * the id determines the state the replica is in via the replica-to-state map.
+   */
+  class Replica implements Comparable<Replica> {
+    private String partition;
+    private int replicaId; // this is a partition-relative id
+    private String format;
+
+    public Replica(String partition, int replicaId) {
+      this.partition = partition;
+      this.replicaId = replicaId;
+      this.format = this.partition + "|" + this.replicaId;
+    }
+
+    @Override
+    public String toString() {
+      return format;
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that instanceof Replica) {
+        return this.format.equals(((Replica) that).format);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return this.format.hashCode();
+    }
+
+    @Override
+    public int compareTo(Replica that) {
+      if (that == null) {
+        return -1; // null sorts after any replica
+      }
+      return this.format.compareTo(that.format);
+    }
+  }
+
+  /**
+   * Interface for providing a custom approach to computing a replica's affinity to a node.
+   */
+  public interface ReplicaPlacementScheme {
+    /**
+     * Initialize global state
+     * @param manager The instance to which this placement is associated
+     */
+    public void init(final HelixManager manager);
+
+    /**
+     * Given properties of this replica, determine the node it would prefer to be served by
+     * @param partitionId The current partition
+     * @param replicaId The current replica with respect to the current partition
+     * @param numPartitions The total number of partitions
+     * @param numReplicas The total number of replicas per partition
+     * @param nodeNames A list of identifiers of all nodes, live and non-live
+     * @return The name of the node that would prefer to serve this replica
+     */
+    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+        final List<String> nodeNames);
+  }
+
+  /**
+   * Compute preferred placements based on a default strategy that assigns replicas to nodes as
+   * evenly as possible while avoiding placing two replicas of the same partition on any node.
+   */
+  public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
+    @Override
+    public void init(final HelixManager manager) {
+      // do nothing since this is independent of the manager
+    }
+
+    @Override
+    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+        final List<String> nodeNames) {
+      int index;
+      if (nodeNames.size() > numPartitions) {
+        // assign replicas in partition order in case there are more nodes than partitions
+        index = (partitionId + replicaId * numPartitions) % nodeNames.size();
+      } else if (nodeNames.size() == numPartitions) {
+        // need a replica offset in case the sizes of these sets are the same
+        index =
+            ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
+                % nodeNames.size();
+      } else {
+        // in all other cases, assigning a replica at a time for each partition is reasonable
+        index = (partitionId + replicaId) % nodeNames.size();
+      }
+      return nodeNames.get(index);
+    }
+  }
+}

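As a rough sketch of driving this strategy in isolation (names are made up, imports such
as Arrays and Collections are elided, and the ClusterDataCache argument is passed as null
because this particular strategy never reads it):

    List<String> partitions = Arrays.asList("myResource_0", "myResource_1");
    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
    states.put("MASTER", 1);
    states.put("SLAVE", 2);
    AutoRebalanceStrategy strategy =
        new AutoRebalanceStrategy("myResource", partitions, states);
    List<String> nodes = Arrays.asList("node-1", "node-2", "node-3");
    ZNRecord assignment = strategy.computePartitionAssignment(nodes, nodes,
        Collections.<String, Map<String, String>>emptyMap(), null);
    // list fields hold the per-partition preference lists; map fields hold the
    // node -> state assignment derived from them
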
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
new file mode 100644
index 0000000..a8fe107
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -0,0 +1,174 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.InstanceConfig;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * CRUSH-based partition mapping strategy.
+ */
+public class CrushRebalanceStrategy implements RebalanceStrategy {
+  private String _resourceName;
+  private List<String> _partitions;
+  private Topology _clusterTopo;
+  private int _replicas;
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    _replicas = countStateReplicas(states);
+  }
+
+  /**
+   * Compute the preference lists (and, optionally, the partition-state mapping) for the given resource.
+   *
+   * @param allNodes       All instances
+   * @param liveNodes      List of live instances
+   * @param currentMapping current replica mapping
+   * @param clusterData    cluster data
+   * @return a ZNRecord whose list fields hold the per-partition preference lists
+   * @throws HelixException if a valid mapping cannot be found
+   */
+  @Override
+  public ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+      ClusterDataCache clusterData) throws HelixException {
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    _clusterTopo =
+        new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+    Node topNode = _clusterTopo.getRootNode();
+
+    Map<String, List<String>> newPreferences = new HashMap<String, List<String>>();
+    for (int i = 0; i < _partitions.size(); i++) {
+      String partitionName = _partitions.get(i);
+      long data = partitionName.hashCode();
+
+      // apply the placement rules
+      List<Node> selected = select(topNode, data, _replicas);
+
+      List<String> nodeList = new ArrayList<String>();
+      for (int j = 0; j < selected.size(); j++) {
+        nodeList.add(selected.get(j).getName());
+      }
+
+      newPreferences.put(partitionName, nodeList);
+    }
+
+    ZNRecord result = new ZNRecord(_resourceName);
+    result.setListFields(newPreferences);
+
+    return result;
+  }
+
+  /**
+   * Number of retries for finding an appropriate instance for a replica.
+   */
+  private static final int MAX_RETRY = 100;
+  private final JenkinsHash hashFun = new JenkinsHash();
+  private CRUSHPlacementAlgorithm placementAlgorithm = new CRUSHPlacementAlgorithm();
+
+  /**
+   * Enforce isolation on the specified fault zone.
+   * The caller will either get the expected number of selected nodes as a result, or an exception will be thrown.
+   */
+  private List<Node> select(Node topNode, long data, int rf)
+      throws HelixException {
+    List<Node> nodes = new ArrayList<Node>(rf);
+    Set<Node> selectedZones = new HashSet<Node>();
+    long input = data;
+    int count = rf;
+    int tries = 0;
+    while (nodes.size() < rf) {
+      doSelect(topNode, input, count, nodes, selectedZones);
+      count = rf - nodes.size();
+      if (count > 0) {
+        input = hashFun.hash(input); // create a different hash value for retrying
+        tries++;
+        if (tries >= MAX_RETRY) {
+          throw new HelixException(
+              String.format("could not find all mappings after %d tries", tries));
+        }
+      }
+    }
+    return nodes;
+  }
+
+  private void doSelect(Node topNode, long input, int rf, List<Node> selectedNodes,
+      Set<Node> selectedZones) {
+    String zoneType = _clusterTopo.getFaultZoneType();
+    String endNodeType = _clusterTopo.getEndNodeType();
+
+    if (!zoneType.equals(endNodeType)) {
+      // pick fault zones first
+      List<Node> zones = placementAlgorithm
+          .select(topNode, input, rf, zoneType, nodeAlreadySelected(selectedZones));
+      // add the racks to the selected racks
+      selectedZones.addAll(zones);
+      // pick one end node from each fault zone.
+      for (Node zone : zones) {
+        List<Node> endNode = placementAlgorithm.select(zone, input, 1, endNodeType);
+        selectedNodes.addAll(endNode);
+      }
+    } else {
+      // pick end node directly
+      List<Node> nodes = placementAlgorithm.select(topNode, input, rf, endNodeType,
+          nodeAlreadySelected(new HashSet<Node>(selectedNodes)));
+      selectedNodes.addAll(nodes);
+    }
+  }
+
+  /**
+   * Use the predicate to reject already selected zones or nodes.
+   */
+  private Predicate<Node> nodeAlreadySelected(Set<Node> selectedNodes) {
+    return Predicates.not(Predicates.in(selectedNodes));
+  }
+
+  /**
+   * Counts the total number of replicas given a state-count mapping
+   * @return the total number of replicas
+   */
+  private int countStateReplicas(Map<String, Integer> stateCountMap) {
+    int total = 0;
+    for (Integer count : stateCountMap.values()) {
+      total += count;
+    }
+    return total;
+  }
+}

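A sketch of invoking the strategy directly; the ClusterDataCache here is assumed to come
from the controller pipeline with instance zone ids and a ClusterConfig already populated,
and all names are hypothetical:

    CrushRebalanceStrategy crush = new CrushRebalanceStrategy();
    crush.init("myResource", partitions, states, Integer.MAX_VALUE);
    ZNRecord mapping = crush.computePartitionAssignment(allNodes, liveNodes,
        Collections.<String, Map<String, String>>emptyMap(), clusterData);
    // only list fields are populated: one preference list per partition, drawing at
    // most one end node per fault zone when the fault-zone type differs from the
    // end-node type
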
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
new file mode 100644
index 0000000..a3c7e94
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
@@ -0,0 +1,57 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Assignment strategy interface that computes the assignment of partition->instance.
+ */
+public interface RebalanceStrategy {
+  String DEFAULT_REBALANCE_STRATEGY = "DEFAULT";
+
+  /**
+   * Perform the necessary initialization for the rebalance strategy object.
+   *
+   * @param resourceName the resource to rebalance
+   * @param partitions the partition names of the resource
+   * @param states map of state name to the number of replicas in that state
+   * @param maximumPerNode the maximum number of replicas a single node may host
+   */
+  void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode);
+
+  /**
+   * Compute the preference lists (and, optionally, the partition-state mapping) for the given resource.
+   *
+   * @param allNodes identifiers of all nodes, live and non-live
+   * @param liveNodes identifiers of the live nodes only
+   * @param currentMapping the current replica-to-node mapping
+   * @param clusterData the cached cluster state
+   * @return a ZNRecord holding the computed assignment
+   */
+  ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+      ClusterDataCache clusterData);
+}

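Any pluggable strategy simply implements these two methods. A minimal hypothetical example
(for illustration only) that pins every partition to the first live node:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.helix.ZNRecord;
    import org.apache.helix.controller.stages.ClusterDataCache;

    public class FirstNodeStrategy implements RebalanceStrategy {
      private String _resourceName;
      private List<String> _partitions;

      @Override
      public void init(String resourceName, List<String> partitions,
          LinkedHashMap<String, Integer> states, int maximumPerNode) {
        _resourceName = resourceName;
        _partitions = partitions;
      }

      @Override
      public ZNRecord computePartitionAssignment(List<String> allNodes,
          List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
          ClusterDataCache clusterData) {
        ZNRecord record = new ZNRecord(_resourceName);
        for (String partition : _partitions) {
          // single-entry preference list; a real strategy would spread replicas
          record.setListField(partition, liveNodes.isEmpty()
              ? Collections.<String>emptyList()
              : Collections.singletonList(liveNodes.get(0)));
        }
        return record;
      }
    }
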
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
new file mode 100644
index 0000000..870656c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
@@ -0,0 +1,316 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A transcription of the CRUSH placement algorithm from the paper by Weil et al. This is a fairly
+ * simple adaptation, but a couple of important changes have been made to work with the crunch mapping.
+ */
+public class CRUSHPlacementAlgorithm {
+  /**
+   * In case the select() method fails to select after looping back to the origin of selection after
+   * so many tries, we stop the search. This constant denotes the maximum number of retries after
+   * looping back to the origin. It is expected that in most cases the selection will either succeed
+   * with a small number of tries, or it will never succeed. So a reasonably large number to
+   * distinguish these two cases should be sufficient.
+   */
+  private static final int MAX_LOOPBACK_COUNT = 50;
+  private static final Logger logger = LoggerFactory.getLogger(CRUSHPlacementAlgorithm.class);
+
+  private final boolean keepOffset;
+  private final Map<Long,Integer> roundOffset;
+
+  /**
+   * Creates the crush placement object.
+   */
+  public CRUSHPlacementAlgorithm() {
+    this(false);
+  }
+
+  /**
+   * Creates the crush placement algorithm with the indication whether the round offset should be
+   * kept for the duration of this object for successive selection of the same input.
+   */
+  public CRUSHPlacementAlgorithm(boolean keepOffset) {
+    this.keepOffset = keepOffset;
+    roundOffset = keepOffset ? new HashMap<Long,Integer>() : null;
+  }
+
+  /**
+   * Returns a list of (count) nodes of the desired type. If the count is more than the number of
+   * available nodes, an exception is thrown. Note that it is possible for this method to return a
+   * list whose size is smaller than the requested size (count) if it is unable to select all the
+   * nodes for any reason. Callers should check the size of the returned list and take action if
+   * needed.
+   */
+  public List<Node> select(Node parent, long input, int count, String type) {
+    return select(parent, input, count, type, Predicates.<Node>alwaysTrue());
+  }
+
+  public List<Node> select(Node parent, long input, int count, String type,
+      Predicate<Node> nodePredicate) {
+    int childCount = parent.getChildrenCount(type);
+    if (childCount < count) {
+      throw new IllegalArgumentException(count + " nodes of type " + type +
+          " were requested but the tree has only " + childCount + " nodes!");
+    }
+
+    List<Node> selected = new ArrayList<Node>(count);
+    // use the index stored in the map
+    Integer offset;
+    if (keepOffset) {
+      offset = roundOffset.get(input);
+      if (offset == null) {
+        offset = 0;
+        roundOffset.put(input, offset);
+      }
+    } else {
+      offset = 0;
+    }
+
+    int rPrime = 0;
+    for (int r = 1; r <= count; r++) {
+      int failure = 0;
+      // number of times we had to loop back to the origin
+      int loopbackCount = 0;
+      boolean escape = false;
+      boolean retryOrigin;
+      Node out = null;
+      do {
+        retryOrigin = false; // initialize at the outset
+        Node in = parent;
+        Set<Node> rejected = new HashSet<Node>();
+        boolean retryNode;
+        do {
+          retryNode = false; // initialize at the outset
+          rPrime = r + offset + failure;
+          logger.trace("{}.select({}, {})", new Object[] {in, input, rPrime});
+          Selector selector = new Selector(in);
+          out = selector.select(input, rPrime);
+          if (!out.getType().equalsIgnoreCase(type)) {
+            logger.trace("selected output {} for data {} didn't match the type {}: walking down " +
+                "the hierarchy...", new Object[] {out, input, type});
+            in = out; // walk down the hierarchy
+            retryNode = true; // stay within the node and walk down the tree
+          } else { // type matches
+            boolean predicateRejected = !nodePredicate.apply(out);
+            if (selected.contains(out) || predicateRejected) {
+              if (predicateRejected) {
+                logger.trace("{} was rejected by the node predicate for data {}: rejecting and " +
+                    "increasing rPrime", out, input);
+                rejected.add(out);
+              } else { // already selected
+                logger.trace("{} was already selected for data {}: rejecting and increasing rPrime",
+                    out, input);
+              }
+
+              // we need to see if we have selected all possible nodes from this parent, in which
+              // case we should loop back to the origin and start over
+              if (allChildNodesEliminated(in, selected, rejected)) {
+                logger.trace("all child nodes of {} have been eliminated", in);
+                if (loopbackCount == MAX_LOOPBACK_COUNT) {
+                  // we looped back the maximum times we specified; we give up search, and exit
+                  escape = true;
+                  break;
+                }
+                loopbackCount++;
+                logger.trace("looping back to the original parent node ({})", parent);
+                retryOrigin = true;
+              } else {
+                retryNode = true; // go back and reselect on the same parent
+              }
+              failure++;
+            } else if (nodeIsOut(out)) {
+              logger.trace("{} is marked as out (failed or over the maximum assignment) for data " +
+                  "{}! looping back to the original parent node", out, input);
+              failure++;
+              if (loopbackCount == MAX_LOOPBACK_COUNT) {
+                // we looped back the maximum times we specified; we give up search, and exit
+                escape = true;
+                break;
+              }
+              loopbackCount++;
+              // re-selection on the same parent is detrimental in case of node failure: loop back
+              // to the origin
+              retryOrigin = true;
+            } else {
+              // we got a successful selection
+              break;
+            }
+          }
+        } while (retryNode);
+      } while (retryOrigin);
+
+      if (escape) {
+        // cannot find a node under this parent; return a smaller set than was intended
+        logger.debug("we could not select a node for data {} under parent {}; a smaller node set " +
+            "than was requested will be returned", input, parent);
+        continue;
+      }
+
+      logger.trace("{} was selected for data {}", out, input);
+      selected.add(out);
+    }
+    if (keepOffset) {
+      roundOffset.put(input, rPrime);
+    }
+    return selected;
+  }
+
+
+  private boolean nodeIsOut(Node node) {
+    return node.isLeaf() && node.isFailed();
+  }
+
+  /**
+   * Examines the immediate child nodes of the given parent node, and sees if all of the children
+   * that can be selected (i.e. not failed) are already selected. This is used to determine whether
+   * this parent node should no longer be used in the selection.
+   */
+  private boolean allChildNodesEliminated(Node parent, List<Node> selected, Set<Node> rejected) {
+    List<Node> children = parent.getChildren();
+    if (children != null) {
+      for (Node child: children) {
+        if (!nodeIsOut(child) && !selected.contains(child) && !rejected.contains(child)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Selection algorithm based on the "straw" bucket type as described in the CRUSH algorithm.
+   */
+  private class Selector {
+    private final Map<Node,Long> straws = new HashMap<Node,Long>();
+    private final JenkinsHash hashFunction;
+
+    public Selector(Node node) {
+      if (!node.isLeaf()) {
+        // create a map from the nodes to their values
+        List<Node> sortedNodes = sortNodes(node.getChildren()); // do a reverse sort by weight
+
+        int numLeft = sortedNodes.size();
+        float straw = 1.0f;
+        float wbelow = 0.0f;
+        float lastw = 0.0f;
+        int i = 0;
+        final int length = sortedNodes.size();
+        while (i < length) {
+          Node current = sortedNodes.get(i);
+          if (current.getWeight() == 0) {
+            straws.put(current, 0L);
+            i++;
+            continue;
+          }
+          straws.put(current, (long)(straw*0x10000));
+          i++;
+          if (i == length) {
+            break;
+          }
+
+          current = sortedNodes.get(i);
+          Node previous = sortedNodes.get(i-1);
+          if (current.getWeight() == previous.getWeight()) {
+            continue;
+          }
+          wbelow += (float)(previous.getWeight() - lastw)*numLeft;
+          for (int j = i; j < length; j++) {
+            if (sortedNodes.get(j).getWeight() == current.getWeight()) {
+              numLeft--;
+            } else {
+              break;
+            }
+          }
+          float wnext = (float)(numLeft * (current.getWeight() - previous.getWeight()));
+          float pbelow = wbelow/(wbelow + wnext);
+          straw *= Math.pow(1.0/pbelow, 1.0/numLeft);
+          lastw = previous.getWeight();
+        }
+      }
+      hashFunction = new JenkinsHash();
+    }
+
+    /**
+     * Returns a new list that's sorted in the reverse order of the weight.
+     */
+    private List<Node> sortNodes(List<Node> nodes) {
+      List<Node> ret = new ArrayList<Node>(nodes);
+      sortNodesInPlace(ret);
+      return ret;
+    }
+
+    /**
+     * Sorts the list in place in descending order of weight.
+     */
+    private void sortNodesInPlace(List<Node> nodes) {
+      // compare by weight only, heaviest first
+      Collections.sort(nodes, new Comparator<Node>() {
+        public int compare(Node n1, Node n2) {
+          if (n2.getWeight() == n1.getWeight()) {
+            return 0;
+          }
+          return (n2.getWeight() > n1.getWeight()) ? 1 : -1;
+        }
+      });
+    }
+
+    public Node select(long input, long round) {
+      Node selected = null;
+      long hiScore = -1;
+      for (Map.Entry<Node,Long> e: straws.entrySet()) {
+        Node child = e.getKey();
+        long straw = e.getValue();
+        long score = weightedScore(child, straw, input, round);
+        if (score > hiScore) {
+          selected = child;
+          hiScore = score;
+        }
+      }
+      if (selected == null) {
+        throw new IllegalStateException("failed to select a node for input " + input);
+      }
+      return selected;
+    }
+
+    private long weightedScore(Node child, long straw, long input, long round) {
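+      // the low 16 bits of the hash act as a uniform draw; scaling by the straw
+      // biases the contest in select() toward heavier nodes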
+      long hash = hashFunction.hash(input, child.getId(), round);
+      hash = hash&0xffff;
+      long weightedScore = hash*straw;
+      return weightedScore;
+    }
+  }
+}

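To see the straw mechanics in isolation: every candidate gets a weight-derived
straw, every (data, round) pair gets a 16-bit draw per candidate, and the
largest draw*straw product wins. A minimal, self-contained sketch (not part of
the patch; mix() is a stand-in for the JenkinsHash committed next, and all
names here are invented):

    import java.util.HashMap;
    import java.util.Map;

    public class StrawDemo {
      // stand-in 64-bit mixer; the patch itself uses JenkinsHash.hash(input, id, round)
      static long mix(long input, long id, long round) {
        long h = input * 0x9E3779B97F4A7C15L + id * 0xC2B2AE3D27D4EB4FL + round;
        h ^= (h >>> 33);
        h *= 0xFF51AFD7ED558CCDL;
        return h ^ (h >>> 33);
      }

      public static void main(String[] args) {
        Map<String, Long> straws = new HashMap<String, Long>();
        straws.put("hostA", 0x18000L); // heavier node, longer straw
        straws.put("hostB", 0x10000L);
        long input = 42L, round = 0L;
        String winner = null;
        long hiScore = -1L;
        for (Map.Entry<String, Long> e : straws.entrySet()) {
          long draw = mix(input, e.getKey().hashCode(), round) & 0xffff; // uniform 16-bit draw
          long score = draw * e.getValue();                              // scale by straw
          if (score > hiScore) {
            hiScore = score;
            winner = e.getKey();
          }
        }
        System.out.println(winner + " wins for data " + input);
      }
    }
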
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
new file mode 100644
index 0000000..66566f8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+public class JenkinsHash {
+  // max value to limit it to 4 bytes
+  private static final long MAX_VALUE = 0xFFFFFFFFL;
+  private static final long CRUSH_HASH_SEED = 1315423911L;
+
+  /**
+   * Convert a byte into a long value without making it negative.
+   */
+  private static long byteToLong(byte b) {
+    long val = b & 0x7F;
+    if ((b & 0x80) != 0) {
+      val += 128;
+    }
+    return val;
+  }
+
+  /**
+   * Add the two values and truncate to 4 bytes.
+   */
+  private static long add(long val, long add) {
+    return (val + add) & MAX_VALUE;
+  }
+
+  /**
+   * Subtract the second value from the first and truncate to 4 bytes.
+   */
+  private static long subtract(long val, long subtract) {
+    return (val - subtract) & MAX_VALUE;
+  }
+
+  /**
+   * XOR the two values and truncate to 4 bytes.
+   */
+  private static long xor(long val, long xor) {
+    return (val ^ xor) & MAX_VALUE;
+  }
+
+  /**
+   * Left shift val by shift bits and truncate to 4 bytes.
+   */
+  private static long leftShift(long val, int shift) {
+    return (val << shift) & MAX_VALUE;
+  }
+
+  /**
+   * Convert 4 bytes from the buffer at offset into a long value (little-endian).
+   */
+  private static long fourByteToLong(byte[] bytes, int offset) {
+    return (byteToLong(bytes[offset + 0])
+        + (byteToLong(bytes[offset + 1]) << 8)
+        + (byteToLong(bytes[offset + 2]) << 16)
+        + (byteToLong(bytes[offset + 3]) << 24));
+  }
+
+  /**
+   * Mix the three values (Bob Jenkins' reversible 32-bit word mix, emulated with longs).
+   */
+  private static Triple hashMix(Triple t) {
+    long a = t.a; long b = t.b; long c = t.c;
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13);
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13));
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12));
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5));
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3));
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15));
+    return new Triple(a, b, c);
+  }
+
+  private static class Triple {
+    long a;
+    long b;
+    long c;
+
+    public Triple(long a, long b, long c) {
+      this.a = a; this.b = b; this.c = c;
+    }
+  }
+
+  public long hash(long a) {
+    long hash = xor(CRUSH_HASH_SEED, a);
+    long b = a;
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(b, x, hash));
+    b = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, a, hash));
+    hash = val.c;
+    return hash;
+  }
+
+  public long hash(long a, long b) {
+    long hash = xor(xor(CRUSH_HASH_SEED, a), b);
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(a, b, hash));
+    a = val.a; b = val.b; hash = val.c;
+    val = hashMix(new Triple(x, a, hash));
+    x = val.a; a = val.b; hash = val.c;
+    val = hashMix(new Triple(b, y, hash));
+    hash = val.c;
+    return hash;
+  }
+
+  public long hash(long a, long b, long c) {
+    long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(a, b, hash));
+    a = val.a; b = val.b; hash = val.c;
+    val = hashMix(new Triple(c, x, hash));
+    c = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, a, hash));
+    y = val.a; a = val.b; hash = val.c;
+    val = hashMix(new Triple(b, x, hash));
+    b = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, c, hash));
+    hash = val.c;
+    return hash;
+  }
+}

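This hash is what makes placement reproducible: the same (input, id, round)
triple always yields the same value in [0, 0xFFFFFFFF], and bumping the round
gives an independent draw for retry logic. A quick check against the class as
committed (the demo class itself is hypothetical and assumes it lives in, or
imports, the crushMapping package):

    public class JenkinsHashDemo {
      public static void main(String[] args) {
        JenkinsHash h = new JenkinsHash();
        long first = h.hash(42L, 7L, 0L);
        long second = h.hash(42L, 7L, 0L);
        System.out.println(first == second);                    // true: deterministic
        System.out.println(first >= 0 && first <= 0xFFFFFFFFL); // true: 4-byte range
        System.out.println(h.hash(42L, 7L, 1L) == first);       // almost surely false
      }
    }
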
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
new file mode 100644
index 0000000..3a52a21
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
@@ -0,0 +1,208 @@
+package org.apache.helix.controller.rebalancer.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class Node implements Comparable<Node> {
+  private String _name;
+  private String _type;
+  private long _id;
+  private long _weight;
+
+  private LinkedHashMap<String, Node> _children = new LinkedHashMap<String, Node>();
+  private Node _parent;
+
+  private boolean _failed;
+
+  public Node() {
+
+  }
+
+  public Node(Node node) {
+    _name = node.getName();
+    _type = node.getType();
+    _id = node.getId();
+    _weight = node.getWeight();
+    _failed = node.isFailed();
+  }
+
+  public String getName() {
+    return _name;
+  }
+
+  public void setName(String name) {
+    _name = name;
+  }
+
+  public String getType() {
+    return _type;
+  }
+
+  public void setType(String type) {
+    _type = type;
+  }
+
+  public long getId() {
+    return _id;
+  }
+
+  public void setId(long id) {
+    _id = id;
+  }
+
+  public long getWeight() {
+    return _weight;
+  }
+
+  public void setWeight(long weight) {
+    _weight = weight;
+  }
+
+  public void addWeight(long weight) {
+    _weight += weight;
+  }
+
+  public boolean isFailed() {
+    return _failed;
+  }
+
+  public void setFailed(boolean failed) {
+    if (!isLeaf()) {
+      throw new UnsupportedOperationException("you cannot set failed on a non-leaf!");
+    }
+    _failed = failed;
+  }
+
+  public List<Node> getChildren() {
+    return new ArrayList<Node>(_children.values());
+  }
+
+  /**
+   * Add a child; if a child with the same name already exists, it is replaced.
+   *
+   * @param child the child node to add
+   */
+  public void addChild(Node child) {
+    _children.put(child.getName(), child);
+  }
+
+  /**
+   * Check whether this node has a child with the given name.
+   * @param name the child's name
+   * @return true if such a child exists
+   */
+  public boolean hasChild(String name) {
+    return _children.containsKey(name);
+  }
+
+  /**
+   * Get the child node with the given name.
+   *
+   * @param name the child's name
+   * @return the child node, or null if none exists
+   */
+  public Node getChild(String name) {
+    return _children.get(name);
+  }
+
+  public boolean isLeaf() {
+    return _children == null || _children.isEmpty();
+  }
+
+  public Node getParent() {
+    return _parent;
+  }
+
+  public void setParent(Node parent) {
+    _parent = parent;
+  }
+
+  /**
+   * Returns all child nodes that match the type. Returns itself if this node matches it. If no
+   * child matches the type, an empty list is returned.
+   */
+  protected List<Node> findChildren(String type) {
+    List<Node> nodes = new ArrayList<Node>();
+    if (_type.equalsIgnoreCase(type)) {
+      nodes.add(this);
+    } else if (!isLeaf()) {
+      for (Node child: _children.values()) {
+        nodes.addAll(child.findChildren(type));
+      }
+    }
+    return nodes;
+  }
+
+  /**
+   * Returns the number of all child nodes that match the type. Returns 1 if this node matches it.
+   * Returns 0 if no child matches the type.
+   */
+  public int getChildrenCount(String type) {
+    int count = 0;
+    if (_type.equalsIgnoreCase(type)) {
+      count++;
+    } else if (!isLeaf()) {
+      for (Node child: _children.values()) {
+        count += child.getChildrenCount(type);
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Returns the top-most ("root") node from this node. If this node itself does not have a parent,
+   * returns itself.
+   */
+  public Node getRoot() {
+    Node node = this;
+    while (node.getParent() != null) {
+      node = node.getParent();
+    }
+    return node;
+  }
+
+  @Override
+  public String toString() {
+    return _name + ":" + _id;
+  }
+
+  @Override
+  public int hashCode() {
+    return _name.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof Node)) {
+      return false;
+    }
+    Node that = (Node)obj;
+    return _name.equals(that.getName());
+  }
+
+  @Override
+  public int compareTo(Node o) {
+    return _name.compareTo(o.getName());
+  }
+}

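Node is a plain mutable tree vertex: children are kept in insertion order,
identity and ordering are by name, and a parent's weight is the running sum
that Topology.addEndNode pushes up from live leaves. A hand-built two-level
tree (a sketch assuming the same package as Node; ids and weights invented):

    public class NodeDemo {
      public static void main(String[] args) {
        Node root = new Node();
        root.setName("root"); root.setType("ROOT"); root.setId(1L);

        Node zone = new Node();
        zone.setName("zone-1"); zone.setType("ZONE"); zone.setId(2L);
        zone.setParent(root);
        root.addChild(zone);

        Node host = new Node();
        host.setName("host-1"); host.setType("INSTANCE"); host.setId(3L);
        host.setWeight(1000L);
        host.setParent(zone);
        zone.addChild(host);
        zone.addWeight(1000L); // parents carry the aggregate weight of live leaves
        root.addWeight(1000L);

        System.out.println(host.isLeaf());                     // true
        System.out.println(root.getChildrenCount("INSTANCE")); // 1
        System.out.println(host.getRoot() == root);            // true
      }
    }
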
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
new file mode 100644
index 0000000..1057fad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -0,0 +1,295 @@
+package org.apache.helix.controller.rebalancer.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Topology represents the structure of a cluster: the hierarchy of its nodes, its fault
+ * boundaries, etc. This class is intended for topology-aware partition placement.
+ */
+public class Topology {
+  private static Logger logger = Logger.getLogger(Topology.class);
+  public enum Types {
+    ROOT,
+    ZONE,
+    INSTANCE
+  }
+  private static final int DEFAULT_NODE_WEIGHT = 1000;
+
+  private final MessageDigest _md;
+  private Node _root; // root of the tree structure of all nodes
+  private List<String> _allInstances;
+  private List<String> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigMap;
+  private HelixProperty _clusterConfig;
+  private String _faultZoneType;
+  private String _endNodeType;
+  private boolean _useDefaultTopologyDef;
+  private LinkedHashSet<String> _types;
+
+  /* Default names for domain paths; if no value is specified for a domain path, the default is used. */
+  // TODO: default values can be defined in clusterConfig.
+  private Map<String, String> _defaultDomainPathValues = new HashMap<String, String>();
+
+  public Topology(final List<String> allNodes, final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+    try {
+      _md = MessageDigest.getInstance("SHA-1");
+      _allInstances = allNodes;
+      _liveInstances = liveNodes;
+      _instanceConfigMap = instanceConfigMap;
+      _clusterConfig = clusterConfig;
+      _types = new LinkedHashSet<String>();
+
+      String topologyDef = _clusterConfig.getRecord()
+          .getSimpleField(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name());
+      if (topologyDef != null) {
+        // Customized cluster topology definition is configured.
+        String[] types = topologyDef.trim().split("/");
+        for (int i = 0; i < types.length; i++) {
+          if (types[i].length() != 0) {
+            _types.add(types[i]);
+          }
+        }
+        if (_types.size() == 0) {
+          logger.error("Invalid cluster topology definition " + topologyDef);
+          throw new HelixException("Invalid cluster topology definition " + topologyDef);
+        } else {
+          String lastType = null;
+          for (String type : _types) {
+            _defaultDomainPathValues.put(type, "Helix_default_" + type);
+            lastType = type;
+          }
+          _endNodeType = lastType;
+          _faultZoneType = _clusterConfig.getRecord()
+              .getStringField(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(),
+                  _endNodeType);
+          if (!_types.contains(_faultZoneType)) {
+            throw new HelixException(String
+                .format("Invalid fault zone type %s, not present in topology definition %s.",
+                    _faultZoneType, topologyDef));
+          }
+          _useDefaultTopologyDef = false;
+        }
+      } else {
+        // Use the default cluster topology definition, i.e. /root/zone/instance
+        _types.add(Types.ZONE.name());
+        _types.add(Types.INSTANCE.name());
+        _endNodeType = Types.INSTANCE.name();
+        _faultZoneType = Types.ZONE.name();
+        _useDefaultTopologyDef = true;
+      }
+    } catch (NoSuchAlgorithmException ex) {
+      throw new IllegalArgumentException(ex);
+    }
+    if (_useDefaultTopologyDef) {
+      _root = createClusterTreeWithDefaultTopologyDef();
+    } else {
+      _root = createClusterTreeWithCustomizedTopology();
+    }
+  }
+
+  public String getEndNodeType() {
+    return _endNodeType;
+  }
+
+  public String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public Node getRootNode() {
+    return _root;
+  }
+
+  public List<Node> getFaultZones() {
+    if (_root != null) {
+      return _root.findChildren(getFaultZoneType());
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Creates a tree representing the cluster structure using the default cluster topology
+   * definition (i.e. no topology definition is given and no domain ids are set).
+   */
+  private Node createClusterTreeWithDefaultTopologyDef() {
+    // root
+    Node root = new Node();
+    root.setName("root");
+    root.setId(computeId("root"));
+    root.setType(Types.ROOT.name());
+
+    for (String ins : _allInstances) {
+      InstanceConfig config = _instanceConfigMap.get(ins);
+      if (config == null) {
+        throw new HelixException(String.format("Config for instance %s is not found!", ins));
+      }
+      String zone = config.getZoneId();
+      if (zone == null) {
+        //TODO: we should allow non-rack clusters for backward compatibility. This should be
+        // solved once we have the hierarchical style of domain ids for instances.
+        throw new HelixException(String
+            .format("ZONE_ID for instance %s is not set, failed the topology-aware placement!",
+                ins));
+      }
+      Map<String, String> pathValueMap = new HashMap<String, String>();
+      pathValueMap.put(Types.ZONE.name(), zone);
+      pathValueMap.put(Types.INSTANCE.name(), ins);
+
+      int weight = config.getWeight();
+      if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
+        weight = DEFAULT_NODE_WEIGHT;
+      }
+      addEndNode(root, ins, pathValueMap, weight, _liveInstances);
+    }
+
+    return root;
+  }
+
+  /**
+   * Creates a tree representing the cluster structure using the customized cluster topology
+   * definition (i.e. a topology definition is configured and each instance declares a domain).
+   */
+  private Node createClusterTreeWithCustomizedTopology() {
+    // root
+    Node root = new Node();
+    root.setName("root");
+    root.setId(computeId("root"));
+    root.setType(Types.ROOT.name());
+
+    for (String ins : _allInstances) {
+      InstanceConfig insConfig = _instanceConfigMap.get(ins);
+      if (insConfig == null) {
+        throw new HelixException(String.format("Config for instance %s is not found!", ins));
+      }
+      String domain = insConfig.getDomain();
+      if (domain == null) {
+        throw new HelixException(String
+            .format("Domain for instance %s is not set, failed the topology-aware placement!",
+                ins));
+      }
+
+      String[] pathPairs = domain.trim().split(",");
+      Map<String, String> pathValueMap = new HashMap<String, String>();
+      for (String pair : pathPairs) {
+        String[] values = pair.trim().split("=");
+        if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) {
+          throw new HelixException(String.format(
+              "Domain-Value pair %s for instance %s is not valid, failed the topology-aware placement!",
+              pair, ins));
+        }
+        String type = values[0];
+        String value = values[1];
+
+        if (!_types.contains(type)) {
+          logger.warn(String
+              .format("Path %s defined in domain of instance %s not recognized, ignored!", pair,
+                  ins));
+          continue;
+        }
+        pathValueMap.put(type, value);
+      }
+
+      int weight = insConfig.getWeight();
+      if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
+        weight = DEFAULT_NODE_WEIGHT;
+      }
+
+      root = addEndNode(root, ins, pathValueMap, weight, _liveInstances);
+    }
+
+    return root;
+  }
+
+
+  /**
+   * Add an end node to the tree, create all the paths to the leaf node if not present.
+   */
+  private Node addEndNode(Node root, String instanceName, Map<String, String> pathNameMap,
+      int instanceWeight, List<String> liveInstances) {
+    Node current = root;
+    List<Node> pathNodes = new ArrayList<Node>();
+    for (String path : _types) {
+      String pathValue = pathNameMap.get(path);
+      if (pathValue == null || pathValue.isEmpty()) {
+        pathValue = _defaultDomainPathValues.get(path);
+      }
+      pathNodes.add(current);
+      if (!current.hasChild(pathValue)) {
+        Node n = new Node();
+        n.setName(pathValue);
+        n.setId(computeId(pathValue));
+        n.setType(path);
+        n.setParent(current);
+
+        // if it is leaf node.
+        if (path.equals(_endNodeType)) {
+          if (liveInstances.contains(instanceName)) {
+            // node is alive
+            n.setWeight(instanceWeight);
+            // add instance weight to all of its parent nodes.
+            for (Node node : pathNodes) {
+              node.addWeight(instanceWeight);
+            }
+          } else {
+            n.setFailed(true);
+            n.setWeight(0);
+          }
+        }
+        current.addChild(n);
+      }
+      current = current.getChild(pathValue);
+    }
+    return root;
+  }
+
+  private long computeId(String name) {
+    byte[] h = _md.digest(name.getBytes());
+    return bstrTo32bit(h);
+  }
+
+  private long bstrTo32bit(byte[] bstr) {
+    if (bstr.length < 4) {
+      throw new IllegalArgumentException("hashed value is less than 4 bytes!");
+    }
+    // need to "simulate" unsigned int
+    return (long) (((ord(bstr[0]) << 24) | (ord(bstr[1]) << 16) | (ord(bstr[2]) << 8) | (ord(
+        bstr[3])))) & 0xffffffffL;
+  }
+
+  private int ord(byte b) {
+    return b & 0xff;
+  }
+}

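Concretely, Topology consumes two pieces of configuration: a cluster-level
TOPOLOGY path listing the level types separated by "/" (optionally narrowed by
FAULT_ZONE_TYPE), and a per-instance domain string of comma-separated
type=value pairs. A wiring sketch; the "DOMAIN" simple-field key is assumed to
back the getDomain() call above, the ClusterConfig(String) constructor is
assumed, and all names are invented:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.helix.controller.rebalancer.topology.Node;
    import org.apache.helix.controller.rebalancer.topology.Topology;
    import org.apache.helix.model.ClusterConfig;
    import org.apache.helix.model.InstanceConfig;

    public class TopologyDemo {
      public static void main(String[] args) {
        ClusterConfig clusterConfig = new ClusterConfig("myCluster");
        clusterConfig.getRecord().setSimpleField("TOPOLOGY", "/rack/host");
        clusterConfig.getRecord().setSimpleField("FAULT_ZONE_TYPE", "rack");

        InstanceConfig instanceConfig = new InstanceConfig("host-17");
        instanceConfig.getRecord().setSimpleField("DOMAIN", "rack=rack-1,host=host-17");

        Map<String, InstanceConfig> configs = new HashMap<String, InstanceConfig>();
        configs.put("host-17", instanceConfig);
        List<String> all = Arrays.asList("host-17");

        Topology topology = new Topology(all, all, configs, clusterConfig);
        List<Node> faultZones = topology.getFaultZones(); // one Node per rack
        System.out.println(faultZones);
      }
    }
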
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index cb5bda8..dacf98d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
@@ -56,6 +57,7 @@ public class ClusterDataCache {
 
   private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
 
+  private ClusterConfig _clusterConfig;
   Map<String, LiveInstance> _liveInstanceMap;
   Map<String, LiveInstance> _liveInstanceCacheMap;
   Map<String, IdealState> _idealStateMap;
@@ -200,11 +202,11 @@ public class ClusterDataCache {
     _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
 
     _idealStateRuleMap = Maps.newHashMap();
-    HelixProperty clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
-    if (clusterConfig != null) {
-      for (String simpleKey : clusterConfig.getRecord().getSimpleFields().keySet()) {
+    _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
+    if (_clusterConfig != null) {
+      for (String simpleKey : _clusterConfig.getRecord().getSimpleFields().keySet()) {
         if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) {
-          String simpleValue = clusterConfig.getRecord().getSimpleField(simpleKey);
+          String simpleValue = _clusterConfig.getRecord().getSimpleField(simpleKey);
           String[] rules = simpleValue.split("(?<!\\\\),");
           Map<String, String> singleRule = Maps.newHashMap();
           for (String rule : rules) {
@@ -232,6 +234,10 @@ public class ClusterDataCache {
     return true;
   }
 
+  public ClusterConfig getClusterConfig() {
+    return _clusterConfig;
+  }
+
   /**
    * Retrieves the idealstates for all resources
    * @return


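The motivation for caching the ClusterConfig here is that a topology-aware
strategy can now build its tree inside the controller pipeline without a
second ZooKeeper read, roughly as follows (an illustrative fragment; variable
names are invented and getInstanceConfigMap() is assumed to be the existing
cache accessor):

    ClusterConfig clusterConfig = cache.getClusterConfig();
    Topology topology = new Topology(allInstances, liveInstances,
        cache.getInstanceConfigMap(), clusterConfig);
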
[2/5] helix git commit: [HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy

Posted by lx...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
deleted file mode 100644
index 959609f..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ /dev/null
@@ -1,753 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-
-public class AutoRebalanceStrategy implements RebalanceStrategy {
-  private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
-  private final ReplicaPlacementScheme _placementScheme;
-
-  private String _resourceName;
-  private List<String> _partitions;
-  private LinkedHashMap<String, Integer> _states;
-  private int _maximumPerNode;
-
-  private Map<String, Node> _nodeMap;
-  private List<Node> _liveNodesList;
-  private Map<Integer, String> _stateMap;
-
-  private Map<Replica, Node> _preferredAssignment;
-  private Map<Replica, Node> _existingPreferredAssignment;
-  private Map<Replica, Node> _existingNonPreferredAssignment;
-  private Set<Replica> _orphaned;
-
-  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
-    init(resourceName, partitions, states, maximumPerNode);
-    _placementScheme = new DefaultPlacementScheme();
-  }
-
-  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states) {
-    this(resourceName, partitions, states, Integer.MAX_VALUE);
-  }
-
-  @Override
-  public void init(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
-    _resourceName = resourceName;
-    _partitions = partitions;
-    _states = states;
-    _maximumPerNode = maximumPerNode;
-  }
-
-  @Override
-  public ZNRecord computePartitionAssignment(final List<String> liveNodes,
-      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
-    int numReplicas = countStateReplicas();
-    ZNRecord znRecord = new ZNRecord(_resourceName);
-    if (liveNodes.size() == 0) {
-      return znRecord;
-    }
-    int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
-    int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
-    _nodeMap = new HashMap<String, Node>();
-    _liveNodesList = new ArrayList<Node>();
-
-    for (String id : allNodes) {
-      Node node = new Node(id);
-      node.capacity = 0;
-      node.hasCeilingCapacity = false;
-      _nodeMap.put(id, node);
-    }
-    for (int i = 0; i < liveNodes.size(); i++) {
-      boolean usingCeiling = false;
-      int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
-      if (distRemainder > 0 && targetSize < _maximumPerNode) {
-        targetSize += 1;
-        distRemainder = distRemainder - 1;
-        usingCeiling = true;
-      }
-      Node node = _nodeMap.get(liveNodes.get(i));
-      node.isAlive = true;
-      node.capacity = targetSize;
-      node.hasCeilingCapacity = usingCeiling;
-      _liveNodesList.add(node);
-    }
-
-    // compute states for all replica ids
-    _stateMap = generateStateMap();
-
-    // compute the preferred mapping if all nodes were up
-    _preferredAssignment = computePreferredPlacement(allNodes);
-
-    // logger.info("preferred mapping:"+ preferredAssignment);
-    // from current mapping derive the ones in preferred location
-    // this will update the nodes with their current fill status
-    _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
-
-    // from current mapping derive the ones not in preferred location
-    _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
-
-    // compute orphaned replicas that are not assigned to any node
-    _orphaned = computeOrphaned();
-    if (logger.isInfoEnabled()) {
-      logger.info("orphan = " + _orphaned);
-    }
-
-    moveNonPreferredReplicasToPreferred();
-
-    assignOrphans();
-
-    moveExcessReplicas();
-
-    prepareResult(znRecord);
-    return znRecord;
-  }
-
-  /**
-   * Move replicas assigned to non-preferred nodes if their current node is at capacity
-   * and its preferred node is under capacity.
-   */
-  private void moveNonPreferredReplicasToPreferred() {
-    // iterate through non preferred and see if we can move them to the
-    // preferred location if the donor has more than it should and stealer has
-    // enough capacity
-    Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<Replica, Node> entry = iterator.next();
-      Replica replica = entry.getKey();
-      Node donor = entry.getValue();
-      Node receiver = _preferredAssignment.get(replica);
-      if (donor.capacity < donor.currentlyAssigned
-          && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
-        donor.currentlyAssigned = donor.currentlyAssigned - 1;
-        receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
-        donor.nonPreferred.remove(replica);
-        receiver.preferred.add(replica);
-        donor.newReplicas.remove(replica);
-        receiver.newReplicas.add(replica);
-        iterator.remove();
-      }
-    }
-  }
-
-  /**
-   * Slot in orphaned partitions randomly so as to maintain even load on live nodes.
-   */
-  private void assignOrphans() {
-    // now iterate over nodes and remaining orphaned partitions and assign
-    // partitions randomly
-    // Better to iterate over orphaned partitions first
-    Iterator<Replica> it = _orphaned.iterator();
-    while (it.hasNext()) {
-      Replica replica = it.next();
-      boolean added = false;
-      int startIndex = computeRandomStartIndex(replica);
-      for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
-        Node receiver = _liveNodesList.get(index % _liveNodesList.size());
-        if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
-          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
-          receiver.nonPreferred.add(replica);
-          receiver.newReplicas.add(replica);
-          added = true;
-          break;
-        }
-      }
-      if (!added) {
-        // try adding the replica by making room for it
-        added = assignOrphanByMakingRoom(replica);
-      }
-      if (added) {
-        it.remove();
-      }
-    }
-    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
-      logger.info("could not assign nodes to partitions: " + _orphaned);
-    }
-  }
-
-  /**
-   * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it
-   * @param replica The replica to assign
-   * @return true if the assignment succeeded, false otherwise
-   */
-  private boolean assignOrphanByMakingRoom(Replica replica) {
-    Node capacityDonor = null;
-    Node capacityAcceptor = null;
-    int startIndex = computeRandomStartIndex(replica);
-    for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
-      Node current = _liveNodesList.get(index % _liveNodesList.size());
-      if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned
-          && !current.canAddIfCapacity(replica) && capacityDonor == null) {
-        // this node has space but cannot accept the node
-        capacityDonor = current;
-      } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned
-          && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
-        // this node would be able to accept the replica if it has ceiling capacity
-        capacityAcceptor = current;
-      }
-      if (capacityDonor != null && capacityAcceptor != null) {
-        break;
-      }
-    }
-    if (capacityDonor != null && capacityAcceptor != null) {
-      // transfer ceiling capacity and add the node
-      capacityAcceptor.steal(capacityDonor, replica);
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Move replicas from too-full nodes to nodes that can accept the replicas
-   */
-  private void moveExcessReplicas() {
-    // iterate over nodes and move extra load
-    Iterator<Replica> it;
-    for (Node donor : _liveNodesList) {
-      if (donor.capacity < donor.currentlyAssigned) {
-        Collections.sort(donor.nonPreferred);
-        it = donor.nonPreferred.iterator();
-        while (it.hasNext()) {
-          Replica replica = it.next();
-          int startIndex = computeRandomStartIndex(replica);
-          for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
-            Node receiver = _liveNodesList.get(index % _liveNodesList.size());
-            if (receiver.canAdd(replica)) {
-              receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
-              receiver.nonPreferred.add(replica);
-              donor.currentlyAssigned = donor.currentlyAssigned - 1;
-              it.remove();
-              break;
-            }
-          }
-          if (donor.capacity >= donor.currentlyAssigned) {
-            break;
-          }
-        }
-        if (donor.capacity < donor.currentlyAssigned) {
-          logger.warn("Could not take partitions out of node:" + donor.id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Update a ZNRecord with the results of the rebalancing.
-   * @param znRecord
-   */
-  private void prepareResult(ZNRecord znRecord) {
-    // The map fields are keyed on partition name to a pair of node and state, i.e. it
-    // indicates that the partition with given state is served by that node
-    //
-    // The list fields are also keyed on partition and list all the nodes serving that partition.
-    // This is useful to verify that there is no node serving multiple replicas of the same
-    // partition.
-    Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
-    for (String partition : _partitions) {
-      znRecord.setMapField(partition, new TreeMap<String, String>());
-      znRecord.setListField(partition, new ArrayList<String>());
-      newPreferences.put(partition, new ArrayList<String>());
-    }
-
-    // for preference lists, the rough priority that we want is:
-    // [existing preferred, existing non-preferred, non-existing preferred, non-existing
-    // non-preferred]
-    for (Node node : _liveNodesList) {
-      for (Replica replica : node.preferred) {
-        if (node.newReplicas.contains(replica)) {
-          newPreferences.get(replica.partition).add(node.id);
-        } else {
-          znRecord.getListField(replica.partition).add(node.id);
-        }
-      }
-    }
-    for (Node node : _liveNodesList) {
-      for (Replica replica : node.nonPreferred) {
-        if (node.newReplicas.contains(replica)) {
-          newPreferences.get(replica.partition).add(node.id);
-        } else {
-          znRecord.getListField(replica.partition).add(node.id);
-        }
-      }
-    }
-    normalizePreferenceLists(znRecord.getListFields(), newPreferences);
-
-    // generate preference maps based on the preference lists
-    for (String partition : _partitions) {
-      List<String> preferenceList = znRecord.getListField(partition);
-      int i = 0;
-      for (String participant : preferenceList) {
-        znRecord.getMapField(partition).put(participant, _stateMap.get(i));
-        i++;
-      }
-    }
-  }
-
-  /**
-   * Adjust preference lists to reduce the number of same replicas on an instance. This will
-   * separately normalize two sets of preference lists, and then append the results of the second
-   * set to those of the first. This basically ensures that existing replicas are automatically
-   * preferred.
-   * @param preferenceLists map of (partition --> list of nodes)
-   * @param newPreferences map containing node preferences not consistent with the current
-   *          assignment
-   */
-  private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
-      Map<String, List<String>> newPreferences) {
-
-    Map<String, Map<String, Integer>> nodeReplicaCounts =
-        new HashMap<String, Map<String, Integer>>();
-    for (String partition : preferenceLists.keySet()) {
-      normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
-    }
-    for (String partition : newPreferences.keySet()) {
-      normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
-      preferenceLists.get(partition).addAll(newPreferences.get(partition));
-    }
-  }
-
-  /**
-   * Adjust a single preference list for replica assignment imbalance
-   * @param preferenceList list of node names
-   * @param nodeReplicaCounts map of (node --> state --> count)
-   */
-  private void normalizePreferenceList(List<String> preferenceList,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    List<String> newPreferenceList = new ArrayList<String>();
-    int replicas = Math.min(countStateReplicas(), preferenceList.size());
-
-    // make this a LinkedHashSet to preserve iteration order
-    Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
-    for (int i = 0; i < replicas; i++) {
-      String state = _stateMap.get(i);
-      String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
-      newPreferenceList.add(node);
-      notAssigned.remove(node);
-      Map<String, Integer> counts = nodeReplicaCounts.get(node);
-      counts.put(state, counts.get(state) + 1);
-    }
-    preferenceList.clear();
-    preferenceList.addAll(newPreferenceList);
-  }
-
-  /**
-   * Get the node which hosts the fewest of a given replica
-   * @param state the state
-   * @param nodes nodes to check
-   * @param nodeReplicaCounts current assignment of replicas
-   * @return the node most willing to accept the replica
-   */
-  private String getMinimumNodeForReplica(String state, Set<String> nodes,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    String minimalNode = null;
-    int minimalCount = Integer.MAX_VALUE;
-    for (String node : nodes) {
-      int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
-      if (count < minimalCount) {
-        minimalCount = count;
-        minimalNode = node;
-      }
-    }
-    return minimalNode;
-  }
-
-  /**
-   * Safe check for the number of replicas of a given id assigned to a node
-   * @param state the state to assign
-   * @param node the node to check
-   * @param nodeReplicaCounts a map of node to replica id and counts
-   * @return the number of currently assigned replicas of the given id
-   */
-  private int getReplicaCountForNode(String state, String node,
-      Map<String, Map<String, Integer>> nodeReplicaCounts) {
-    if (!nodeReplicaCounts.containsKey(node)) {
-      Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
-      replicaCounts.put(state, 0);
-      nodeReplicaCounts.put(node, replicaCounts);
-      return 0;
-    }
-    Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
-    if (!replicaCounts.containsKey(state)) {
-      replicaCounts.put(state, 0);
-      return 0;
-    }
-    return replicaCounts.get(state);
-  }
-
-  /**
-   * Compute the subset of the current mapping where replicas are not mapped according to their
-   * preferred assignment.
-   * @param currentMapping Current mapping of replicas to nodes
-   * @return The current assignments that do not conform to the preferred assignment
-   */
-  private Map<Replica, Node> computeExistingNonPreferredPlacement(
-      Map<String, Map<String, String>> currentMapping) {
-    Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
-    int count = countStateReplicas();
-    for (String partition : currentMapping.keySet()) {
-      Map<String, String> nodeStateMap = currentMapping.get(partition);
-      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
-      for (String nodeId : nodeStateMap.keySet()) {
-        Node node = _nodeMap.get(nodeId);
-        boolean skip = false;
-        for (Replica replica : node.preferred) {
-          if (replica.partition.equals(partition)) {
-            skip = true;
-            break;
-          }
-        }
-        if (skip) {
-          continue;
-        }
-        // check if it's in one of the preferred positions
-        for (int replicaId = 0; replicaId < count; replicaId++) {
-          Replica replica = new Replica(partition, replicaId);
-          if (!_preferredAssignment.containsKey(replica)) {
-
-            logger.info("partitions: " + _partitions);
-            logger.info("currentMapping.keySet: " + currentMapping.keySet());
-            throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions");
-          }
-
-          if (_preferredAssignment.get(replica).id != node.id
-              && !_existingPreferredAssignment.containsKey(replica)
-              && !existingNonPreferredAssignment.containsKey(replica)) {
-            existingNonPreferredAssignment.put(replica, node);
-            node.nonPreferred.add(replica);
-
-            break;
-          }
-        }
-      }
-    }
-    return existingNonPreferredAssignment;
-  }
-
-  /**
-   * Get a live node index to try first for a replica so that each possible start index is
-   * roughly uniformly assigned.
-   * @param replica The replica to assign
-   * @return The starting node index to try
-   */
-  private int computeRandomStartIndex(final Replica replica) {
-    return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
-  }
-
-  /**
-   * Get a set of replicas not currently assigned to any node
-   * @return Unassigned replicas
-   */
-  private Set<Replica> computeOrphaned() {
-    Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
-    for (Replica r : _existingPreferredAssignment.keySet()) {
-      if (orphanedPartitions.contains(r)) {
-        orphanedPartitions.remove(r);
-      }
-    }
-    for (Replica r : _existingNonPreferredAssignment.keySet()) {
-      if (orphanedPartitions.contains(r)) {
-        orphanedPartitions.remove(r);
-      }
-    }
-
-    return orphanedPartitions;
-  }
-
-  /**
-   * Determine the replicas already assigned to their preferred nodes
-   * @param currentMapping Current assignment of replicas to nodes
-   * @return Assignments that conform to the preferred placement
-   */
-  private Map<Replica, Node> computeExistingPreferredPlacement(
-      final Map<String, Map<String, String>> currentMapping) {
-    Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
-    int count = countStateReplicas();
-    for (String partition : currentMapping.keySet()) {
-      Map<String, String> nodeStateMap = currentMapping.get(partition);
-      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
-      for (String nodeId : nodeStateMap.keySet()) {
-        Node node = _nodeMap.get(nodeId);
-        node.currentlyAssigned = node.currentlyAssigned + 1;
-        // check if it's in one of the preferred positions
-        for (int replicaId = 0; replicaId < count; replicaId++) {
-          Replica replica = new Replica(partition, replicaId);
-          if (_preferredAssignment.containsKey(replica)
-              && !existingPreferredAssignment.containsKey(replica)
-              && _preferredAssignment.get(replica).id == node.id) {
-            existingPreferredAssignment.put(replica, node);
-            node.preferred.add(replica);
-            break;
-          }
-        }
-      }
-    }
-
-    return existingPreferredAssignment;
-  }
-
-  /**
-   * Given a predefined set of all possible nodes, compute an assignment of replicas to
-   * nodes that evenly assigns all replicas to nodes.
-   * @param allNodes Identifiers to all nodes, live and non-live
-   * @return Preferred assignment of replicas
-   */
-  private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
-    Map<Replica, Node> preferredMapping;
-    preferredMapping = new HashMap<Replica, Node>();
-    int partitionId = 0;
-    int numReplicas = countStateReplicas();
-    int count = countStateReplicas();
-    for (String partition : _partitions) {
-      for (int replicaId = 0; replicaId < count; replicaId++) {
-        Replica replica = new Replica(partition, replicaId);
-        String nodeName =
-            _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
-                allNodes);
-        preferredMapping.put(replica, _nodeMap.get(nodeName));
-      }
-      partitionId = partitionId + 1;
-    }
-    return preferredMapping;
-  }
-
-  /**
-   * Counts the total number of replicas given a state-count mapping
-   * @return
-   */
-  private int countStateReplicas() {
-    int total = 0;
-    for (Integer count : _states.values()) {
-      total += count;
-    }
-    return total;
-  }
-
-  /**
-   * Compute a map of replica ids to state names
-   * @return Map: replica id -> state name
-   */
-  private Map<Integer, String> generateStateMap() {
-    int replicaId = 0;
-    Map<Integer, String> stateMap = new HashMap<Integer, String>();
-    for (String state : _states.keySet()) {
-      Integer count = _states.get(state);
-      for (int i = 0; i < count; i++) {
-        stateMap.put(replicaId, state);
-        replicaId++;
-      }
-    }
-    return stateMap;
-  }
-
-  /**
-   * A Node is an entity that can serve replicas. It has a capacity and knowledge
-   * of replicas assigned to it, so it can decide if it can receive additional replicas.
-   */
-  class Node {
-    public int currentlyAssigned;
-    public int capacity;
-    public boolean hasCeilingCapacity;
-    private final String id;
-    boolean isAlive;
-    private final List<Replica> preferred;
-    private final List<Replica> nonPreferred;
-    private final Set<Replica> newReplicas;
-
-    public Node(String id) {
-      preferred = new ArrayList<Replica>();
-      nonPreferred = new ArrayList<Replica>();
-      newReplicas = new TreeSet<Replica>();
-      currentlyAssigned = 0;
-      isAlive = false;
-      this.id = id;
-    }
-
-    /**
-     * Check if this replica can be legally added to this node
-     * @param replica The replica to test
-     * @return true if the assignment can be made, false otherwise
-     */
-    public boolean canAdd(Replica replica) {
-      if (currentlyAssigned >= capacity) {
-        return false;
-      }
-      return canAddIfCapacity(replica);
-    }
-
-    /**
-     * Check if this replica can be legally added to this node, provided that it has enough
-     * capacity.
-     * @param replica The replica to test
-     * @return true if the assignment can be made, false otherwise
-     */
-    public boolean canAddIfCapacity(Replica replica) {
-      if (!isAlive) {
-        return false;
-      }
-      for (Replica r : preferred) {
-        if (r.partition.equals(replica.partition)) {
-          return false;
-        }
-      }
-      for (Replica r : nonPreferred) {
-        if (r.partition.equals(replica.partition)) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    /**
-     * Receive a replica by stealing capacity from another Node
-     * @param donor The node that has excess capacity
-     * @param replica The replica to receive
-     */
-    public void steal(Node donor, Replica replica) {
-      donor.hasCeilingCapacity = false;
-      donor.capacity--;
-      hasCeilingCapacity = true;
-      capacity++;
-      currentlyAssigned++;
-      nonPreferred.add(replica);
-      newReplicas.add(replica);
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
-          .append("\nnonpreferred:").append(nonPreferred.size());
-      return sb.toString();
-    }
-  }
-
-  /**
-   * A Replica is a combination of a partition of the resource, the state the replica is in
-   * and an identifier signifying a specific replica of a given partition and state.
-   */
-  class Replica implements Comparable<Replica> {
-    private String partition;
-    private int replicaId; // this is a partition-relative id
-    private String format;
-
-    public Replica(String partition, int replicaId) {
-      this.partition = partition;
-      this.replicaId = replicaId;
-      this.format = this.partition + "|" + this.replicaId;
-    }
-
-    @Override
-    public String toString() {
-      return format;
-    }
-
-    @Override
-    public boolean equals(Object that) {
-      if (that instanceof Replica) {
-        return this.format.equals(((Replica) that).format);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return this.format.hashCode();
-    }
-
-    @Override
-    public int compareTo(Replica that) {
-      if (that instanceof Replica) {
-        return this.format.compareTo(that.format);
-      }
-      return -1;
-    }
-  }
-
-  /**
-   * Interface for providing a custom approach to computing a replica's affinity to a node.
-   */
-  public interface ReplicaPlacementScheme {
-    /**
-     * Initialize global state
-     * @param manager The instance to which this placement is associated
-     */
-    public void init(final HelixManager manager);
-
-    /**
-     * Given properties of this replica, determine the node it would prefer to be served by
-     * @param partitionId The current partition
-     * @param replicaId The current replica with respect to the current partition
-     * @param numPartitions The total number of partitions
-     * @param numReplicas The total number of replicas per partition
-     * @param nodeNames A list of identifiers of all nodes, live and non-live
-     * @return The name of the node that would prefer to serve this replica
-     */
-    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
-        final List<String> nodeNames);
-  }
-
-  /**
-   * Compute preferred placements based on a default strategy that assigns replicas to nodes as
-   * evenly as possible while avoiding placing two replicas of the same partition on any node.
-   */
-  public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
-    @Override
-    public void init(final HelixManager manager) {
-      // do nothing since this is independent of the manager
-    }
-
-    @Override
-    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
-        final List<String> nodeNames) {
-      int index;
-      if (nodeNames.size() > numPartitions) {
-        // assign replicas in partition order in case there are more nodes than partitions
-        index = (partitionId + replicaId * numPartitions) % nodeNames.size();
-      } else if (nodeNames.size() == numPartitions) {
-        // need a replica offset in case the sizes of these sets are the same
-        index =
-            ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
-                % nodeNames.size();
-      } else {
-        // in all other cases, assigning a replica at a time for each partition is reasonable
-        index = (partitionId + replicaId) % nodeNames.size();
-      }
-      return nodeNames.get(index);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
deleted file mode 100644
index 4daae82..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.ZNRecord;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Assignment strategy interface that computes the assignment of partition->instance.
- */
-public interface RebalanceStrategy {
-  /**
-   * Perform the necessary initialization for the rebalance strategy object.
-   * @param resourceName
-   * @param partitions
-   * @param states
-   * @param maximumPerNode
-   */
-  void init(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode);
-
-  /**
-   * Compute the preference lists and (optional partition-state mapping) for the given resource.
-   *
-   * @param liveNodes
-   * @param currentMapping
-   * @param allNodes
-   * @return
-   */
-  ZNRecord computePartitionAssignment(final List<String> liveNodes,
-      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes);
-}

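To make the removed contract concrete, here is a minimal, hypothetical implementation of the old interface above. It builds round-robin preference lists and deliberately ignores currentMapping and maximumPerNode; only ZNRecord is real Helix API, everything else is a sketch:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.helix.ZNRecord;

public class RoundRobinStrategy implements RebalanceStrategy {
  private List<String> _partitions;
  private int _replicas;

  @Override
  public void init(String resourceName, final List<String> partitions,
      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
    _partitions = partitions;
    int replicas = 0;
    for (int count : states.values()) {
      replicas += count; // total replicas = sum of per-state counts
    }
    _replicas = replicas;
  }

  @Override
  public ZNRecord computePartitionAssignment(final List<String> liveNodes,
      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
    ZNRecord record = new ZNRecord("toyAssignment");
    for (int p = 0; p < _partitions.size(); p++) {
      List<String> preferenceList = new ArrayList<String>();
      for (int r = 0; r < _replicas && r < liveNodes.size(); r++) {
        // walk the live node list starting at the partition index
        preferenceList.add(liveNodes.get((p + r) % liveNodes.size()));
      }
      record.setListField(_partitions.get(p), preferenceList);
    }
    return record;
  }
}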
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index e97ac9b..73f2cbb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -52,6 +52,7 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ConstraintItem;
@@ -607,6 +608,13 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void addResource(String clusterName, String resourceName, int partitions,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy) {
+    addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode,
+        rebalanceStrategy, 0, -1);
+  }
+
+  @Override
   public void addResource(String clusterName, String resourceName, IdealState idealstate) {
     String stateModelRef = idealstate.getStateModelDefRef();
     String stateModelDefPath =
@@ -629,14 +637,21 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void addResource(String clusterName, String resourceName, int partitions,
       String stateModelRef, String rebalancerMode, int bucketSize) {
-    addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize,
-        -1);
+    addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, -1);
 
   }
 
   @Override
   public void addResource(String clusterName, String resourceName, int partitions,
       String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance) {
+    addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode,
+        RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY, bucketSize, maxPartitionsPerInstance);
+  }
+
+  @Override
+  public void addResource(String clusterName, String resourceName, int partitions,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize,
+      int maxPartitionsPerInstance) {
     if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
       throw new HelixException("cluster " + clusterName + " is not setup yet");
     }
@@ -647,6 +662,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     RebalanceMode mode =
         idealState.rebalanceModeFromString(rebalancerMode, RebalanceMode.SEMI_AUTO);
     idealState.setRebalanceMode(mode);
+    idealState.setRebalanceStrategy(rebalanceStrategy);
     idealState.setReplicas("" + 0);
     idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
     if (maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) {
@@ -1014,8 +1030,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       @Override
       public ZNRecord update(ZNRecord currentData) {
         ClusterConstraints constraints =
-            currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(
-                currentData);
+            currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(currentData);
 
         constraints.addConstraintItem(constraintId, constraintItem);
         return constraints.getRecord();
@@ -1153,6 +1168,26 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
+  public void setInstanceZoneId(String clusterName, String instanceName, String zoneId) {
+    if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
+      throw new HelixException("cluster " + clusterName + " is not setup yet");
+    }
+
+    if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
+      throw new HelixException("cluster " + clusterName + " instance " + instanceName
+          + " is not setup yet");
+    }
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+
+    PropertyKey configKey = keyBuilder.instanceConfig(instanceName);
+    InstanceConfig config = accessor.getProperty(configKey);
+    config.setZoneId(zoneId);
+    accessor.setProperty(configKey, config);
+  }
+
+  @Override
   public void close() {
     if (_zkClient != null) {
       _zkClient.close();

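A sketch of how a client might exercise the two ZKHelixAdmin additions above. The ZooKeeper address, cluster, resource, and instance names are placeholders; the strategy class name refers to the CRUSH strategy introduced by this patch set:

import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;

public class AdminUsageSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // placeholder ZK address

    // New overload: pick a rebalance strategy by class name when adding a resource.
    admin.addResource("myCluster", "myResource", 8, "MasterSlave", "FULL_AUTO",
        "org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy");

    // New helper: record which fault zone an instance belongs to.
    admin.setInstanceZoneId("myCluster", "localhost_12913", "zone-1");

    admin.close();
  }
}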
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
new file mode 100644
index 0000000..25a16d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -0,0 +1,92 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * Cluster configurations
+ */
+public class ClusterConfig extends HelixProperty {
+  /**
+   * Configurable characteristics of a cluster
+   */
+  public enum ClusterConfigProperty {
+    HELIX_DISABLE_PIPELINE_TRIGGERS,
+    TOPOLOGY,  // cluster topology definition, for example, "/zone/rack/host/instance"
+    FAULT_ZONE_TYPE // the type in which isolation should be applied on when Helix places the replicas from same partition.
+  }
+
+  /**
+   * Instantiate for a specific cluster
+   *
+   * @param cluster the cluster identifier
+   */
+  public ClusterConfig(String cluster) {
+    super(cluster);
+  }
+
+  /**
+   * Instantiate with a pre-populated record
+   *
+   * @param record a ZNRecord corresponding to a cluster configuration
+   */
+  public ClusterConfig(ZNRecord record) {
+    super(record);
+  }
+
+  /**
+   * Whether controller pipeline triggers are disabled for this cluster.
+   *
+   * @return true if pipeline triggers are disabled, false otherwise
+   */
+  public Boolean isPipelineTriggersDisabled() {
+    return _record
+        .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ClusterConfig) {
+      ClusterConfig that = (ClusterConfig) obj;
+
+      if (this.getId().equals(that.getId())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+
+  /**
+   * Get the name of this cluster
+   *
+   * @return the cluster name
+   */
+  public String getClusterName() {
+    return _record.getId();
+  }
+}
+

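A minimal sketch of the new ClusterConfig in use, assuming a hand-built ZNRecord; the cluster name and topology path below are placeholders:

import org.apache.helix.ZNRecord;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConfig.ClusterConfigProperty;

public class ClusterConfigSketch {
  public static void main(String[] args) {
    ZNRecord record = new ZNRecord("myCluster"); // placeholder cluster name
    // Topology path and fault zone type stored as simple fields keyed by the enum above.
    record.setSimpleField(ClusterConfigProperty.TOPOLOGY.name(), "/zone/rack/host/instance");
    record.setSimpleField(ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "zone");

    ClusterConfig config = new ClusterConfig(record);
    System.out.println(config.getClusterName());             // myCluster
    System.out.println(config.isPipelineTriggersDisabled()); // false (the default)
  }
}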
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7c4cf54..55d4734 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -167,7 +167,7 @@ public class IdealState extends HelixProperty {
 
   /**
    * Specify the strategy for Helix to use to compute the partition-instance assignment,
-   * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy}
+   * i.e., the custom rebalance strategy that implements {@link org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy}
    *
    * @param rebalanceStrategy
    * @return

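For context, the corrected link target corresponds to usage like the following sketch; the resource name is a placeholder and the strategy class is the CRUSH implementation added in this patch set:

import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;

public class IdealStateSketch {
  public static void main(String[] args) {
    IdealState idealState = new IdealState("myResource"); // placeholder resource name
    idealState.setRebalanceMode(RebalanceMode.FULL_AUTO);
    // Point Helix at a strategy implementation by its fully qualified class name.
    idealState.setRebalanceStrategy(
        "org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy");
    System.out.println(idealState.getRecord().getSimpleFields());
  }
}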
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index eb1c652..ecf2900 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -39,10 +39,14 @@ public class InstanceConfig extends HelixProperty {
   public enum InstanceConfigProperty {
     HELIX_HOST,
     HELIX_PORT,
+    HELIX_ZONE_ID,
     HELIX_ENABLED,
     HELIX_DISABLED_PARTITION,
-    TAG_LIST
+    TAG_LIST,
+    INSTANCE_WEIGHT,
+    DOMAIN
   }
+  public static final int WEIGHT_NOT_SET = -1;
 
   private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName());
 
@@ -94,6 +98,50 @@ public class InstanceConfig extends HelixProperty {
     _record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
   }
 
+  public String getZoneId() {
+    return _record.getSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name());
+  }
+
+  public void setZoneId(String zoneId) {
+    _record.setSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name(), zoneId);
+  }
+
+  /**
+   * Domain represents a hierarchy identifier for an instance.
+   * @return the domain string for this instance, or null if not set
+   */
+  public String getDomain() {
+    return _record.getSimpleField(InstanceConfigProperty.DOMAIN.name());
+  }
+
+  /**
+   * Domain represents a hierarchy identifier for an instance.
+   * Example:  "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001".
+   * @param domain the hierarchical domain string for this instance
+   */
+  public void setDomain(String domain) {
+    _record.setSimpleField(InstanceConfigProperty.DOMAIN.name(), domain);
+  }
+
+  public int getWeight() {
+    String w = _record.getSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name());
+    if (w != null) {
+      try {
+        int weight = Integer.valueOf(w);
+        return weight;
+      } catch (NumberFormatException e) { // ignore malformed weight; fall through to WEIGHT_NOT_SET
+      }
+    }
+    return WEIGHT_NOT_SET;
+  }
+
+  public void setWeight(int weight) {
+    if (weight <= 0) {
+      throw new IllegalArgumentException("Instance weight cannot be less than or equal to 0!");
+    }
+    _record.setSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name(), String.valueOf(weight));
+  }
+
   /**
    * Get arbitrary tags associated with the instance
    * @return a list of tags

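The new InstanceConfig fields combine as in this sketch; instance, zone, and domain values are placeholders:

import org.apache.helix.model.InstanceConfig;

public class InstanceConfigSketch {
  public static void main(String[] args) {
    InstanceConfig config = new InstanceConfig("instance001"); // placeholder instance id
    config.setZoneId("zone-1");
    // Domain spells out the instance's position in the cluster topology.
    config.setDomain("cluster=myCluster,zone=zone-1,rack=myRack,host=host1,instance=instance001");
    config.setWeight(100); // must be > 0; setWeight() throws otherwise

    System.out.println(config.getZoneId()); // zone-1
    System.out.println(config.getDomain());
    System.out.println(config.getWeight()); // 100
  }
}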
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index 623357f..ac96768 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -32,8 +32,8 @@ import java.util.TreeSet;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
@@ -126,7 +126,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
     List<String> allNodes =
         Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
     Collections.sort(allNodes);
-    ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+    ZNRecord record =
+        strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache);
     Map<String, List<String>> preferenceLists = record.getListFields();
 
     // Convert to an assignment keyed on participant

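The call-site change above reflects the relocated strategy interface's new signature, which takes the full node list, the live node list, the current mapping, and the cluster data cache, in that order. A minimal direct invocation, mirroring the test code later in this patch (names and counts are illustrative; a null cache is tolerated there):

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;

public class StrategyCallSketch {
  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("resource_0", "resource_1");
    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
    states.put("MASTER", 1);
    states.put("SLAVE", 1);

    RebalanceStrategy strategy =
        new AutoRebalanceStrategy("resource", partitions, states, Integer.MAX_VALUE);

    List<String> nodes = Arrays.asList("n0", "n1", "n2");
    Map<String, Map<String, String>> currentMapping =
        new HashMap<String, Map<String, String>>();

    // New argument order: allNodes, then liveNodes, then currentMapping, then cache.
    ZNRecord assignment = strategy.computePartitionAssignment(nodes, nodes, currentMapping, null);
    System.out.println(assignment.getListFields());
  }
}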
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 9d411bb..08ccbdc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -362,6 +362,12 @@ public class ClusterSetup {
   }
 
   public void addResourceToCluster(String clusterName, String resourceName, int numResources,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy) {
+    _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode,
+        rebalanceStrategy);
+  }
+
+  public void addResourceToCluster(String clusterName, String resourceName, int numResources,
       String stateModelRef, String rebalancerMode, int bucketSize) {
     _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode,
         bucketSize);

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
new file mode 100644
index 0000000..a4e38a1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
@@ -0,0 +1,766 @@
+package org.apache.helix.controller.Strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.Mocks.MockAccessor;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.AutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TestAutoRebalanceStrategy {
+  private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
+
+  /**
+   * Sanity test for a basic Master-Slave model
+   */
+  @Test
+  public void simpleMasterSlaveTest() {
+    final int NUM_ITERATIONS = 10;
+    final int NUM_PARTITIONS = 10;
+    final int NUM_LIVE_NODES = 12;
+    final int NUM_TOTAL_NODES = 20;
+    final int MAX_PER_NODE = 5;
+
+    final String[] STATE_NAMES = {
+        "MASTER", "SLAVE"
+    };
+    final int[] STATE_COUNTS = {
+        1, 2
+    };
+
+    runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES,
+        MAX_PER_NODE, STATE_NAMES, STATE_COUNTS);
+  }
+
+  /**
+   * Run a test for an arbitrary state model.
+   * @param name Name of the test state model
+   * @param numIterations Number of rebalance tasks to run
+   * @param numPartitions Number of partitions for the resource
+   * @param numLiveNodes Number of live nodes in the cluster
+   * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to
+   *          numLiveNodes
+   * @param maxPerNode Maximum number of replicas a node can serve
+   * @param stateNames States ordered by preference
+   * @param stateCounts Number of replicas that should be in each state
+   */
+  private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes,
+      int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) {
+    List<String> partitions = new ArrayList<String>();
+    for (int i = 0; i < numPartitions; i++) {
+      partitions.add("p_" + i);
+    }
+
+    List<String> liveNodes = new ArrayList<String>();
+    List<String> allNodes = new ArrayList<String>();
+    for (int i = 0; i < numTotalNodes; i++) {
+      allNodes.add("n_" + i);
+      if (i < numLiveNodes) {
+        liveNodes.add("n_" + i);
+      }
+    }
+
+    Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
+
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+    for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) {
+      states.put(stateNames[i], stateCounts[i]);
+    }
+
+    StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
+
+    new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
+        stateModelDef).runRepeatedly(numIterations);
+  }
+
+  /**
+   * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions
+   * into account when computing mappings, so this is acceptable.
+   * @param modelName name to give the model
+   * @param initialState initial state for all nodes
+   * @param states ordered map of state to count
+   * @return incomplete StateModelDefinition for rebalancing
+   */
+  private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
+      LinkedHashMap<String, Integer> states) {
+    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
+    builder.initialState(initialState);
+    int i = states.size();
+    for (String state : states.keySet()) {
+      builder.addState(state, i);
+      builder.upperBound(state, states.get(state));
+      i--;
+    }
+    return builder.build();
+  }
+
+  class AutoRebalanceTester {
+    private static final double P_KILL = 0.45;
+    private static final double P_ADD = 0.1;
+    private static final double P_RESURRECT = 0.45;
+    private static final String RESOURCE_NAME = "resource";
+
+    private List<String> _partitions;
+    private LinkedHashMap<String, Integer> _states;
+    private List<String> _liveNodes;
+    private Set<String> _liveSet;
+    private Set<String> _removedSet;
+    private Set<String> _nonLiveSet;
+    private Map<String, Map<String, String>> _currentMapping;
+    private List<String> _allNodes;
+    private int _maxPerNode;
+    private StateModelDefinition _stateModelDef;
+    private Random _random;
+
+    public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
+        List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
+        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
+      _partitions = partitions;
+      _states = states;
+      _liveNodes = liveNodes;
+      _liveSet = new TreeSet<String>();
+      for (String node : _liveNodes) {
+        _liveSet.add(node);
+      }
+      _removedSet = new TreeSet<String>();
+      _nonLiveSet = new TreeSet<String>();
+      _currentMapping = currentMapping;
+      _allNodes = allNodes;
+      for (String node : allNodes) {
+        if (!_liveSet.contains(node)) {
+          _nonLiveSet.add(node);
+        }
+      }
+      _maxPerNode = maxPerNode;
+      _stateModelDef = stateModelDef;
+      _random = new Random();
+    }
+
+    /**
+     * Repeatedly randomly select a task to run and report the result
+     * @param numIterations
+     *          Number of random tasks to run in sequence
+     */
+    public void runRepeatedly(int numIterations) {
+      logger.info("~~~~ Initial State ~~~~~");
+      RebalanceStrategy strategy =
+          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
+      ZNRecord initialResult =
+          strategy.computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+      _currentMapping = getMapping(initialResult.getListFields());
+      logger.info(_currentMapping);
+      getRunResult(_currentMapping, initialResult.getListFields());
+      for (int i = 0; i < numIterations; i++) {
+        logger.info("~~~~ Iteration " + i + " ~~~~~");
+        ZNRecord znRecord = runOnceRandomly();
+        if (znRecord != null) {
+          final Map<String, List<String>> listResult = znRecord.getListFields();
+          final Map<String, Map<String, String>> mapResult = getMapping(listResult);
+          logger.info(mapResult);
+          logger.info(listResult);
+          getRunResult(mapResult, listResult);
+          _currentMapping = mapResult;
+        }
+      }
+    }
+
+    private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
+      final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>();
+      ClusterDataCache cache = new ClusterDataCache();
+      MockAccessor accessor = new MockAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      for (String node : _liveNodes) {
+        LiveInstance liveInstance = new LiveInstance(node);
+        liveInstance.setSessionId("testSession");
+        accessor.setProperty(keyBuilder.liveInstance(node), liveInstance);
+      }
+      cache.refresh(accessor);
+      for (String partition : _partitions) {
+        List<String> preferenceList = listResult.get(partition);
+        Map<String, String> currentStateMap = _currentMapping.get(partition);
+        Set<String> disabled = Collections.emptySet();
+        Map<String, String> assignment =
+            ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef,
+                preferenceList, currentStateMap, disabled, true);
+        mapResult.put(partition, assignment);
+      }
+      return mapResult;
+    }
+
+    /**
+     * Output various statistics and correctness check results
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     * @param listFields
+     *          The map-list assignment generated by the rebalancer
+     */
+    public void getRunResult(final Map<String, Map<String, String>> mapFields,
+        final Map<String, List<String>> listFields) {
+      logger.info("***** Statistics *****");
+      dumpStatistics(mapFields);
+      verifyCorrectness(mapFields, listFields);
+    }
+
+    /**
+     * Output statistics about the assignment
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     */
+    public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
+      Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+      int nodeCount = _liveNodes.size();
+      logger.info("Total number of nodes: " + nodeCount);
+      logger.info("Nodes: " + _liveNodes);
+      int sumPartitions = getSum(partitionsPerNode.values());
+      logger.info("Total number of partitions: " + sumPartitions);
+      double averagePartitions = getAverage(partitionsPerNode.values());
+      logger.info("Average number of partitions per node: " + averagePartitions);
+      double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions);
+      logger.info("Standard deviation of partitions: " + stdevPartitions);
+
+      // Statistics about each state
+      Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
+      for (String state : _states.keySet()) {
+        Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>();
+        for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) {
+          Map<String, Integer> stateCounts = nodeStates.getValue();
+          if (stateCounts.containsKey(state)) {
+            nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state));
+          } else {
+            nodeStateCounts.put(nodeStates.getKey(), 0);
+          }
+        }
+        int sumStates = getSum(nodeStateCounts.values());
+        logger.info("Total number of state " + state + ": " + sumStates);
+        double averageStates = getAverage(nodeStateCounts.values());
+        logger.info("Average number of state " + state + " per node: " + averageStates);
+        double stdevStates = getStdev(nodeStateCounts.values(), averageStates);
+        logger.info("Standard deviation of state " + state + " per node: " + stdevStates);
+      }
+    }
+
+    /**
+     * Run a set of correctness tests, reporting success or failure
+     * @param mapFields
+     *          The map-map assignment generated by the rebalancer
+     * @param listFields
+     *          The map-list assignment generated by the rebalancer
+     */
+    public void verifyCorrectness(final Map<String, Map<String, String>> mapFields,
+        final Map<String, List<String>> listFields) {
+      final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+      boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
+      assert maxConstraintMet : "Max per node constraint: FAIL";
+      logger.info("Max per node constraint: PASS");
+
+      boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
+      assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL";
+      logger.info("Only live nodes have partitions constraint: PASS");
+
+      boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
+      assert stateAssignmentPossible : "State replica constraint: FAIL";
+      logger.info("State replica constraint: PASS");
+
+      boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
+      assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL";
+      logger.info("Node uniqueness per partition constraint: PASS");
+    }
+
+    private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
+      for (String node : partitionsPerNode.keySet()) {
+        Integer value = partitionsPerNode.get(node);
+        if (value > _maxPerNode) {
+          logger.error("ERROR: Node " + node + " has " + value
+              + " partitions despite a maximum of " + _maxPerNode);
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) {
+      for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) {
+        boolean isLive = _liveSet.contains(nodeState.getKey());
+        boolean isEmpty = nodeState.getValue() == 0;
+        if (!isLive && !isEmpty) {
+          logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has "
+              + nodeState.getValue() + " replicas!");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) {
+      for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+        final Map<String, String> nodeMap = partitionEntry.getValue();
+        final Map<String, Integer> stateCounts = new TreeMap<String, Integer>();
+        for (String state : nodeMap.values()) {
+          if (!stateCounts.containsKey(state)) {
+            stateCounts.put(state, 1);
+          } else {
+            stateCounts.put(state, stateCounts.get(state) + 1);
+          }
+        }
+        for (String state : stateCounts.keySet()) {
+          if (state.equals(HelixDefinedState.DROPPED.toString())) {
+            continue;
+          }
+          int count = stateCounts.get(state);
+          int maximumCount = _states.get(state);
+          if (count > maximumCount) {
+            logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey()
+                + " has " + count + " replicas when " + maximumCount + " is allowed!");
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) {
+      for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) {
+        Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
+        int numUniques = nodeSet.size();
+        int total = partitionEntry.getValue().size();
+        if (numUniques < total) {
+          logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total
+              + " nodes, but only " + numUniques + " are unique!");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private double getAverage(final Collection<Integer> values) {
+      double sum = 0.0;
+      for (Integer value : values) {
+        sum += value;
+      }
+      if (values.size() != 0) {
+        return sum / values.size();
+      } else {
+        return -1.0;
+      }
+    }
+
+    private int getSum(final Collection<Integer> values) {
+      int sum = 0;
+      for (Integer value : values) {
+        sum += value;
+      }
+      return sum;
+    }
+
+    private double getStdev(final Collection<Integer> values, double mean) {
+      double sum = 0.0;
+      for (Integer value : values) {
+        double deviation = mean - value;
+        sum += Math.pow(deviation, 2.0);
+      }
+      if (values.size() != 0) {
+        sum /= values.size();
+        return Math.pow(sum, 0.5);
+      } else {
+        return -1.0;
+      }
+    }
+
+    private Map<String, Integer> getPartitionBucketsForNode(
+        final Map<String, Map<String, String>> assignment) {
+      Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>();
+      for (String node : _liveNodes) {
+        partitionsPerNode.put(node, 0);
+      }
+      for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+        final Map<String, String> nodeMap = partitionEntry.getValue();
+        for (String node : nodeMap.keySet()) {
+          String state = nodeMap.get(node);
+          if (state.equals(HelixDefinedState.DROPPED.toString())) {
+            continue;
+          }
+          // add 1 for every occurrence of a node
+          if (!partitionsPerNode.containsKey(node)) {
+            partitionsPerNode.put(node, 1);
+          } else {
+            partitionsPerNode.put(node, partitionsPerNode.get(node) + 1);
+          }
+        }
+      }
+      return partitionsPerNode;
+    }
+
+    private Map<String, Map<String, Integer>> getStateBucketsForNode(
+        final Map<String, Map<String, String>> assignment) {
+      Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
+      for (String n : _liveNodes) {
+        result.put(n, new TreeMap<String, Integer>());
+      }
+      for (Map<String, String> nodeStateMap : assignment.values()) {
+        for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
+          if (!result.containsKey(nodeState.getKey())) {
+            result.put(nodeState.getKey(), new TreeMap<String, Integer>());
+          }
+          Map<String, Integer> stateMap = result.get(nodeState.getKey());
+          if (!stateMap.containsKey(nodeState.getValue())) {
+            stateMap.put(nodeState.getValue(), 1);
+          } else {
+            stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1);
+          }
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Randomly choose between killing, adding, or resurrecting a single node
+     * @return (Partition -> (Node -> State)) ZNRecord
+     */
+    public ZNRecord runOnceRandomly() {
+      double choose = _random.nextDouble();
+      ZNRecord result = null;
+      if (choose < P_KILL) {
+        result = removeSingleNode(null);
+      } else if (choose < P_KILL + P_ADD) {
+        result = addSingleNode(null);
+      } else if (choose < P_KILL + P_ADD + P_RESURRECT) {
+        result = resurrectSingleNode(null);
+      }
+      return result;
+    }
+
+    /**
+     * Run rebalancer trying to add a never-live node
+     * @param node
+     *          Optional String to add
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord addSingleNode(String node) {
+      logger.info("=================== add node =================");
+      if (_nonLiveSet.size() == 0) {
+        logger.warn("Cannot add node because there are no nodes left to add.");
+        return null;
+      }
+
+      // Get a random never-live node
+      if (node == null || !_nonLiveSet.contains(node)) {
+        node = getRandomSetElement(_nonLiveSet);
+      }
+      logger.info("Adding " + node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+      _nonLiveSet.remove(node);
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
+          computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    /**
+     * Run rebalancer trying to remove a live node
+     * @param node
+     *          Optional String to remove
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord removeSingleNode(String node) {
+      logger.info("=================== remove node =================");
+      if (_liveSet.size() == 0) {
+        logger.warn("Cannot remove node because there are no nodes left to remove.");
+        return null;
+      }
+
+      // Get a random live node
+      if (node == null || !_liveSet.contains(node)) {
+        node = getRandomSetElement(_liveSet);
+      }
+      logger.info("Removing " + node);
+      _removedSet.add(node);
+      _liveNodes.remove(node);
+      _liveSet.remove(node);
+
+      // the rebalancer expects that the current mapping doesn't contain deleted
+      // nodes
+      for (Map<String, String> nodeMap : _currentMapping.values()) {
+        if (nodeMap.containsKey(node)) {
+          nodeMap.remove(node);
+        }
+      }
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    /**
+     * Run rebalancer trying to add back a removed node
+     * @param node
+     *          Optional String to resurrect
+     * @return ZNRecord result returned by the rebalancer
+     */
+    public ZNRecord resurrectSingleNode(String node) {
+      logger.info("=================== resurrect node =================");
+      if (_removedSet.size() == 0) {
+        logger.warn("Cannot remove node because there are no nodes left to resurrect.");
+        return null;
+      }
+
+      // Get a random previously-removed node
+      if (node == null || !_removedSet.contains(node)) {
+        node = getRandomSetElement(_removedSet);
+      }
+      logger.info("Resurrecting " + node);
+      _removedSet.remove(node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    private <T> T getRandomSetElement(Set<T> source) {
+      int element = _random.nextInt(source.size());
+      int i = 0;
+      for (T node : source) {
+        if (i == element) {
+          return node;
+        }
+        i++;
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
+   * lists should prefer nodes in the current mapping at all times, but when all nodes are in the
+   * current mapping, then it should distribute states as evenly as possible.
+   */
+  @Test
+  public void testOrphansNotPreferred() {
+    final String RESOURCE_NAME = "resource";
+    final String[] PARTITIONS = {
+        "resource_0", "resource_1", "resource_2"
+    };
+    final StateModelDefinition STATE_MODEL =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    final int REPLICA_COUNT = 2;
+    final String[] NODES = {
+        "n0", "n1", "n2"
+    };
+
+    // initial state, one node, no mapping
+    List<String> allNodes = Lists.newArrayList(NODES[0]);
+    List<String> liveNodes = Lists.newArrayList(NODES[0]);
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    for (String partition : PARTITIONS) {
+      currentMapping.put(partition, new HashMap<String, String>());
+    }
+
+    // make sure that when the first node joins, a single replica is assigned fairly
+    List<String> partitions = ImmutableList.copyOf(PARTITIONS);
+    LinkedHashMap<String, Integer> stateCount =
+        AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    ZNRecord znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    Map<String, List<String>> preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      // make sure these are all MASTER
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+    }
+
+    // now assign a replica to the first node in the current mapping, and add a second node
+    allNodes.add(NODES[1]);
+    liveNodes.add(NODES[1]);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).put(NODES[0], "MASTER");
+    }
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
+          + partition);
+      Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
+          + partition);
+    }
+
+    // now set the current mapping to reflect this update and make sure that it distributes masters
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).put(NODES[1], "SLAVE");
+    }
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    Set<String> firstNodes = Sets.newHashSet();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+    }
+    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+
+    // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
+    // new node is never the most preferred
+    allNodes.add(NODES[2]);
+    liveNodes.add(NODES[2]);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+    // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
+    currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    boolean newNodeUsed = false;
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      if (preferenceList.contains(NODES[2])) {
+        newNodeUsed = true;
+        Assert.assertEquals(preferenceList.get(1), NODES[2],
+            "newly added node not at preference list tail for " + partition);
+      }
+    }
+    Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
+
+    // now remap this to take the new node into account, should go back to balancing masters, slaves
+    // evenly across all nodes
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    firstNodes.clear();
+    Set<String> secondNodes = Sets.newHashSet();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+      secondNodes.add(preferenceList.get(1));
+    }
+    Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
+    Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
+
+    // remove a node now, but use the current mapping with everything balanced just prior
+    liveNodes.remove(0);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+    // remove all references of n0 from the mapping, keep everything else in a legal state
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      Map<String, String> stateMap = currentMapping.get(partition);
+      for (String participant : stateMap.keySet()) {
+        Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
+            + partition);
+      }
+      for (String participant : preferenceList) {
+        if (!stateMap.containsKey(participant)) {
+          Assert.assertNotSame(preferenceList.get(0), participant,
+              "newly moved replica should not be master for " + partition);
+        }
+      }
+    }
+
+    // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
+    currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    firstNodes.clear();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+    }
+    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+  }
+}


[5/5] helix git commit: [HELIX-633] AutoRebalancer should ignore disabled instance and all partitions on disabled instances should be dropped in FULL_AUTO rebalance mode

Posted by lx...@apache.org.
[HELIX-633] AutoRebalancer should ignore disabled instance and all partitions on disabled instances should be dropped in FULL_AUTO rebalance mode


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bc0aa76a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bc0aa76a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bc0aa76a

Branch: refs/heads/helix-0.6.x
Commit: bc0aa76a9de6243928e53e1a1d01e7502ff8267c
Parents: f5ac8f8
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue May 31 19:17:39 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java   |   3 +
 .../util/ConstraintBasedAssignment.java         |  22 +--
 .../controller/stages/ClusterDataCache.java     |  16 +++
 .../TestAutoRebalanceWithDisabledInstance.java  | 142 +++++++++++++++++++
 .../integration/TestStateTransitionTimeout.java |  28 ----
 .../integration/ZkStandAloneCMTestBase.java     |   2 +
 .../mock/participant/MockMSStateModel.java      |  65 +--------
 7 files changed, 180 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index a8d83a2..e47297f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -82,6 +82,9 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
     stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
     List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
     List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
+    allNodes.removeAll(clusterData.getDisabledInstances());
+    liveNodes.retainAll(allNodes);
+
     Map<String, Map<String, String>> currentMapping =
         currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
 

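The two added lines above exclude disabled instances from both candidate sets before the assignment is computed. The filtering step in isolation, with placeholder node names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DisabledFilterSketch {
  public static void main(String[] args) {
    List<String> allNodes = new ArrayList<String>(Arrays.asList("n0", "n1", "n2"));
    List<String> liveNodes = new ArrayList<String>(Arrays.asList("n0", "n1"));
    Set<String> disabled = new HashSet<String>(Arrays.asList("n1"));

    // Mirror of the patch: drop disabled instances from the candidate set,
    // then keep only the live nodes that remain candidates.
    allNodes.removeAll(disabled);
    liveNodes.retainAll(allNodes);

    System.out.println(allNodes);  // [n0, n2]
    System.out.println(liveNodes); // [n0]
  }
}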
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index a520803..9366bcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -75,24 +75,26 @@ public class ConstraintBasedAssignment {
       boolean isResourceEnabled) {
     Map<String, String> instanceStateMap = new HashMap<String, String>();
 
-    // if the ideal state is deleted, instancePreferenceList will be empty and
-    // we should drop all resources.
     if (currentStateMap != null) {
       for (String instance : currentStateMap.keySet()) {
-        if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
-            && !disabledInstancesForPartition.contains(instance)) {
-          // if dropped (whether disabled or not), transit to DROPPED
+        if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) {
+          // The partition is dropped from preference list.
+          // Transit to DROPPED no matter the instance is disabled or not.
           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
-        } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
-            HelixDefinedState.ERROR.name()))
-            && (disabledInstancesForPartition.contains(instance) || !isResourceEnabled)) {
+        } else {
           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
-          instanceStateMap.put(instance, stateModelDef.getInitialState());
+          if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) {
+            if (currentStateMap.get(instance) == null || !currentStateMap.get(instance)
+                .equals(HelixDefinedState.ERROR.name())) {
+              instanceStateMap.put(instance, stateModelDef.getInitialState());
+            }
+          }
         }
       }
     }
 
-    // ideal state is deleted
+    // if the ideal state is deleted, instancePreferenceList will be empty and
+    // we should drop all resources.
     if (instancePreferenceList == null) {
       return instanceStateMap;
     }

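The restructured branch above makes the precedence explicit: removal from the preference list always wins and maps to DROPPED, while a disabled instance only falls back to the initial state when it is not already in ERROR. A condensed sketch of just that decision, with the resource-disabled case folded into the disabled flag (a hypothetical helper, not Helix API):

import java.util.Arrays;
import java.util.List;

public class DropOrOfflineSketch {
  // Returns the target state for one instance holding a replica, or null to
  // leave the current state untouched.
  static String decide(String instance, String currentState, List<String> preferenceList,
      boolean disabled, String initialState) {
    if (preferenceList == null || !preferenceList.contains(instance)) {
      return "DROPPED"; // dropped from the preference list, disabled or not
    }
    if (disabled && !"ERROR".equals(currentState)) {
      return initialState; // e.g. OFFLINE for MasterSlave
    }
    return null;
  }

  public static void main(String[] args) {
    List<String> pref = Arrays.asList("n1", "n2");
    System.out.println(decide("n0", "MASTER", pref, false, "OFFLINE")); // DROPPED
    System.out.println(decide("n1", "MASTER", pref, true, "OFFLINE"));  // OFFLINE
    System.out.println(decide("n1", "ERROR", pref, true, "OFFLINE"));   // null
  }
}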
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index b77ce0d..cb5bda8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -390,6 +390,22 @@ public class ClusterDataCache {
     return disabledInstancesSet;
   }
 
+
+  /**
+   * Get the set of instances that are currently disabled
+   * @return a set of disabled instance names
+   */
+  public Set<String> getDisabledInstances() {
+    Set<String> disabledInstancesSet = new HashSet<String>();
+    for (String instance : _instanceConfigMap.keySet()) {
+      InstanceConfig config = _instanceConfigMap.get(instance);
+      if (!config.getInstanceEnabled()) {
+        disabledInstancesSet.add(instance);
+      }
+    }
+    return disabledInstancesSet;
+  }
+
   /**
    * Returns the number of replicas for a given resource.
    * @param resourceName

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
new file mode 100644
index 0000000..84eca6b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
@@ -0,0 +1,142 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestAutoRebalanceWithDisabledInstance extends ZkStandAloneCMTestBase {
+  private static final String TEST_DB_2 = "TestDB2";
+
+  @BeforeClass
+  @Override
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB_2, _PARTITIONS, STATE_MODEL,
+        RebalanceMode.FULL_AUTO + "");
+    _setupTool.rebalanceResource(CLUSTER_NAME, TEST_DB_2, _replica);
+
+    Thread.sleep(200);
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+            CLUSTER_NAME));
+    Assert.assertTrue(result);
+  }
+
+  @Test()
+  public void testDisableEnableInstanceAutoRebalance() throws Exception {
+    String disabledInstance = _participants[0].getInstanceName();
+
+    Set<String> assignedPartitions = getPartitionsAssignedToInstance(CLUSTER_NAME, TEST_DB_2,
+        disabledInstance);
+    Assert.assertFalse(assignedPartitions.isEmpty());
+    Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2,
+        disabledInstance);
+    Assert.assertFalse(currentPartitions.isEmpty());
+
+    // disable instance
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, false);
+    Thread.sleep(400);
+    assignedPartitions = getPartitionsAssignedToInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+    Assert.assertTrue(assignedPartitions.isEmpty());
+    currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+    Assert.assertTrue(currentPartitions.isEmpty());
+
+    // enable instance
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, true);
+    Thread.sleep(400);
+    assignedPartitions = getPartitionsAssignedToInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+    Assert.assertFalse(assignedPartitions.isEmpty());
+    currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+    Assert.assertFalse(currentPartitions.isEmpty());
+  }
+
+  @Test()
+  public void testAddDisabledInstanceAutoRebalance() throws Exception {
+    // add disabled instance.
+    String nodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + NODE_NR);
+    _setupTool.addInstanceToCluster(CLUSTER_NAME, nodeName);
+    MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, nodeName);
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, false);
+
+    participant.syncStart();
+
+    Thread.sleep(400);
+    Set<String> assignedPartitions = getPartitionsAssignedToInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+    Assert.assertTrue(assignedPartitions.isEmpty());
+    Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2,
+        nodeName);
+    Assert.assertTrue(currentPartitions.isEmpty());
+
+    // enable instance
+    _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, true);
+    Thread.sleep(400);
+    assignedPartitions = getPartitionsAssignedToInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+    Assert.assertFalse(assignedPartitions.isEmpty());
+    currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+    Assert.assertFalse(currentPartitions.isEmpty());
+  }
+
+  private Set<String> getPartitionsAssignedToInstance(String cluster, String dbName, String instance) {
+    HelixAdmin admin = _setupTool.getClusterManagementTool();
+    Set<String> partitionSet = new HashSet<String>();
+    IdealState is = admin.getResourceIdealState(cluster, dbName);
+    for (String partition : is.getRecord().getListFields().keySet()) {
+      List<String> assignments = is.getRecord().getListField(partition);
+      if (assignments.contains(instance)) {
+        partitionSet.add(partition);
+      }
+    }
+
+    return partitionSet;
+  }
+
+  private Set<String> getCurrentPartitionsOnInstance(String cluster, String dbName, String instance) {
+    HelixAdmin admin = _setupTool.getClusterManagementTool();
+    Set<String> partitionSet = new HashSet<String>();
+
+    ExternalView ev = admin.getResourceExternalView(cluster, dbName);
+    for (String partition : ev.getRecord().getMapFields().keySet()) {
+      Map<String, String> assignments = ev.getRecord().getMapField(partition);
+      if (assignments.containsKey(instance)) {
+        partitionSet.add(partition);
+      }
+    }
+    return partitionSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index 443d484..fb534fd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -99,14 +99,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
       _sleep = sleep;
     }
 
-    @Override
-    @Transition(to = "SLAVE", from = "OFFLINE")
-    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
-      LOG.info("Become SLAVE from OFFLINE");
-
-    }
-
-    @Override
     @Transition(to = "MASTER", from = "SLAVE")
     public void onBecomeMasterFromSlave(Message message, NotificationContext context)
         throws InterruptedException {
@@ -117,26 +109,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
     }
 
     @Override
-    @Transition(to = "SLAVE", from = "MASTER")
-    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
-      LOG.info("Become SLAVE from MASTER");
-    }
-
-    @Override
-    @Transition(to = "OFFLINE", from = "SLAVE")
-    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
-      LOG.info("Become OFFLINE from SLAVE");
-
-    }
-
-    @Override
-    @Transition(to = "DROPPED", from = "OFFLINE")
-    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
-      LOG.info("Become DROPPED from OFFLINE");
-
-    }
-
-    @Override
     public void rollbackOnError(Message message, NotificationContext context,
         StateTransitionError error) {
       _error = error;

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 5d169d5..f694618 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -91,6 +91,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
         ClusterStateVerifier
             .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
 
+    Assert.assertTrue(result);
+
     result =
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
             CLUSTER_NAME));

http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
index 61733ba..7d90063 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
@@ -43,67 +43,12 @@ public class MockMSStateModel extends StateModel {
     _transition = transition;
   }
 
-  // overwrite default error->dropped transition
-  @Transition(to = "DROPPED", from = "ERROR")
-  public void onBecomeDroppedFromError(Message message, NotificationContext context)
+  @Transition(to = "*", from = "*")
+  public void generalTransitionHandle(Message message, NotificationContext context)
       throws InterruptedException {
-    LOG.info("Become DROPPED from ERROR");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "SLAVE", from = "OFFLINE")
-  public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become SLAVE from OFFLINE");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-
-    }
-  }
-
-  @Transition(to = "MASTER", from = "SLAVE")
-  public void onBecomeMasterFromSlave(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become MASTER from SLAVE");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "SLAVE", from = "MASTER")
-  public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become SLAVE from MASTER");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "OFFLINE", from = "SLAVE")
-  public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become OFFLINE from SLAVE");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "DROPPED", from = "OFFLINE")
-  public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become DROPPED from OFFLINE");
-    if (_transition != null) {
-      _transition.doTransition(message, context);
-    }
-  }
-
-  @Transition(to = "OFFLINE", from = "ERROR")
-  public void onBecomeOfflineFromError(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.info("Become OFFLINE from ERROR");
-    // System.err.println("Become OFFLINE from ERROR");
+    LOG.info(String
+        .format("Resource %s partition %s becomes %s from %s", message.getResourceName(),
+            message.getPartitionName(), message.getToState(), message.getFromState()));
     if (_transition != null) {
       _transition.doTransition(message, context);
     }


[4/5] helix git commit: [HELIX-634] Refactor AutoRebalancer to allow configurable placement strategy.

Posted by lx...@apache.org.
[HELIX-634] Refactor AutoRebalancer to allow configurable placement strategy.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ea0fbbbc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ea0fbbbc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ea0fbbbc

Branch: refs/heads/helix-0.6.x
Commit: ea0fbbbce302974b88a2b8253bf06616fd91aa5b
Parents: bc0aa76
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Jun 7 14:42:43 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700

----------------------------------------------------------------------
 .../controller/rebalancer/AutoRebalancer.java   | 40 +++++++---
 .../strategy/AutoRebalanceStrategy.java         | 40 +++++-----
 .../controller/strategy/RebalanceStrategy.java  | 52 +++++++++++++
 .../java/org/apache/helix/model/IdealState.java | 23 +++++-
 .../helix/model/builder/IdealStateBuilder.java  | 80 ++++++++++++++++++++
 .../task/GenericTaskAssignmentCalculator.java   |  5 +-
 .../strategy/TestAutoRebalanceStrategy.java     | 25 +++---
 7 files changed, 217 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index e47297f..6682426 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
@@ -35,8 +36,7 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.controller.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.LiveInstance;
@@ -44,6 +44,7 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
 /**
@@ -59,14 +60,14 @@ import org.apache.log4j.Logger;
 public class AutoRebalancer implements Rebalancer, MappingCalculator {
   // These should be final, but are initialized in init rather than a constructor
   private HelixManager _manager;
-  private AutoRebalanceStrategy _algorithm;
+  private RebalanceStrategy _rebalanceStrategy;
 
   private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
 
   @Override
   public void init(HelixManager manager) {
     this._manager = manager;
-    this._algorithm = null;
+    this._rebalanceStrategy = null;
   }
 
   @Override
@@ -127,13 +128,32 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
 
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
 
-    ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
-    placementScheme.init(_manager);
-    _algorithm =
-        new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
-            placementScheme);
+    String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
+    if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) {
+      _rebalanceStrategy =
+          new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
+    } else {
+      try {
+        _rebalanceStrategy = RebalanceStrategy.class
+            .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
+        _rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
+      } catch (ClassNotFoundException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      } catch (InstantiationException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      } catch (IllegalAccessException ex) {
+        throw new HelixException(
+            "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+            ex);
+      }
+    }
+
     ZNRecord newMapping =
-        _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
+        _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("currentMapping: " + currentMapping);

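To exercise the new code path above, the resource's REBALANCE_STRATEGY field must name the strategy class. A minimal sketch, assuming ZK_ADDR and CLUSTER_NAME as in the integration tests earlier in this series and a hypothetical strategy class com.example.MyRebalanceStrategy:

  HelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
  IdealState idealState = admin.getResourceIdealState(CLUSTER_NAME, "TestDB2");
  // Any implementation of RebalanceStrategy with a public no-arg constructor
  // works here; AutoRebalancer instantiates it reflectively and calls init().
  idealState.setRebalanceStrategy("com.example.MyRebalanceStrategy");
  admin.setResourceIdealState(CLUSTER_NAME, "TestDB2", idealState);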
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 11b5b0d..959609f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -36,16 +36,15 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.log4j.Logger;
 
-public class AutoRebalanceStrategy {
-
+public class AutoRebalanceStrategy implements RebalanceStrategy {
   private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
-
-  private final String _resourceName;
-  private final List<String> _partitions;
-  private final LinkedHashMap<String, Integer> _states;
-  private final int _maximumPerNode;
   private final ReplicaPlacementScheme _placementScheme;
 
+  private String _resourceName;
+  private List<String> _partitions;
+  private LinkedHashMap<String, Integer> _states;
+  private int _maximumPerNode;
+
   private Map<String, Node> _nodeMap;
   private List<Node> _liveNodesList;
   private Map<Integer, String> _stateMap;
@@ -56,24 +55,26 @@ public class AutoRebalanceStrategy {
   private Set<Replica> _orphaned;
 
   public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode,
-      ReplicaPlacementScheme placementScheme) {
-    _resourceName = resourceName;
-    _partitions = partitions;
-    _states = states;
-    _maximumPerNode = maximumPerNode;
-    if (placementScheme != null) {
-      _placementScheme = placementScheme;
-    } else {
-      _placementScheme = new DefaultPlacementScheme();
-    }
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    init(resourceName, partitions, states, maximumPerNode);
+    _placementScheme = new DefaultPlacementScheme();
   }
 
   public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
       final LinkedHashMap<String, Integer> states) {
-    this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme());
+    this(resourceName, partitions, states, Integer.MAX_VALUE);
+  }
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    _states = states;
+    _maximumPerNode = maximumPerNode;
   }
 
+  @Override
   public ZNRecord computePartitionAssignment(final List<String> liveNodes,
       final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
     int numReplicas = countStateReplicas();
@@ -546,7 +547,6 @@ public class AutoRebalanceStrategy {
 
   /**
    * Counts the total number of replicas given a state-count mapping
-   * @param states
    * @return
    */
   private int countStateReplicas() {

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
new file mode 100644
index 0000000..4daae82
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
@@ -0,0 +1,52 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Assignment strategy interface that computes the partition-to-instance assignment for a resource.
+ */
+public interface RebalanceStrategy {
+  /**
+   * Perform the necessary initialization for the rebalance strategy object.
+   * @param resourceName the resource for which to compute the assignment
+   * @param partitions the partitions of the resource
+   * @param states map of state name to the number of replicas required in that state
+   * @param maximumPerNode the maximum number of replicas any single node may host
+   */
+  void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode);
+
+  /**
+   * Compute the preference lists and, optionally, the partition-state mapping for the
+   * given resource.
+   *
+   * @param liveNodes the list of nodes currently live in the cluster
+   * @param currentMapping the current partition-to-node assignment
+   * @param allNodes the full list of nodes in the cluster, live or not
+   * @return a ZNRecord whose list fields hold the preference list for each partition
+   */
+  ZNRecord computePartitionAssignment(final List<String> liveNodes,
+      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes);
+}

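As a reference point, a minimal hypothetical implementation of this interface that simply round-robins replicas across live nodes; it is not part of this patch (the CRUSH-based topology-aware strategy arrives separately in this series) and it ignores maximumPerNode for brevity:

  package com.example;

  import java.util.ArrayList;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  import org.apache.helix.ZNRecord;
  import org.apache.helix.controller.strategy.RebalanceStrategy;

  public class RoundRobinStrategy implements RebalanceStrategy {
    private String _resourceName;
    private List<String> _partitions;
    private int _replicas;

    // A public no-arg constructor is required: AutoRebalancer creates the
    // strategy via newInstance() and then calls init().
    public RoundRobinStrategy() {
    }

    @Override
    public void init(String resourceName, List<String> partitions,
        LinkedHashMap<String, Integer> states, int maximumPerNode) {
      _resourceName = resourceName;
      _partitions = partitions;
      _replicas = 0;
      for (int count : states.values()) {
        _replicas += count;
      }
    }

    @Override
    public ZNRecord computePartitionAssignment(List<String> liveNodes,
        Map<String, Map<String, String>> currentMapping, List<String> allNodes) {
      ZNRecord result = new ZNRecord(_resourceName);
      int offset = 0;
      for (String partition : _partitions) {
        List<String> preferenceList = new ArrayList<String>();
        // Pick up to _replicas distinct live nodes, rotating the start point
        // per partition so load spreads evenly.
        for (int i = 0; i < _replicas && i < liveNodes.size(); i++) {
          preferenceList.add(liveNodes.get((offset + i) % liveNodes.size()));
        }
        result.setListField(partition, preferenceList);
        offset++;
      }
      return result;
    }
  }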
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 44f4219..7c4cf54 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -53,10 +53,11 @@ public class IdealState extends HelixProperty {
     @Deprecated
     IDEAL_STATE_MODE,
     REBALANCE_MODE,
+    REBALANCER_CLASS_NAME,
     REBALANCE_TIMER_PERIOD,
+    REBALANCE_STRATEGY,
     MAX_PARTITIONS_PER_INSTANCE,
     INSTANCE_GROUP_TAG,
-    REBALANCER_CLASS_NAME,
     HELIX_ENABLED,
     RESOURCE_GROUP_NAME,
     GROUP_ROUTING_ENABLED,
@@ -165,6 +166,26 @@ public class IdealState extends HelixProperty {
   }
 
   /**
+   * Specify the strategy Helix should use to compute the partition-instance assignment,
+   * i.e., a custom rebalance strategy that implements
+   * {@link org.apache.helix.controller.strategy.RebalanceStrategy}.
+   *
+   * @param rebalanceStrategy fully-qualified class name of the rebalance strategy
+   */
+  public void setRebalanceStrategy(String rebalanceStrategy) {
+    _record.setSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name(), rebalanceStrategy);
+  }
+
+  /**
+   * Get the rebalance strategy for this resource.
+   *
+   * @return rebalance strategy, or null if not specified.
+   */
+  public String getRebalanceStrategy() {
+    return _record.getSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name());
+  }
+
+  /**
    * Set the resource group name
    * @param resourceGroupName
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index d3bc3f2..9ad3023 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -52,6 +52,17 @@ public abstract class IdealStateBuilder {
    * Helix rebalancer strategies. AUTO, SEMI_AUTO, CUSTOMIZED
    */
   protected IdealState.RebalanceMode rebalancerMode;
+
+  /**
+   * Customized rebalancer class.
+   */
+  private String rebalancerClassName;
+
+  /**
+   * Custom rebalance strategy
+   */
+  private String rebalanceStrategy;
+
   /**
    * A constraint that limits the maximum number of partitions per Node.
    */
@@ -68,6 +79,16 @@ public abstract class IdealStateBuilder {
    */
   private Boolean disableExternalView = null;
 
+  /**
+   * Resource group name.
+   */
+  private String resourceGroupName;
+
+  /**
+   * Whether the resource group routing should be enabled in routingProvider.
+   */
+  private Boolean enableGroupRouting;
+
   protected ZNRecord _record;
 
   /**
@@ -144,6 +165,44 @@ public abstract class IdealStateBuilder {
   }
 
   /**
+   * Set a custom rebalancer class name.
+   * @param rebalancerClassName fully-qualified class name of the custom rebalancer
+   * @return IdealStateBuilder
+   */
+  public IdealStateBuilder setRebalancerClass(String rebalancerClassName) {
+    this.rebalancerClassName = rebalancerClassName;
+    return this;
+  }
+
+  /**
+   * Set a custom rebalance strategy class name.
+   * @param rebalanceStrategy fully-qualified class name of the rebalance strategy
+   * @return IdealStateBuilder
+   */
+  public IdealStateBuilder setRebalanceStrategy(String rebalanceStrategy) {
+    this.rebalanceStrategy = rebalanceStrategy;
+    return this;
+  }
+
+  /**
+   * Set the resource group name for this resource.
+   * @param resourceGroupName
+   * @return IdealStateBuilder
+   */
+  public IdealStateBuilder setResourceGroupName(String resourceGroupName) {
+    this.resourceGroupName = resourceGroupName;
+    return this;
+  }
+
+  /**
+   * Enable Group Routing for this resource.
+   * @return IdealStateBuilder
+   */
+  public IdealStateBuilder enableGroupRouting() {
+    this.enableGroupRouting = true;
+    return this;
+  }
+
+  /**
    * @return
    */
   public IdealState build() {
@@ -154,10 +213,31 @@ public abstract class IdealStateBuilder {
     idealstate.setStateModelFactoryName(stateModelFactoryName);
     idealstate.setRebalanceMode(rebalancerMode);
     idealstate.setReplicas("" + numReplica);
+
+    if (rebalancerClassName != null) {
+      idealstate.setRebalancerClassName(rebalancerClassName);
+    }
+
+    if (rebalanceStrategy != null) {
+      idealstate.setRebalanceStrategy(rebalanceStrategy);
+    }
+
+    if (maxPartitionsPerNode > 0) {
+      idealstate.setMaxPartitionsPerInstance(maxPartitionsPerNode);
+    }
+
     if (disableExternalView != null) {
       idealstate.setDisableExternalView(disableExternalView);
     }
 
+    if (resourceGroupName != null) {
+      idealstate.setResourceGroupName(resourceGroupName);
+    }
+
+    if (enableGroupRouting != null) {
+      idealstate.enableGroupRouting(enableGroupRouting);
+    }
+
     if (!idealstate.isValid()) {
       throw new HelixException("invalid ideal-state: " + idealstate);
     }

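Putting the new builder options together, a hedged sketch assuming the FULL_AUTO builder subclass available on this branch; the resource name, strategy class, and resource group below are illustrative:

  IdealState idealState = new FullAutoModeISBuilder("TestDB2")
      .setStateModel("MasterSlave")
      .setNumPartitions(64)
      .setNumReplica(3)
      .setRebalanceStrategy("com.example.MyRebalanceStrategy")
      .setResourceGroupName("myResourceGroup")
      .enableGroupRouting()
      .build();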
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index b0a1a33..623357f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -33,6 +33,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
@@ -121,9 +122,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
     }
 
     // Get the assignment keyed on partition
-    AutoRebalanceStrategy strategy =
-        new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
-            new AutoRebalanceStrategy.DefaultPlacementScheme());
+    RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states);
     List<String> allNodes =
         Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
     Collections.sort(allNodes);

http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 985d0c8..adc92d6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -116,8 +116,7 @@ public class TestAutoRebalanceStrategy {
     StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
 
     new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
-        stateModelDef, new AutoRebalanceStrategy.DefaultPlacementScheme())
-        .runRepeatedly(numIterations);
+        stateModelDef).runRepeatedly(numIterations);
   }
 
   /**
@@ -157,13 +156,11 @@ public class TestAutoRebalanceStrategy {
     private List<String> _allNodes;
     private int _maxPerNode;
     private StateModelDefinition _stateModelDef;
-    private ReplicaPlacementScheme _placementScheme;
     private Random _random;
 
     public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
         List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
-        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef,
-        ReplicaPlacementScheme placementScheme) {
+        List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
       _partitions = partitions;
       _states = states;
       _liveNodes = liveNodes;
@@ -182,7 +179,6 @@ public class TestAutoRebalanceStrategy {
       }
       _maxPerNode = maxPerNode;
       _stateModelDef = stateModelDef;
-      _placementScheme = placementScheme;
       _random = new Random();
     }
 
@@ -193,9 +189,10 @@ public class TestAutoRebalanceStrategy {
      */
     public void runRepeatedly(int numIterations) {
       logger.info("~~~~ Initial State ~~~~~");
+      RebalanceStrategy strategy =
+          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
       ZNRecord initialResult =
-          new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
-              _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+          strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
       _currentMapping = getMapping(initialResult.getListFields());
       logger.info(_currentMapping);
       getRunResult(_currentMapping, initialResult.getListFields());
@@ -500,8 +497,8 @@ public class TestAutoRebalanceStrategy {
       _liveSet.add(node);
       _nonLiveSet.remove(node);
 
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
-          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
+          computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 
     /**
@@ -534,8 +531,8 @@ public class TestAutoRebalanceStrategy {
         }
       }
 
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
-          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 
     /**
@@ -560,8 +557,8 @@ public class TestAutoRebalanceStrategy {
       _liveNodes.add(node);
       _liveSet.add(node);
 
-      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
-          _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
     }
 
     private <T> T getRandomSetElement(Set<T> source) {