Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/12 17:09:40 UTC
[1/5] helix git commit: [HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm.
Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x f5ac8f8b9 -> 7147ec874
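For orientation while reading the diffs below, here is a minimal sketch of how a cluster could be configured for the new topology-aware placement, mirroring the ClusterConfig and InstanceConfig settings exercised by TestTopology further down. The cluster name "MyCluster" and the instance names are illustrative assumptions only, and the configs are just built in memory rather than written to ZooKeeper.

import java.util.Arrays;
import java.util.List;

import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;

public class TopologyConfigSketch {
  public static void main(String[] args) {
    // Cluster-level topology: a path of node types plus the type that forms a fault zone.
    ClusterConfig clusterConfig = new ClusterConfig("MyCluster"); // hypothetical cluster name
    clusterConfig.getRecord().getSimpleFields()
        .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/Rack/Sub-Rack/Host/Instance");
    clusterConfig.getRecord().getSimpleFields()
        .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Sub-Rack");

    // Per-instance placement: each instance declares where it sits in that tree via its domain.
    List<String> instances = Arrays.asList("host1_9000", "host2_9000"); // hypothetical instances
    int rack = 0;
    for (String instance : instances) {
      InstanceConfig instanceConfig = new InstanceConfig(instance);
      instanceConfig.setDomain(
          String.format("Rack=rack_%d, Sub-Rack=subrack_%d, Host=%s", rack, rack, instance));
      instanceConfig.setHostName(instance);
      instanceConfig.setPort("9000");
      rack++;
      // In a live cluster these would be persisted through HelixAdmin/ConfigAccessor;
      // the sketch only shows the shape of the settings.
    }
  }
}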
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
deleted file mode 100644
index adc92d6..0000000
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ /dev/null
@@ -1,765 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.Mocks.MockAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.rebalancer.AutoRebalancer;
-import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.log4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public class TestAutoRebalanceStrategy {
- private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
-
- /**
- * Sanity test for a basic Master-Slave model
- */
- @Test
- public void simpleMasterSlaveTest() {
- final int NUM_ITERATIONS = 10;
- final int NUM_PARTITIONS = 10;
- final int NUM_LIVE_NODES = 12;
- final int NUM_TOTAL_NODES = 20;
- final int MAX_PER_NODE = 5;
-
- final String[] STATE_NAMES = {
- "MASTER", "SLAVE"
- };
- final int[] STATE_COUNTS = {
- 1, 2
- };
-
- runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES,
- MAX_PER_NODE, STATE_NAMES, STATE_COUNTS);
- }
-
- /**
- * Run a test for an arbitrary state model.
- * @param name Name of the test state model
- * @param numIterations Number of rebalance tasks to run
- * @param numPartitions Number of partitions for the resource
- * @param numLiveNodes Number of live nodes in the cluster
- * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to
- * numLiveNodes
- * @param maxPerNode Maximum number of replicas a node can serve
- * @param stateNames States ordered by preference
- * @param stateCounts Number of replicas that should be in each state
- */
- private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes,
- int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) {
- List<String> partitions = new ArrayList<String>();
- for (int i = 0; i < numPartitions; i++) {
- partitions.add("p_" + i);
- }
-
- List<String> liveNodes = new ArrayList<String>();
- List<String> allNodes = new ArrayList<String>();
- for (int i = 0; i < numTotalNodes; i++) {
- allNodes.add("n_" + i);
- if (i < numLiveNodes) {
- liveNodes.add("n_" + i);
- }
- }
-
- Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
-
- LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
- for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) {
- states.put(stateNames[i], stateCounts[i]);
- }
-
- StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
-
- new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
- stateModelDef).runRepeatedly(numIterations);
- }
-
- /**
- * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions
- * into account when computing mappings, so this is acceptable.
- * @param modelName name to give the model
- * @param initialState initial state for all nodes
- * @param states ordered map of state to count
- * @return incomplete StateModelDefinition for rebalancing
- */
- private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
- LinkedHashMap<String, Integer> states) {
- StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
- builder.initialState(initialState);
- int i = states.size();
- for (String state : states.keySet()) {
- builder.addState(state, i);
- builder.upperBound(state, states.get(state));
- i--;
- }
- return builder.build();
- }
-
- class AutoRebalanceTester {
- private static final double P_KILL = 0.45;
- private static final double P_ADD = 0.1;
- private static final double P_RESURRECT = 0.45;
- private static final String RESOURCE_NAME = "resource";
-
- private List<String> _partitions;
- private LinkedHashMap<String, Integer> _states;
- private List<String> _liveNodes;
- private Set<String> _liveSet;
- private Set<String> _removedSet;
- private Set<String> _nonLiveSet;
- private Map<String, Map<String, String>> _currentMapping;
- private List<String> _allNodes;
- private int _maxPerNode;
- private StateModelDefinition _stateModelDef;
- private Random _random;
-
- public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
- List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
- List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
- _partitions = partitions;
- _states = states;
- _liveNodes = liveNodes;
- _liveSet = new TreeSet<String>();
- for (String node : _liveNodes) {
- _liveSet.add(node);
- }
- _removedSet = new TreeSet<String>();
- _nonLiveSet = new TreeSet<String>();
- _currentMapping = currentMapping;
- _allNodes = allNodes;
- for (String node : allNodes) {
- if (!_liveSet.contains(node)) {
- _nonLiveSet.add(node);
- }
- }
- _maxPerNode = maxPerNode;
- _stateModelDef = stateModelDef;
- _random = new Random();
- }
-
- /**
- * Repeatedly randomly select a task to run and report the result
- * @param numIterations
- * Number of random tasks to run in sequence
- */
- public void runRepeatedly(int numIterations) {
- logger.info("~~~~ Initial State ~~~~~");
- RebalanceStrategy strategy =
- new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
- ZNRecord initialResult =
- strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
- _currentMapping = getMapping(initialResult.getListFields());
- logger.info(_currentMapping);
- getRunResult(_currentMapping, initialResult.getListFields());
- for (int i = 0; i < numIterations; i++) {
- logger.info("~~~~ Iteration " + i + " ~~~~~");
- ZNRecord znRecord = runOnceRandomly();
- if (znRecord != null) {
- final Map<String, List<String>> listResult = znRecord.getListFields();
- final Map<String, Map<String, String>> mapResult = getMapping(listResult);
- logger.info(mapResult);
- logger.info(listResult);
- getRunResult(mapResult, listResult);
- _currentMapping = mapResult;
- }
- }
- }
-
- private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
- final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>();
- ClusterDataCache cache = new ClusterDataCache();
- MockAccessor accessor = new MockAccessor();
- Builder keyBuilder = accessor.keyBuilder();
- for (String node : _liveNodes) {
- LiveInstance liveInstance = new LiveInstance(node);
- liveInstance.setSessionId("testSession");
- accessor.setProperty(keyBuilder.liveInstance(node), liveInstance);
- }
- cache.refresh(accessor);
- for (String partition : _partitions) {
- List<String> preferenceList = listResult.get(partition);
- Map<String, String> currentStateMap = _currentMapping.get(partition);
- Set<String> disabled = Collections.emptySet();
- Map<String, String> assignment =
- ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef,
- preferenceList, currentStateMap, disabled, true);
- mapResult.put(partition, assignment);
- }
- return mapResult;
- }
-
- /**
- * Output various statistics and correctness check results
- * @param mapFields
- * The map-map assignment generated by the rebalancer
- * @param listFields
- * The map-list assignment generated by the rebalancer
- */
- public void getRunResult(final Map<String, Map<String, String>> mapFields,
- final Map<String, List<String>> listFields) {
- logger.info("***** Statistics *****");
- dumpStatistics(mapFields);
- verifyCorrectness(mapFields, listFields);
- }
-
- /**
- * Output statistics about the assignment
- * @param mapFields
- * The map-map assignment generated by the rebalancer
- */
- public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
- Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
- int nodeCount = _liveNodes.size();
- logger.info("Total number of nodes: " + nodeCount);
- logger.info("Nodes: " + _liveNodes);
- int sumPartitions = getSum(partitionsPerNode.values());
- logger.info("Total number of partitions: " + sumPartitions);
- double averagePartitions = getAverage(partitionsPerNode.values());
- logger.info("Average number of partitions per node: " + averagePartitions);
- double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions);
- logger.info("Standard deviation of partitions: " + stdevPartitions);
-
- // Statistics about each state
- Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
- for (String state : _states.keySet()) {
- Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>();
- for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) {
- Map<String, Integer> stateCounts = nodeStates.getValue();
- if (stateCounts.containsKey(state)) {
- nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state));
- } else {
- nodeStateCounts.put(nodeStates.getKey(), 0);
- }
- }
- int sumStates = getSum(nodeStateCounts.values());
- logger.info("Total number of state " + state + ": " + sumStates);
- double averageStates = getAverage(nodeStateCounts.values());
- logger.info("Average number of state " + state + " per node: " + averageStates);
- double stdevStates = getStdev(nodeStateCounts.values(), averageStates);
- logger.info("Standard deviation of state " + state + " per node: " + stdevStates);
- }
- }
-
- /**
- * Run a set of correctness tests, reporting success or failure
- * @param mapFields
- * The map-map assignment generated by the rebalancer
- * @param listFields
- * The map-list assignment generated by the rebalancer
- */
- public void verifyCorrectness(final Map<String, Map<String, String>> mapFields,
- final Map<String, List<String>> listFields) {
- final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
- boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
- assert maxConstraintMet : "Max per node constraint: FAIL";
- logger.info("Max per node constraint: PASS");
-
- boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
- assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL";
- logger.info("Only live nodes have partitions constraint: PASS");
-
- boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
- assert stateAssignmentPossible : "State replica constraint: FAIL";
- logger.info("State replica constraint: PASS");
-
- boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
- assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL";
- logger.info("Node uniqueness per partition constraint: PASS");
- }
-
- private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
- for (String node : partitionsPerNode.keySet()) {
- Integer value = partitionsPerNode.get(node);
- if (value > _maxPerNode) {
- logger.error("ERROR: Node " + node + " has " + value
- + " partitions despite a maximum of " + _maxPerNode);
- return false;
- }
- }
- return true;
- }
-
- private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) {
- for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) {
- boolean isLive = _liveSet.contains(nodeState.getKey());
- boolean isEmpty = nodeState.getValue() == 0;
- if (!isLive && !isEmpty) {
- logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has "
- + nodeState.getValue() + " replicas!");
- return false;
- }
- }
- return true;
- }
-
- private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) {
- for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
- final Map<String, String> nodeMap = partitionEntry.getValue();
- final Map<String, Integer> stateCounts = new TreeMap<String, Integer>();
- for (String state : nodeMap.values()) {
- if (!stateCounts.containsKey(state)) {
- stateCounts.put(state, 1);
- } else {
- stateCounts.put(state, stateCounts.get(state) + 1);
- }
- }
- for (String state : stateCounts.keySet()) {
- if (state.equals(HelixDefinedState.DROPPED.toString())) {
- continue;
- }
- int count = stateCounts.get(state);
- int maximumCount = _states.get(state);
- if (count > maximumCount) {
- logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey()
- + " has " + count + " replicas when " + maximumCount + " is allowed!");
- return false;
- }
- }
- }
- return true;
- }
-
- private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) {
- for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) {
- Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
- int numUniques = nodeSet.size();
- int total = partitionEntry.getValue().size();
- if (numUniques < total) {
- logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total
- + " nodes, but only " + numUniques + " are unique!");
- return false;
- }
- }
- return true;
- }
-
- private double getAverage(final Collection<Integer> values) {
- double sum = 0.0;
- for (Integer value : values) {
- sum += value;
- }
- if (values.size() != 0) {
- return sum / values.size();
- } else {
- return -1.0;
- }
- }
-
- private int getSum(final Collection<Integer> values) {
- int sum = 0;
- for (Integer value : values) {
- sum += value;
- }
- return sum;
- }
-
- private double getStdev(final Collection<Integer> values, double mean) {
- double sum = 0.0;
- for (Integer value : values) {
- double deviation = mean - value;
- sum += Math.pow(deviation, 2.0);
- }
- if (values.size() != 0) {
- sum /= values.size();
- return Math.pow(sum, 0.5);
- } else {
- return -1.0;
- }
- }
-
- private Map<String, Integer> getPartitionBucketsForNode(
- final Map<String, Map<String, String>> assignment) {
- Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>();
- for (String node : _liveNodes) {
- partitionsPerNode.put(node, 0);
- }
- for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
- final Map<String, String> nodeMap = partitionEntry.getValue();
- for (String node : nodeMap.keySet()) {
- String state = nodeMap.get(node);
- if (state.equals(HelixDefinedState.DROPPED.toString())) {
- continue;
- }
- // add 1 for every occurrence of a node
- if (!partitionsPerNode.containsKey(node)) {
- partitionsPerNode.put(node, 1);
- } else {
- partitionsPerNode.put(node, partitionsPerNode.get(node) + 1);
- }
- }
- }
- return partitionsPerNode;
- }
-
- private Map<String, Map<String, Integer>> getStateBucketsForNode(
- final Map<String, Map<String, String>> assignment) {
- Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
- for (String n : _liveNodes) {
- result.put(n, new TreeMap<String, Integer>());
- }
- for (Map<String, String> nodeStateMap : assignment.values()) {
- for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
- if (!result.containsKey(nodeState.getKey())) {
- result.put(nodeState.getKey(), new TreeMap<String, Integer>());
- }
- Map<String, Integer> stateMap = result.get(nodeState.getKey());
- if (!stateMap.containsKey(nodeState.getValue())) {
- stateMap.put(nodeState.getValue(), 1);
- } else {
- stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1);
- }
- }
- }
- return result;
- }
-
- /**
- * Randomly choose between killing, adding, or resurrecting a single node
- * @return (Partition -> (Node -> State)) ZNRecord
- */
- public ZNRecord runOnceRandomly() {
- double choose = _random.nextDouble();
- ZNRecord result = null;
- if (choose < P_KILL) {
- result = removeSingleNode(null);
- } else if (choose < P_KILL + P_ADD) {
- result = addSingleNode(null);
- } else if (choose < P_KILL + P_ADD + P_RESURRECT) {
- result = resurrectSingleNode(null);
- }
- return result;
- }
-
- /**
- * Run rebalancer trying to add a never-live node
- * @param node
- * Optional String to add
- * @return ZNRecord result returned by the rebalancer
- */
- public ZNRecord addSingleNode(String node) {
- logger.info("=================== add node =================");
- if (_nonLiveSet.size() == 0) {
- logger.warn("Cannot add node because there are no nodes left to add.");
- return null;
- }
-
- // Get a random never-live node
- if (node == null || !_nonLiveSet.contains(node)) {
- node = getRandomSetElement(_nonLiveSet);
- }
- logger.info("Adding " + node);
- _liveNodes.add(node);
- _liveSet.add(node);
- _nonLiveSet.remove(node);
-
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
- computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
- }
-
- /**
- * Run rebalancer trying to remove a live node
- * @param node
- * Optional String to remove
- * @return ZNRecord result returned by the rebalancer
- */
- public ZNRecord removeSingleNode(String node) {
- logger.info("=================== remove node =================");
- if (_liveSet.size() == 0) {
- logger.warn("Cannot remove node because there are no nodes left to remove.");
- return null;
- }
-
- // Get a random live node
- if (node == null || !_liveSet.contains(node)) {
- node = getRandomSetElement(_liveSet);
- }
- logger.info("Removing " + node);
- _removedSet.add(node);
- _liveNodes.remove(node);
- _liveSet.remove(node);
-
- // the rebalancer expects that the current mapping doesn't contain deleted
- // nodes
- for (Map<String, String> nodeMap : _currentMapping.values()) {
- if (nodeMap.containsKey(node)) {
- nodeMap.remove(node);
- }
- }
-
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
- .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
- }
-
- /**
- * Run rebalancer trying to add back a removed node
- * @param node
- * Optional String to resurrect
- * @return ZNRecord result returned by the rebalancer
- */
- public ZNRecord resurrectSingleNode(String node) {
- logger.info("=================== resurrect node =================");
- if (_removedSet.size() == 0) {
- logger.warn("Cannot remove node because there are no nodes left to resurrect.");
- return null;
- }
-
- // Get a random previously removed node
- if (node == null || !_removedSet.contains(node)) {
- node = getRandomSetElement(_removedSet);
- }
- logger.info("Resurrecting " + node);
- _removedSet.remove(node);
- _liveNodes.add(node);
- _liveSet.add(node);
-
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
- .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
- }
-
- private <T> T getRandomSetElement(Set<T> source) {
- int element = _random.nextInt(source.size());
- int i = 0;
- for (T node : source) {
- if (i == element) {
- return node;
- }
- i++;
- }
- return null;
- }
- }
-
- /**
- * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
- * lists should prefer nodes in the current mapping at all times, but when all nodes are in the
- * current mapping, then it should distribute states as evenly as possible.
- */
- @Test
- public void testOrphansNotPreferred() {
- final String RESOURCE_NAME = "resource";
- final String[] PARTITIONS = {
- "resource_0", "resource_1", "resource_2"
- };
- final StateModelDefinition STATE_MODEL =
- new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
- final int REPLICA_COUNT = 2;
- final String[] NODES = {
- "n0", "n1", "n2"
- };
-
- // initial state, one node, no mapping
- List<String> allNodes = Lists.newArrayList(NODES[0]);
- List<String> liveNodes = Lists.newArrayList(NODES[0]);
- Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
- for (String partition : PARTITIONS) {
- currentMapping.put(partition, new HashMap<String, String>());
- }
-
- // make sure that when the first node joins, a single replica is assigned fairly
- List<String> partitions = ImmutableList.copyOf(PARTITIONS);
- LinkedHashMap<String, Integer> stateCount =
- AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
- ZNRecord znRecord =
- new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
- .computePartitionAssignment(liveNodes, currentMapping, allNodes);
- Map<String, List<String>> preferenceLists = znRecord.getListFields();
- for (String partition : currentMapping.keySet()) {
- // make sure these are all MASTER
- List<String> preferenceList = preferenceLists.get(partition);
- Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
- Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
- }
-
- // now assign a replica to the first node in the current mapping, and add a second node
- allNodes.add(NODES[1]);
- liveNodes.add(NODES[1]);
- stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
- for (String partition : PARTITIONS) {
- currentMapping.get(partition).put(NODES[0], "MASTER");
- }
- znRecord =
- new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
- .computePartitionAssignment(liveNodes, currentMapping, allNodes);
- preferenceLists = znRecord.getListFields();
- for (String partition : currentMapping.keySet()) {
- List<String> preferenceList = preferenceLists.get(partition);
- Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
- Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
- Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
- + partition);
- Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
- + partition);
- }
-
- // now set the current mapping to reflect this update and make sure that it distributes masters
- for (String partition : PARTITIONS) {
- currentMapping.get(partition).put(NODES[1], "SLAVE");
- }
- znRecord =
- new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
- .computePartitionAssignment(liveNodes, currentMapping, allNodes);
- preferenceLists = znRecord.getListFields();
- Set<String> firstNodes = Sets.newHashSet();
- for (String partition : currentMapping.keySet()) {
- List<String> preferenceList = preferenceLists.get(partition);
- Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
- Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
- firstNodes.add(preferenceList.get(0));
- }
- Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
-
- // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
- // new node is never the most preferred
- allNodes.add(NODES[2]);
- liveNodes.add(NODES[2]);
- stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-
- // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
- currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
- currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
- znRecord =
- new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
- .computePartitionAssignment(liveNodes, currentMapping, allNodes);
- preferenceLists = znRecord.getListFields();
- boolean newNodeUsed = false;
- for (String partition : currentMapping.keySet()) {
- List<String> preferenceList = preferenceLists.get(partition);
- Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
- Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
- if (preferenceList.contains(NODES[2])) {
- newNodeUsed = true;
- Assert.assertEquals(preferenceList.get(1), NODES[2],
- "newly added node not at preference list tail for " + partition);
- }
- }
- Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
-
- // now remap this to take the new node into account, should go back to balancing masters, slaves
- // evenly across all nodes
- for (String partition : PARTITIONS) {
- currentMapping.get(partition).clear();
- }
- currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
- currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
- currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
- currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
- currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
- currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
- znRecord =
- new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
- .computePartitionAssignment(liveNodes, currentMapping, allNodes);
- preferenceLists = znRecord.getListFields();
- firstNodes.clear();
- Set<String> secondNodes = Sets.newHashSet();
- for (String partition : currentMapping.keySet()) {
- List<String> preferenceList = preferenceLists.get(partition);
- Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
- Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
- firstNodes.add(preferenceList.get(0));
- secondNodes.add(preferenceList.get(1));
- }
- Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
- Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
-
- // remove a node now, but use the current mapping with everything balanced just prior
- liveNodes.remove(0);
- stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
-
- // remove all references of n0 from the mapping, keep everything else in a legal state
- for (String partition : PARTITIONS) {
- currentMapping.get(partition).clear();
- }
- currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
- currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
- currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
- currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
- znRecord =
- new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
- .computePartitionAssignment(liveNodes, currentMapping, allNodes);
- preferenceLists = znRecord.getListFields();
- for (String partition : currentMapping.keySet()) {
- List<String> preferenceList = preferenceLists.get(partition);
- Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
- Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
- Map<String, String> stateMap = currentMapping.get(partition);
- for (String participant : stateMap.keySet()) {
- Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
- + partition);
- }
- for (String participant : preferenceList) {
- if (!stateMap.containsKey(participant)) {
- Assert.assertNotSame(preferenceList.get(0), participant,
- "newly moved replica should not be master for " + partition);
- }
- }
- }
-
- // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
- for (String partition : PARTITIONS) {
- currentMapping.get(partition).clear();
- }
- currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
- currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
- currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
- currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
- currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
- currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
- znRecord =
- new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
- .computePartitionAssignment(liveNodes, currentMapping, allNodes);
- preferenceLists = znRecord.getListFields();
- firstNodes.clear();
- for (String partition : currentMapping.keySet()) {
- List<String> preferenceList = preferenceLists.get(partition);
- Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
- Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
- firstNodes.add(preferenceList.get(0));
- }
- Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
new file mode 100644
index 0000000..5169edd
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java
@@ -0,0 +1,172 @@
+package org.apache.helix.controller.strategy;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class TestTopology {
+ private static Logger logger = Logger.getLogger(TestTopology.class);
+
+ @Test
+ public void testCreateClusterTopology() {
+ ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster");
+
+ String topology = "/Rack/Sub-Rack/Host/Instance";
+ clusterConfig.getRecord().getSimpleFields()
+ .put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), topology);
+ clusterConfig.getRecord().getSimpleFields()
+ .put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "Sub-Rack");
+
+ List<String> allNodes = new ArrayList<String>();
+ List<String> liveNodes = new ArrayList<String>();
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+
+ Map<String, Integer> nodeToWeightMap = new HashMap<String, Integer>();
+
+ for (int i = 0; i < 100; i++) {
+ String instance = "localhost_" + i;
+ InstanceConfig config = new InstanceConfig(instance);
+ String rack_id = "rack_" + i/25;
+ String sub_rack_id = "subrack-" + i/5;
+
+ String domain =
+ String.format("Rack=%s, Sub-Rack=%s, Host=%s", rack_id, sub_rack_id, instance);
+ config.setDomain(domain);
+ config.setHostName(instance);
+ config.setPort("9000");
+ allNodes.add(instance);
+
+ int weight = 0;
+ if (i % 10 != 0) {
+ liveNodes.add(instance);
+ weight = 1000;
+ if (i % 3 == 0) {
+ // set random instance weight.
+ weight = (i+1) * 100;
+ config.setWeight(weight);
+ }
+ }
+
+ instanceConfigMap.put(instance, config);
+
+ if (!nodeToWeightMap.containsKey(rack_id)) {
+ nodeToWeightMap.put(rack_id, 0);
+ }
+ nodeToWeightMap.put(rack_id, nodeToWeightMap.get(rack_id) + weight);
+ if (!nodeToWeightMap.containsKey(sub_rack_id)) {
+ nodeToWeightMap.put(sub_rack_id, 0);
+ }
+ nodeToWeightMap.put(sub_rack_id, nodeToWeightMap.get(sub_rack_id) + weight);
+ }
+
+ Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig);
+
+ Assert.assertTrue(topo.getEndNodeType().equals("Instance"));
+ Assert.assertTrue(topo.getFaultZoneType().equals("Sub-Rack"));
+
+ List<Node> faultZones = topo.getFaultZones();
+ Assert.assertEquals(faultZones.size(), 20);
+
+ Node root = topo.getRootNode();
+
+ Assert.assertEquals(root.getChildrenCount("Rack"), 4);
+ Assert.assertEquals(root.getChildrenCount("Sub-Rack"), 20);
+ Assert.assertEquals(root.getChildrenCount("Host"), 100);
+ Assert.assertEquals(root.getChildrenCount("Instance"), 100);
+
+
+ // validate weights.
+ for (Node rack : root.getChildren()) {
+ Assert.assertEquals(rack.getWeight(), (long)nodeToWeightMap.get(rack.getName()));
+ for (Node subRack : rack.getChildren()) {
+ Assert.assertEquals(subRack.getWeight(), (long)nodeToWeightMap.get(subRack.getName()));
+ }
+ }
+ }
+
+ @Test
+ public void testCreateClusterTopologyWithDefaultTopology() {
+ ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster");
+
+ List<String> allNodes = new ArrayList<String>();
+ List<String> liveNodes = new ArrayList<String>();
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
+
+ Map<String, Integer> nodeToWeightMap = new HashMap<String, Integer>();
+
+ for (int i = 0; i < 100; i++) {
+ String instance = "localhost_" + i;
+ InstanceConfig config = new InstanceConfig(instance);
+ String zoneId = "rack_" + i / 10;
+ config.setZoneId(zoneId);
+ config.setHostName(instance);
+ config.setPort("9000");
+ allNodes.add(instance);
+
+ int weight = 0;
+ if (i % 10 != 0) {
+ liveNodes.add(instance);
+ weight = 1000;
+ if (i % 3 == 0) {
+ // set random instance weight.
+ weight = (i + 1) * 100;
+ config.setWeight(weight);
+ }
+ }
+
+ instanceConfigMap.put(instance, config);
+
+ if (!nodeToWeightMap.containsKey(zoneId)) {
+ nodeToWeightMap.put(zoneId, 0);
+ }
+ nodeToWeightMap.put(zoneId, nodeToWeightMap.get(zoneId) + weight);
+ }
+
+ Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig);
+
+ Assert.assertTrue(topo.getEndNodeType().equals(Topology.Types.INSTANCE.name()));
+ Assert.assertTrue(topo.getFaultZoneType().equals(Topology.Types.ZONE.name()));
+
+ List<Node> faultZones = topo.getFaultZones();
+ Assert.assertEquals(faultZones.size(), 10);
+
+ Node root = topo.getRootNode();
+
+ Assert.assertEquals(root.getChildrenCount(Topology.Types.ZONE.name()), 10);
+ Assert.assertEquals(root.getChildrenCount(topo.getEndNodeType()), 100);
+
+ // validate weights.
+ for (Node rack : root.getChildren()) {
+ Assert.assertEquals(rack.getWeight(), (long) nodeToWeightMap.get(rack.getName()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
new file mode 100644
index 0000000..5c34792
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCrushAutoRebalance.java
@@ -0,0 +1,221 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestCrushAutoRebalance extends ZkIntegrationTestBase {
+ final int NUM_NODE = 6;
+ protected static final int START_PORT = 12918;
+ protected static final int _PARTITIONS = 20;
+
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+ protected ClusterControllerManager _controller;
+
+ protected ClusterSetup _setupTool = null;
+ List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+ Map<String, String> _nodeToZoneMap = new HashMap<String, String>();
+ Map<String, String> _nodeToTagMap = new HashMap<String, String>();
+ List<String> _nodes = new ArrayList<String>();
+ List<String> _allDBs = new ArrayList<String>();
+ int _replica = 3;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursive(namespace);
+ }
+ _setupTool = new ClusterSetup(_gZkClient);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+
+ for (int i = 0; i < NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ String zone = "zone-" + i % 3;
+ String tag = "tag-" + i % 2;
+ _setupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, storageNodeName, zone);
+ _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag);
+ _nodeToZoneMap.put(storageNodeName, zone);
+ _nodeToTagMap.put(storageNodeName, tag);
+ _nodes.add(storageNodeName);
+ }
+
+ // start dummy participants
+ for (String node : _nodes) {
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+ }
+
+ @DataProvider(name = "rebalanceStrategies")
+ public static String [][] rebalanceStrategies() {
+ return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}};
+ }
+
+ @Test(dataProvider = "rebalanceStrategies")
+ public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass)
+ throws Exception {
+ System.out.println("Test " + rebalanceStrategyName);
+ List<String> testDBs = new ArrayList<String>();
+ String[] testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+ BuiltInStateModelDefinitions.MasterSlave.name(),
+ BuiltInStateModelDefinitions.LeaderStandby.name()
+ };
+ int i = 0;
+ for (String stateModel : testModels) {
+ String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
+ _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel,
+ RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ testDBs.add(db);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ boolean result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ for (String db : testDBs) {
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ ExternalView ev =
+ _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ validateZoneAndTagIsolation(is, ev);
+ }
+ }
+
+ @Test(dataProvider = "rebalanceStrategies")
+ public void testZoneIsolationWithInstanceTag(
+ String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception {
+ List<String> testDBs = new ArrayList<String>();
+ Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+ int i = 0;
+ for (String tag : tags) {
+ String db = "Test-DB-Tag-" + rebalanceStrategyName + "-" + i++;
+ _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS,
+ BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO + "",
+ rebalanceStrategyClass);
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ is.setInstanceGroupTag(tag);
+ _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, is);
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ testDBs.add(db);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ boolean result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
+ for (String db : testDBs) {
+ IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ ExternalView ev =
+ _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ validateZoneAndTagIsolation(is, ev);
+ }
+ }
+
+ /**
+ * Validate that the instances assigned to each partition are in different zones and, when an instance group tag is set, carry that tag.
+ */
+ private void validateZoneAndTagIsolation(IdealState is, ExternalView ev) {
+ int replica = Integer.valueOf(is.getReplicas());
+ String tag = is.getInstanceGroupTag();
+
+ for (String partition : is.getPartitionSet()) {
+ Set<String> assignedZones = new HashSet<String>();
+
+ Set<String> instancesInIs = new HashSet<String>(is.getRecord().getListField(partition));
+ Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+ Set<String> instancesInEV = assignmentMap.keySet();
+ Assert.assertEquals(instancesInEV, instancesInIs);
+ for (String instance : instancesInEV) {
+ assignedZones.add(_nodeToZoneMap.get(instance));
+ if (tag != null) {
+ InstanceConfig config =
+ _setupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+ Assert.assertTrue(config.containsTag(tag));
+ }
+ }
+ Assert.assertEquals(assignedZones.size(), replica);
+ }
+ }
+
+ @Test()
+ public void testAddZone() throws Exception {
+ //TODO
+ }
+
+ @Test()
+ public void testAddNodes() throws Exception {
+ //TODO
+ }
+
+ @Test()
+ public void testNodeFailure() throws Exception {
+ //TODO
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ /**
+ * shutdown order: 1) disconnect the controller 2) disconnect participants
+ */
+ _controller.syncStop();
+ for (MockParticipantManager participant : _participants) {
+ participant.syncStop();
+ }
+ _setupTool.deleteCluster(CLUSTER_NAME);
+ System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 917be17..51dd19d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -31,6 +31,7 @@ import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateMod
import org.apache.helix.mock.participant.MockMSModelFactory;
import org.apache.helix.mock.participant.MockSchemataModelFactory;
import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.log4j.Logger;
@@ -73,14 +74,17 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
public void run() {
try {
StateMachineEngine stateMach = getStateMachineEngine();
- stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+ stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.MasterSlave.name(),
+ _msModelFactory);
DummyLeaderStandbyStateModelFactory lsModelFactory =
new DummyLeaderStandbyStateModelFactory(10);
DummyOnlineOfflineStateModelFactory ofModelFactory =
new DummyOnlineOfflineStateModelFactory(10);
- stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
- stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+ stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.LeaderStandby.name(),
+ lsModelFactory);
+ stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(),
+ ofModelFactory);
MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
[3/5] helix git commit: [HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm.
Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy
Posted by lx...@apache.org.
[HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on CRUSH algorithm.
Design doc is available at: https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7147ec87
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7147ec87
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7147ec87
Branch: refs/heads/helix-0.6.x
Commit: 7147ec874e912f27905c299fefe0d09ca31ebd42
Parents: ea0fbbb
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Jun 16 12:06:34 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/HelixAdmin.java | 30 +
.../java/org/apache/helix/HelixConstants.java | 4 +
.../main/java/org/apache/helix/PropertyKey.java | 3 +-
.../controller/rebalancer/AutoRebalancer.java | 15 +-
.../helix/controller/rebalancer/Rebalancer.java | 1 -
.../strategy/AutoRebalanceStrategy.java | 754 ++++++++++++++++++
.../strategy/CrushRebalanceStrategy.java | 174 +++++
.../rebalancer/strategy/RebalanceStrategy.java | 57 ++
.../crushMapping/CRUSHPlacementAlgorithm.java | 316 ++++++++
.../strategy/crushMapping/JenkinsHash.java | 140 ++++
.../controller/rebalancer/topology/Node.java | 208 +++++
.../rebalancer/topology/Topology.java | 295 +++++++
.../controller/stages/ClusterDataCache.java | 14 +-
.../strategy/AutoRebalanceStrategy.java | 753 ------------------
.../controller/strategy/RebalanceStrategy.java | 52 --
.../apache/helix/manager/zk/ZKHelixAdmin.java | 43 +-
.../org/apache/helix/model/ClusterConfig.java | 92 +++
.../java/org/apache/helix/model/IdealState.java | 2 +-
.../org/apache/helix/model/InstanceConfig.java | 50 +-
.../task/GenericTaskAssignmentCalculator.java | 7 +-
.../org/apache/helix/tools/ClusterSetup.java | 6 +
.../Strategy/TestAutoRebalanceStrategy.java | 766 +++++++++++++++++++
.../strategy/TestAutoRebalanceStrategy.java | 765 ------------------
.../helix/controller/strategy/TestTopology.java | 172 +++++
.../integration/TestCrushAutoRebalance.java | 221 ++++++
.../manager/MockParticipantManager.java | 10 +-
26 files changed, 3355 insertions(+), 1595 deletions(-)
----------------------------------------------------------------------
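Before the per-file diffs, a minimal end-to-end usage sketch of the API surface introduced by this commit: the addResource overload that takes a rebalance strategy class name, setInstanceZoneId, and CrushRebalanceStrategy itself. The ZooKeeper address, cluster, instance, and resource names are illustrative assumptions; a running ZooKeeper with participants already added to the cluster is assumed.

import org.apache.helix.HelixAdmin;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.IdealState.RebalanceMode;

public class CrushRebalanceUsageSketch {
  public static void main(String[] args) {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // hypothetical ZK address
    String cluster = "MyCluster";                          // hypothetical cluster name

    // Assign each participant to its fault zone (setInstanceZoneId is added by this commit).
    admin.setInstanceZoneId(cluster, "localhost_12918", "zone-0");
    admin.setInstanceZoneId(cluster, "localhost_12919", "zone-1");
    admin.setInstanceZoneId(cluster, "localhost_12920", "zone-2");

    // Create a FULL_AUTO resource placed by the CRUSH-based strategy
    // (the new addResource overload carrying a rebalance strategy class name).
    admin.addResource(cluster, "MyDB", 20,
        BuiltInStateModelDefinitions.MasterSlave.name(),
        RebalanceMode.FULL_AUTO.name(),
        CrushRebalanceStrategy.class.getName());

    // Expand the ideal state to 3 replicas so the controller can compute an assignment.
    admin.rebalance(cluster, "MyDB", 3);
  }
}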
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index fbfab26..aeacd4b 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -114,6 +114,18 @@ public interface HelixAdmin {
String stateModelRef, String rebalancerMode);
/**
+ * Add a resource to a cluster
+ * @param clusterName
+ * @param resourceName
+ * @param numPartitions
+ * @param stateModelRef
+ * @param rebalancerMode
+ * @param rebalanceStrategy
+ */
+ void addResource(String clusterName, String resourceName, int numPartitions,
+ String stateModelRef, String rebalancerMode, String rebalanceStrategy);
+
+ /**
* Add a resource to a cluster, using a bucket size > 1
* @param clusterName
* @param resourceName
@@ -138,6 +150,22 @@ public interface HelixAdmin {
void addResource(String clusterName, String resourceName, int numPartitions,
String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance);
+
+ /**
+ * Add a resource to a cluster, using a bucket size > 1
+ * @param clusterName
+ * @param resourceName
+ * @param numPartitions
+ * @param stateModelRef
+ * @param rebalancerMode
+ * @param rebalanceStrategy
+ * @param bucketSize
+ * @param maxPartitionsPerInstance
+ */
+ void addResource(String clusterName, String resourceName, int numPartitions,
+ String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize,
+ int maxPartitionsPerInstance);
+
/**
* Add an instance to a cluster
* @param clusterName
@@ -411,6 +439,8 @@ public interface HelixAdmin {
*/
void removeInstanceTag(String clusterName, String instanceName, String tag);
+ void setInstanceZoneId(String clusterName, String instanceName, String zoneId);
+
/**
* Release resources
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/HelixConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 5318fa9..6de0ff1 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -43,6 +43,10 @@ public interface HelixConstants {
ANY_LIVEINSTANCE
}
+ /**
+ * Replaced by ClusterConfig.ClusterConfigProperty.
+ */
+ @Deprecated
enum ClusterConfigType {
HELIX_DISABLE_PIPELINE_TRIGGERS,
DISABLE_FULL_AUTO // override all resources in the cluster to use SEMI-AUTO instead of FULL-AUTO
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 33355f1..0125902 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -38,6 +38,7 @@ import static org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
import java.util.Arrays;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Error;
@@ -186,7 +187,7 @@ public class PropertyKey {
* @return {@link PropertyKey}
*/
public PropertyKey clusterConfig() {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, HelixProperty.class,
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, ClusterConfig.class,
_clusterName, ConfigScopeProperty.CLUSTER.toString(), _clusterName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 6682426..ba237b1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -35,8 +35,8 @@ import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.LiveInstance;
@@ -79,8 +79,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
String replicas = currentIdealState.getReplicas();
- LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, Integer>();
- stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
+ LinkedHashMap<String, Integer> stateCountMap =
+ stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
allNodes.removeAll(clusterData.getDisabledInstances());
@@ -129,7 +129,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
- if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) {
+ if (rebalanceStrategyName == null || rebalanceStrategyName
+ .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
_rebalanceStrategy =
new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
} else {
@@ -152,8 +153,8 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
}
}
- ZNRecord newMapping =
- _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ ZNRecord newMapping = _rebalanceStrategy
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);
if (LOG.isDebugEnabled()) {
LOG.debug("currentMapping: " + currentMapping);
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index f5a4ae8..6935378 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -46,5 +46,4 @@ public interface Rebalancer {
*/
IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
final CurrentStateOutput currentStateOutput, final ClusterDataCache clusterData);
-
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
new file mode 100644
index 0000000..868d207
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
@@ -0,0 +1,754 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.log4j.Logger;
+
+public class AutoRebalanceStrategy implements RebalanceStrategy {
+ private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
+ private final ReplicaPlacementScheme _placementScheme;
+
+ private String _resourceName;
+ private List<String> _partitions;
+ private LinkedHashMap<String, Integer> _states;
+ private int _maximumPerNode;
+
+ private Map<String, Node> _nodeMap;
+ private List<Node> _liveNodesList;
+ private Map<Integer, String> _stateMap;
+
+ private Map<Replica, Node> _preferredAssignment;
+ private Map<Replica, Node> _existingPreferredAssignment;
+ private Map<Replica, Node> _existingNonPreferredAssignment;
+ private Set<Replica> _orphaned;
+
+ public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ init(resourceName, partitions, states, maximumPerNode);
+ _placementScheme = new DefaultPlacementScheme();
+ }
+
+ public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states) {
+ this(resourceName, partitions, states, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ _resourceName = resourceName;
+ _partitions = partitions;
+ _states = states;
+ _maximumPerNode = maximumPerNode;
+ }
+
+ @Override
+ public ZNRecord computePartitionAssignment(final List<String> allNodes, final List<String> liveNodes,
+ final Map<String, Map<String, String>> currentMapping, ClusterDataCache clusterData) {
+ int numReplicas = countStateReplicas();
+ ZNRecord znRecord = new ZNRecord(_resourceName);
+ if (liveNodes.size() == 0) {
+ return znRecord;
+ }
+ int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
+ int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
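+ // Each live node should end up with either distFloor or distFloor + 1 replicas:
+ // distRemainder of the live nodes get one extra replica so that the numReplicas * partitions
+ // total is spread as evenly as possible, subject to the per-node maximum applied below.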
+ _nodeMap = new HashMap<String, Node>();
+ _liveNodesList = new ArrayList<Node>();
+
+ for (String id : allNodes) {
+ Node node = new Node(id);
+ node.capacity = 0;
+ node.hasCeilingCapacity = false;
+ _nodeMap.put(id, node);
+ }
+ for (int i = 0; i < liveNodes.size(); i++) {
+ boolean usingCeiling = false;
+ int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
+ if (distRemainder > 0 && targetSize < _maximumPerNode) {
+ targetSize += 1;
+ distRemainder = distRemainder - 1;
+ usingCeiling = true;
+ }
+ Node node = _nodeMap.get(liveNodes.get(i));
+ node.isAlive = true;
+ node.capacity = targetSize;
+ node.hasCeilingCapacity = usingCeiling;
+ _liveNodesList.add(node);
+ }
+
+ // compute states for all replica ids
+ _stateMap = generateStateMap();
+
+ // compute the preferred mapping if all nodes were up
+ _preferredAssignment = computePreferredPlacement(allNodes);
+
+ // logger.info("preferred mapping:"+ preferredAssignment);
+ // from current mapping derive the ones in preferred location
+ // this will update the nodes with their current fill status
+ _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
+
+ // from current mapping derive the ones not in preferred location
+ _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
+
+ // compute orphaned replicas that are not assigned to any node
+ _orphaned = computeOrphaned();
+ if (logger.isInfoEnabled()) {
+ logger.info("orphan = " + _orphaned);
+ }
+
+ moveNonPreferredReplicasToPreferred();
+
+ assignOrphans();
+
+ moveExcessReplicas();
+
+ prepareResult(znRecord);
+ return znRecord;
+ }
+
+ /**
+ * Move replicas assigned to non-preferred nodes back to their preferred nodes when the
+ * current (donor) node is over capacity and the preferred node still has spare capacity.
+ */
+ private void moveNonPreferredReplicasToPreferred() {
+ // iterate through non preferred and see if we can move them to the
+ // preferred location if the donor has more than it should and stealer has
+ // enough capacity
+ Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<Replica, Node> entry = iterator.next();
+ Replica replica = entry.getKey();
+ Node donor = entry.getValue();
+ Node receiver = _preferredAssignment.get(replica);
+ if (donor.capacity < donor.currentlyAssigned
+ && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
+ donor.currentlyAssigned = donor.currentlyAssigned - 1;
+ receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+ donor.nonPreferred.remove(replica);
+ receiver.preferred.add(replica);
+ donor.newReplicas.remove(replica);
+ receiver.newReplicas.add(replica);
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
+ * Randomly assign orphaned replicas to live nodes while keeping the load even.
+ */
+ private void assignOrphans() {
+ // now iterate over nodes and remaining orphaned partitions and assign
+ // partitions randomly
+ // Better to iterate over orphaned partitions first
+ Iterator<Replica> it = _orphaned.iterator();
+ while (it.hasNext()) {
+ Replica replica = it.next();
+ boolean added = false;
+ int startIndex = computeRandomStartIndex(replica);
+ for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+ Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+ if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
+ receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+ receiver.nonPreferred.add(replica);
+ receiver.newReplicas.add(replica);
+ added = true;
+ break;
+ }
+ }
+ if (!added) {
+ // try adding the replica by making room for it
+ added = assignOrphanByMakingRoom(replica);
+ }
+ if (added) {
+ it.remove();
+ }
+ }
+ if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
+ logger.info("could not assign the following replicas to any node: " + _orphaned);
+ }
+ }
+
+ /**
+ * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it
+ * @param replica The replica to assign
+ * @return true if the assignment succeeded, false otherwise
+ */
+ private boolean assignOrphanByMakingRoom(Replica replica) {
+ Node capacityDonor = null;
+ Node capacityAcceptor = null;
+ int startIndex = computeRandomStartIndex(replica);
+ for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+ Node current = _liveNodesList.get(index % _liveNodesList.size());
+ if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned
+ && !current.canAddIfCapacity(replica) && capacityDonor == null) {
+ // this node has space but cannot accept the node
+ capacityDonor = current;
+ } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned
+ && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
+ // this node would be able to accept the replica if it has ceiling capacity
+ capacityAcceptor = current;
+ }
+ if (capacityDonor != null && capacityAcceptor != null) {
+ break;
+ }
+ }
+ if (capacityDonor != null && capacityAcceptor != null) {
+ // transfer ceiling capacity and add the node
+ capacityAcceptor.steal(capacityDonor, replica);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Move replicas from too-full nodes to nodes that can accept the replicas
+ */
+ private void moveExcessReplicas() {
+ // iterate over nodes and move extra load
+ Iterator<Replica> it;
+ for (Node donor : _liveNodesList) {
+ if (donor.capacity < donor.currentlyAssigned) {
+ Collections.sort(donor.nonPreferred);
+ it = donor.nonPreferred.iterator();
+ while (it.hasNext()) {
+ Replica replica = it.next();
+ int startIndex = computeRandomStartIndex(replica);
+ for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
+ Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+ if (receiver.canAdd(replica)) {
+ receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+ receiver.nonPreferred.add(replica);
+ donor.currentlyAssigned = donor.currentlyAssigned - 1;
+ it.remove();
+ break;
+ }
+ }
+ if (donor.capacity >= donor.currentlyAssigned) {
+ break;
+ }
+ }
+ if (donor.capacity < donor.currentlyAssigned) {
+ logger.warn("Could not take partitions out of node:" + donor.id);
+ }
+ }
+ }
+ }
+
+ /**
+ * Update a ZNRecord with the results of the rebalancing.
+ * @param znRecord the ZNRecord to populate with the computed map and list fields
+ */
+ private void prepareResult(ZNRecord znRecord) {
+ // The map fields are keyed on partition name and map each node to the state it holds,
+ // i.e. they indicate which node serves the partition in which state.
+ //
+ // The list fields are also keyed on partition and list all the nodes serving that partition.
+ // This is useful to verify that no node serves multiple replicas of the same partition.
+ Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
+ for (String partition : _partitions) {
+ znRecord.setMapField(partition, new TreeMap<String, String>());
+ znRecord.setListField(partition, new ArrayList<String>());
+ newPreferences.put(partition, new ArrayList<String>());
+ }
+
+ // for preference lists, the rough priority that we want is:
+ // [existing preferred, existing non-preferred, non-existing preferred, non-existing
+ // non-preferred]
+ for (Node node : _liveNodesList) {
+ for (Replica replica : node.preferred) {
+ if (node.newReplicas.contains(replica)) {
+ newPreferences.get(replica.partition).add(node.id);
+ } else {
+ znRecord.getListField(replica.partition).add(node.id);
+ }
+ }
+ }
+ for (Node node : _liveNodesList) {
+ for (Replica replica : node.nonPreferred) {
+ if (node.newReplicas.contains(replica)) {
+ newPreferences.get(replica.partition).add(node.id);
+ } else {
+ znRecord.getListField(replica.partition).add(node.id);
+ }
+ }
+ }
+ normalizePreferenceLists(znRecord.getListFields(), newPreferences);
+
+ // generate preference maps based on the preference lists
+ for (String partition : _partitions) {
+ List<String> preferenceList = znRecord.getListField(partition);
+ int i = 0;
+ for (String participant : preferenceList) {
+ znRecord.getMapField(partition).put(participant, _stateMap.get(i));
+ i++;
+ }
+ }
+ }
+
+ /**
+ * Adjust preference lists so that replicas of the same state are spread as evenly as possible
+ * across instances. The two sets of preference lists are normalized separately, and the results
+ * of the second set are appended to those of the first, which ensures that existing replicas
+ * are automatically preferred.
+ * @param preferenceLists map of (partition --> list of nodes)
+ * @param newPreferences map containing node preferences not consistent with the current
+ * assignment
+ */
+ private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
+ Map<String, List<String>> newPreferences) {
+
+ Map<String, Map<String, Integer>> nodeReplicaCounts =
+ new HashMap<String, Map<String, Integer>>();
+ for (String partition : preferenceLists.keySet()) {
+ normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
+ }
+ for (String partition : newPreferences.keySet()) {
+ normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
+ preferenceLists.get(partition).addAll(newPreferences.get(partition));
+ }
+ }
+
+ /**
+ * Adjust a single preference list for replica assignment imbalance
+ * @param preferenceList list of node names
+ * @param nodeReplicaCounts map of (node --> state --> count)
+ */
+ private void normalizePreferenceList(List<String> preferenceList,
+ Map<String, Map<String, Integer>> nodeReplicaCounts) {
+ List<String> newPreferenceList = new ArrayList<String>();
+ int replicas = Math.min(countStateReplicas(), preferenceList.size());
+
+ // make this a LinkedHashSet to preserve iteration order
+ Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
+ for (int i = 0; i < replicas; i++) {
+ String state = _stateMap.get(i);
+ String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
+ newPreferenceList.add(node);
+ notAssigned.remove(node);
+ Map<String, Integer> counts = nodeReplicaCounts.get(node);
+ counts.put(state, counts.get(state) + 1);
+ }
+ preferenceList.clear();
+ preferenceList.addAll(newPreferenceList);
+ }
+
+ /**
+ * Get the node which hosts the fewest of a given replica
+ * @param state the state
+ * @param nodes nodes to check
+ * @param nodeReplicaCounts current assignment of replicas
+ * @return the node most willing to accept the replica
+ */
+ private String getMinimumNodeForReplica(String state, Set<String> nodes,
+ Map<String, Map<String, Integer>> nodeReplicaCounts) {
+ String minimalNode = null;
+ int minimalCount = Integer.MAX_VALUE;
+ for (String node : nodes) {
+ int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
+ if (count < minimalCount) {
+ minimalCount = count;
+ minimalNode = node;
+ }
+ }
+ return minimalNode;
+ }
+
+ /**
+ * Safely look up the number of replicas in a given state assigned to a node
+ * @param state the state to check
+ * @param node the node to check
+ * @param nodeReplicaCounts a map of node to per-state replica counts
+ * @return the number of replicas in the given state currently assigned to the node
+ */
+ private int getReplicaCountForNode(String state, String node,
+ Map<String, Map<String, Integer>> nodeReplicaCounts) {
+ if (!nodeReplicaCounts.containsKey(node)) {
+ Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
+ replicaCounts.put(state, 0);
+ nodeReplicaCounts.put(node, replicaCounts);
+ return 0;
+ }
+ Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
+ if (!replicaCounts.containsKey(state)) {
+ replicaCounts.put(state, 0);
+ return 0;
+ }
+ return replicaCounts.get(state);
+ }
+
+ /**
+ * Compute the subset of the current mapping where replicas are not mapped according to their
+ * preferred assignment.
+ * @param currentMapping Current mapping of replicas to nodes
+ * @return The current assignments that do not conform to the preferred assignment
+ */
+ private Map<Replica, Node> computeExistingNonPreferredPlacement(
+ Map<String, Map<String, String>> currentMapping) {
+ Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
+ int count = countStateReplicas();
+ for (String partition : currentMapping.keySet()) {
+ Map<String, String> nodeStateMap = currentMapping.get(partition);
+ nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+ for (String nodeId : nodeStateMap.keySet()) {
+ Node node = _nodeMap.get(nodeId);
+ boolean skip = false;
+ for (Replica replica : node.preferred) {
+ if (replica.partition.equals(partition)) {
+ skip = true;
+ break;
+ }
+ }
+ if (skip) {
+ continue;
+ }
+ // check if it is in one of the preferred positions
+ for (int replicaId = 0; replicaId < count; replicaId++) {
+ Replica replica = new Replica(partition, replicaId);
+ if (!_preferredAssignment.containsKey(replica)) {
+
+ logger.info("partitions: " + _partitions);
+ logger.info("currentMapping.keySet: " + currentMapping.keySet());
+ throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions");
+ }
+
+ if (_preferredAssignment.get(replica).id != node.id
+ && !_existingPreferredAssignment.containsKey(replica)
+ && !existingNonPreferredAssignment.containsKey(replica)) {
+ existingNonPreferredAssignment.put(replica, node);
+ node.nonPreferred.add(replica);
+
+ break;
+ }
+ }
+ }
+ }
+ return existingNonPreferredAssignment;
+ }
+
+ /**
+ * Get a live node index to try first for a replica so that each possible start index is
+ * roughly uniformly assigned.
+ * @param replica The replica to assign
+ * @return The starting node index to try
+ */
+ private int computeRandomStartIndex(final Replica replica) {
+ return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
+ }
+
+ /**
+ * Get a set of replicas not currently assigned to any node
+ * @return Unassigned replicas
+ */
+ private Set<Replica> computeOrphaned() {
+ Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
+ for (Replica r : _existingPreferredAssignment.keySet()) {
+ if (orphanedPartitions.contains(r)) {
+ orphanedPartitions.remove(r);
+ }
+ }
+ for (Replica r : _existingNonPreferredAssignment.keySet()) {
+ if (orphanedPartitions.contains(r)) {
+ orphanedPartitions.remove(r);
+ }
+ }
+
+ return orphanedPartitions;
+ }
+
+ /**
+ * Determine the replicas already assigned to their preferred nodes
+ * @param currentMapping Current assignment of replicas to nodes
+ * @return Assignments that conform to the preferred placement
+ */
+ private Map<Replica, Node> computeExistingPreferredPlacement(
+ final Map<String, Map<String, String>> currentMapping) {
+ Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
+ int count = countStateReplicas();
+ for (String partition : currentMapping.keySet()) {
+ Map<String, String> nodeStateMap = currentMapping.get(partition);
+ nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+ for (String nodeId : nodeStateMap.keySet()) {
+ Node node = _nodeMap.get(nodeId);
+ node.currentlyAssigned = node.currentlyAssigned + 1;
+ // check if it is in one of the preferred positions
+ for (int replicaId = 0; replicaId < count; replicaId++) {
+ Replica replica = new Replica(partition, replicaId);
+ if (_preferredAssignment.containsKey(replica)
+ && !existingPreferredAssignment.containsKey(replica)
+ && _preferredAssignment.get(replica).id == node.id) {
+ existingPreferredAssignment.put(replica, node);
+ node.preferred.add(replica);
+ break;
+ }
+ }
+ }
+ }
+
+ return existingPreferredAssignment;
+ }
+
+ /**
+ * Given a predefined set of all possible nodes, compute a preferred assignment that
+ * spreads all replicas evenly across those nodes.
+ * @param allNodes Identifiers to all nodes, live and non-live
+ * @return Preferred assignment of replicas
+ */
+ private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
+ Map<Replica, Node> preferredMapping = new HashMap<Replica, Node>();
+ int partitionId = 0;
+ int numReplicas = countStateReplicas();
+ for (String partition : _partitions) {
+ for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
+ Replica replica = new Replica(partition, replicaId);
+ String nodeName =
+ _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
+ allNodes);
+ preferredMapping.put(replica, _nodeMap.get(nodeName));
+ }
+ partitionId = partitionId + 1;
+ }
+ return preferredMapping;
+ }
+
+ /**
+ * Counts the total number of replicas given a state-count mapping
+ * @return the total number of replicas across all states
+ */
+ private int countStateReplicas() {
+ int total = 0;
+ for (Integer count : _states.values()) {
+ total += count;
+ }
+ return total;
+ }
+
+ /**
+ * Compute a map of replica ids to state names
+ * @return Map: replica id -> state name
+ */
+ private Map<Integer, String> generateStateMap() {
+ int replicaId = 0;
+ Map<Integer, String> stateMap = new HashMap<Integer, String>();
+ for (String state : _states.keySet()) {
+ Integer count = _states.get(state);
+ for (int i = 0; i < count; i++) {
+ stateMap.put(replicaId, state);
+ replicaId++;
+ }
+ }
+ return stateMap;
+ }
+
+ /**
+ * A Node is an entity that can serve replicas. It has a capacity and knowledge
+ * of replicas assigned to it, so it can decide if it can receive additional replicas.
+ */
+ class Node {
+ public int currentlyAssigned;
+ public int capacity;
+ public boolean hasCeilingCapacity;
+ private final String id;
+ boolean isAlive;
+ private final List<Replica> preferred;
+ private final List<Replica> nonPreferred;
+ private final Set<Replica> newReplicas;
+
+ public Node(String id) {
+ preferred = new ArrayList<Replica>();
+ nonPreferred = new ArrayList<Replica>();
+ newReplicas = new TreeSet<Replica>();
+ currentlyAssigned = 0;
+ isAlive = false;
+ this.id = id;
+ }
+
+ /**
+ * Check if this replica can be legally added to this node
+ * @param replica The replica to test
+ * @return true if the assignment can be made, false otherwise
+ */
+ public boolean canAdd(Replica replica) {
+ if (currentlyAssigned >= capacity) {
+ return false;
+ }
+ return canAddIfCapacity(replica);
+ }
+
+ /**
+ * Check if this replica can be legally added to this node, provided that it has enough
+ * capacity.
+ * @param replica The replica to test
+ * @return true if the assignment can be made, false otherwise
+ */
+ public boolean canAddIfCapacity(Replica replica) {
+ if (!isAlive) {
+ return false;
+ }
+ for (Replica r : preferred) {
+ if (r.partition.equals(replica.partition)) {
+ return false;
+ }
+ }
+ for (Replica r : nonPreferred) {
+ if (r.partition.equals(replica.partition)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Receive a replica by stealing capacity from another Node
+ * @param donor The node that has excess capacity
+ * @param replica The replica to receive
+ */
+ public void steal(Node donor, Replica replica) {
+ donor.hasCeilingCapacity = false;
+ donor.capacity--;
+ hasCeilingCapacity = true;
+ capacity++;
+ currentlyAssigned++;
+ nonPreferred.add(replica);
+ newReplicas.add(replica);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
+ .append("\nnonpreferred:").append(nonPreferred.size());
+ return sb.toString();
+ }
+ }
+
+ /**
+ * A Replica is a combination of a partition of the resource, the state the replica is in
+ * and an identifier signifying a specific replica of a given partition and state.
+ */
+ class Replica implements Comparable<Replica> {
+ private String partition;
+ private int replicaId; // this is a partition-relative id
+ private String format;
+
+ public Replica(String partition, int replicaId) {
+ this.partition = partition;
+ this.replicaId = replicaId;
+ this.format = this.partition + "|" + this.replicaId;
+ }
+
+ @Override
+ public String toString() {
+ return format;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof Replica) {
+ return this.format.equals(((Replica) that).format);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.format.hashCode();
+ }
+
+ @Override
+ public int compareTo(Replica that) {
+ if (that instanceof Replica) {
+ return this.format.compareTo(that.format);
+ }
+ return -1;
+ }
+ }
+
+ /**
+ * Interface for providing a custom approach to computing a replica's affinity to a node.
+ */
+ public interface ReplicaPlacementScheme {
+ /**
+ * Initialize global state
+ * @param manager The instance to which this placement is associated
+ */
+ public void init(final HelixManager manager);
+
+ /**
+ * Given properties of this replica, determine the node it would prefer to be served by
+ * @param partitionId The current partition
+ * @param replicaId The current replica with respect to the current partition
+ * @param numPartitions The total number of partitions
+ * @param numReplicas The total number of replicas per partition
+ * @param nodeNames A list of identifiers of all nodes, live and non-live
+ * @return The name of the node that should serve this replica
+ */
+ public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+ final List<String> nodeNames);
+ }
+
+ /**
+ * Compute preferred placements based on a default strategy that assigns replicas to nodes as
+ * evenly as possible while avoiding placing two replicas of the same partition on any node.
+ */
+ public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
+ @Override
+ public void init(final HelixManager manager) {
+ // do nothing since this is independent of the manager
+ }
+
+ @Override
+ public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+ final List<String> nodeNames) {
+ int index;
+ if (nodeNames.size() > numPartitions) {
+ // assign replicas in partition order in case there are more nodes than partitions
+ index = (partitionId + replicaId * numPartitions) % nodeNames.size();
+ } else if (nodeNames.size() == numPartitions) {
+ // need a replica offset in case the sizes of these sets are the same
+ index =
+ ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
+ % nodeNames.size();
+ } else {
+ // in all other cases, assigning a replica at a time for each partition is reasonable
+ index = (partitionId + replicaId) % nodeNames.size();
+ }
+ return nodeNames.get(index);
+ }
+ }
+}
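As a quick illustration of the default placement math above, consider a hypothetical layout with 2 partitions, 2 replicas per partition and 5 nodes (all names and counts here are made up, not part of the patch). Since there are more nodes than partitions, the first branch applies and index = (partitionId + replicaId * numPartitions) % nodeNames.size():

    // assuming the usual java.util imports and the classes from this file
    List<String> nodes = Arrays.asList("n0", "n1", "n2", "n3", "n4");
    AutoRebalanceStrategy.ReplicaPlacementScheme scheme =
        new AutoRebalanceStrategy.DefaultPlacementScheme();
    // partition 0: replica 0 -> (0 + 0*2) % 5 = 0 -> "n0"; replica 1 -> (0 + 1*2) % 5 = 2 -> "n2"
    // partition 1: replica 0 -> (1 + 0*2) % 5 = 1 -> "n1"; replica 1 -> (1 + 1*2) % 5 = 3 -> "n3"
    String location = scheme.getLocation(0, 1, 2, 2, nodes); // "n2"

Each partition's replicas land on distinct nodes and the overall load stays even.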
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
new file mode 100644
index 0000000..a8fe107
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -0,0 +1,174 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
+import org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.InstanceConfig;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Topology-aware partition placement strategy based on the CRUSH algorithm.
+ */
+public class CrushRebalanceStrategy implements RebalanceStrategy {
+ private String _resourceName;
+ private List<String> _partitions;
+ private Topology _clusterTopo;
+ private int _replicas;
+
+ @Override
+ public void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ _resourceName = resourceName;
+ _partitions = partitions;
+ _replicas = countStateReplicas(states);
+ }
+
+ /**
+ * Compute the preference lists (and, optionally, the partition-state mapping) for the given resource.
+ *
+ * @param allNodes All instances
+ * @param liveNodes List of live instances
+ * @param currentMapping current replica mapping
+ * @param clusterData cluster data
+ * @return a ZNRecord holding the computed preference list for each partition
+ * @throws HelixException if a valid mapping cannot be found
+ */
+ @Override
+ public ZNRecord computePartitionAssignment(final List<String> allNodes,
+ final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+ ClusterDataCache clusterData) throws HelixException {
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ _clusterTopo =
+ new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+ Node topNode = _clusterTopo.getRootNode();
+
+ Map<String, List<String>> newPreferences = new HashMap<String, List<String>>();
+ for (int i = 0; i < _partitions.size(); i++) {
+ String partitionName = _partitions.get(i);
+ long data = partitionName.hashCode();
+
+ // apply the placement rules
+ List<Node> selected = select(topNode, data, _replicas);
+
+ List<String> nodeList = new ArrayList<String>();
+ for (int j = 0; j < selected.size(); j++) {
+ nodeList.add(selected.get(j).getName());
+ }
+
+ newPreferences.put(partitionName, nodeList);
+ }
+
+ ZNRecord result = new ZNRecord(_resourceName);
+ result.setListFields(newPreferences);
+
+ return result;
+ }
+
+ /**
+ * Maximum number of retries for finding an appropriate instance for a replica.
+ */
+ private static final int MAX_RETRY = 100;
+ private final JenkinsHash hashFun = new JenkinsHash();
+ private CRUSHPlacementAlgorithm placementAlgorithm = new CRUSHPlacementAlgorithm();
+
+ /**
+ * Select the requested number of end nodes while enforcing fault-zone isolation.
+ * The caller either gets the expected number of selected nodes, or an exception is thrown.
+ */
+ private List<Node> select(Node topNode, long data, int rf)
+ throws HelixException {
+ List<Node> nodes = new ArrayList<Node>(rf);
+ Set<Node> selectedZones = new HashSet<Node>();
+ long input = data;
+ int count = rf;
+ int tries = 0;
+ while (nodes.size() < rf) {
+ doSelect(topNode, input, count, nodes, selectedZones);
+ count = rf - nodes.size();
+ if (count > 0) {
+ input = hashFun.hash(input); // create a different hash value for retrying
+ tries++;
+ if (tries >= MAX_RETRY) {
+ throw new HelixException(
+ String.format("could not find all mappings after %d tries", tries));
+ }
+ }
+ }
+ return nodes;
+ }
+
+ private void doSelect(Node topNode, long input, int rf, List<Node> selectedNodes,
+ Set<Node> selectedZones) {
+ String zoneType = _clusterTopo.getFaultZoneType();
+ String endNodeType = _clusterTopo.getEndNodeType();
+
+ if (!zoneType.equals(endNodeType)) {
+ // pick fault zones first
+ List<Node> zones = placementAlgorithm
+ .select(topNode, input, rf, zoneType, nodeAlreadySelected(selectedZones));
+ // remember the selected fault zones so they are not picked again on retries
+ selectedZones.addAll(zones);
+ // pick one end node from each fault zone.
+ for (Node zone : zones) {
+ List<Node> endNode = placementAlgorithm.select(zone, input, 1, endNodeType);
+ selectedNodes.addAll(endNode);
+ }
+ } else {
+ // pick end node directly
+ List<Node> nodes = placementAlgorithm.select(topNode, input, rf, endNodeType,
+ nodeAlreadySelected(new HashSet<Node>(selectedNodes)));
+ selectedNodes.addAll(nodes);
+ }
+ }
+
+ /**
+ * Use the predicate to reject already selected zones or nodes.
+ */
+ private Predicate<Node> nodeAlreadySelected(Set<Node> selectedNodes) {
+ return Predicates.not(Predicates.in(selectedNodes));
+ }
+
+ /**
+ * Counts the total number of replicas given a state-count mapping
+ * @return the total number of replicas across all states
+ */
+ private int countStateReplicas(Map<String, Integer> stateCountMap) {
+ int total = 0;
+ for (Integer count : stateCountMap.values()) {
+ total += count;
+ }
+ return total;
+ }
+}
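Assuming the matching setter exists on IdealState (the getter is used in the AutoRebalancer change above, which falls back to AutoRebalanceStrategy for the "DEFAULT" name and otherwise loads the strategy class by name), a resource could opt into the CRUSH strategy roughly as follows; the cluster and resource names are placeholders:

    IdealState idealState = admin.getResourceIdealState("MyCluster", "MyDB");
    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
    // assumed setter; the value is the fully qualified strategy class name
    idealState.setRebalanceStrategy(CrushRebalanceStrategy.class.getName());
    admin.setResourceIdealState("MyCluster", "MyDB", idealState);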
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
new file mode 100644
index 0000000..a3c7e94
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
@@ -0,0 +1,57 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Strategy interface for computing the partition-to-instance assignment of a resource.
+ */
+public interface RebalanceStrategy {
+ String DEFAULT_REBALANCE_STRATEGY = "DEFAULT";
+
+ /**
+ * Perform the necessary initialization for the rebalance strategy object.
+ *
+ * @param resourceName the name of the resource being rebalanced
+ * @param partitions the partitions of the resource
+ * @param states map of state name to the number of replicas required in that state
+ * @param maximumPerNode the maximum number of partitions a node may host
+ */
+ void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode);
+
+ /**
+ * Compute the preference lists (and, optionally, the partition-state mapping) for the given resource.
+ *
+ * @param allNodes all instances, live and non-live
+ * @param liveNodes list of live instances
+ * @param currentMapping the current replica mapping
+ * @param clusterData the cluster data cache
+ * @return a ZNRecord holding the computed preference list for each partition
+ */
+ ZNRecord computePartitionAssignment(final List<String> allNodes,
+ final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+ ClusterDataCache clusterData);
+}
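To sketch the contract this interface defines, a trivial (purely illustrative) strategy could round-robin each partition's preference list over the live nodes; real strategies such as AutoRebalanceStrategy and CrushRebalanceStrategy are far more careful about stability and capacity:

    // imports from java.util and org.apache.helix omitted for brevity
    public class RoundRobinRebalanceStrategy implements RebalanceStrategy {
      private String _resourceName;
      private List<String> _partitions;
      private LinkedHashMap<String, Integer> _states;

      @Override
      public void init(String resourceName, final List<String> partitions,
          final LinkedHashMap<String, Integer> states, int maximumPerNode) {
        _resourceName = resourceName;
        _partitions = partitions;
        _states = states;
      }

      @Override
      public ZNRecord computePartitionAssignment(final List<String> allNodes,
          final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
          ClusterDataCache clusterData) {
        ZNRecord record = new ZNRecord(_resourceName);
        int replicas = 0;
        for (Integer count : _states.values()) {
          replicas += count;
        }
        int offset = 0;
        for (String partition : _partitions) {
          // pick up to 'replicas' distinct live nodes, starting at a per-partition offset
          List<String> preferenceList = new ArrayList<String>();
          for (int i = 0; i < replicas && i < liveNodes.size(); i++) {
            preferenceList.add(liveNodes.get((offset + i) % liveNodes.size()));
          }
          record.setListField(partition, preferenceList);
          offset++;
        }
        return record;
      }
    }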
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
new file mode 100644
index 0000000..870656c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
@@ -0,0 +1,316 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The transcription of the CRUSH placement algorithm from the Weil paper. This is a fairly simple
+ * adaptation, but a couple of important changes have been made to work with the crunch mapping.
+ */
+public class CRUSHPlacementAlgorithm {
+ /**
+ * If the select() method still fails to select a node after looping back to the origin of
+ * selection this many times, we stop the search. This constant denotes the maximum number of
+ * retries after looping back to the origin. It is expected that in most cases the selection will either succeed
+ * with a small number of tries, or it will never succeed. So a reasonably large number to
+ * distinguish these two cases should be sufficient.
+ */
+ private static final int MAX_LOOPBACK_COUNT = 50;
+ private static final Logger logger = LoggerFactory.getLogger(CRUSHPlacementAlgorithm.class);
+
+ private final boolean keepOffset;
+ private final Map<Long,Integer> roundOffset;
+
+ /**
+ * Creates the crush placement object.
+ */
+ public CRUSHPlacementAlgorithm() {
+ this(false);
+ }
+
+ /**
+ * Creates the crush placement algorithm with the indication whether the round offset should be
+ * kept for the duration of this object for successive selection of the same input.
+ */
+ public CRUSHPlacementAlgorithm(boolean keepOffset) {
+ this.keepOffset = keepOffset;
+ roundOffset = keepOffset ? new HashMap<Long,Integer>() : null;
+ }
+
+ /**
+ * Returns a list of (count) nodes of the desired type. If the count is more than the number of
+ * available nodes, an exception is thrown. Note that it is possible for this method to return a
+ * list whose size is smaller than the requested size (count) if it is unable to select all the
+ * nodes for any reason. Callers should check the size of the returned list and take action if
+ * needed.
+ */
+ public List<Node> select(Node parent, long input, int count, String type) {
+ return select(parent, input, count, type, Predicates.<Node>alwaysTrue());
+ }
+
+ public List<Node> select(Node parent, long input, int count, String type,
+ Predicate<Node> nodePredicate) {
+ int childCount = parent.getChildrenCount(type);
+ if (childCount < count) {
+ throw new IllegalArgumentException(count + " nodes of type " + type +
+ " were requested but the tree has only " + childCount + " nodes!");
+ }
+
+ List<Node> selected = new ArrayList<Node>(count);
+ // use the index stored in the map
+ Integer offset;
+ if (keepOffset) {
+ offset = roundOffset.get(input);
+ if (offset == null) {
+ offset = 0;
+ roundOffset.put(input, offset);
+ }
+ } else {
+ offset = 0;
+ }
+
+ int rPrime = 0;
+ for (int r = 1; r <= count; r++) {
+ int failure = 0;
+ // number of times we had to loop back to the origin
+ int loopbackCount = 0;
+ boolean escape = false;
+ boolean retryOrigin;
+ Node out = null;
+ do {
+ retryOrigin = false; // initialize at the outset
+ Node in = parent;
+ Set<Node> rejected = new HashSet<Node>();
+ boolean retryNode;
+ do {
+ retryNode = false; // initialize at the outset
+ rPrime = r + offset + failure;
+ logger.trace("{}.select({}, {})", new Object[] {in, input, rPrime});
+ Selector selector = new Selector(in);
+ out = selector.select(input, rPrime);
+ if (!out.getType().equalsIgnoreCase(type)) {
+ logger.trace("selected output {} for data {} didn't match the type {}: walking down " +
+ "the hierarchy...", new Object[] {out, input, type});
+ in = out; // walk down the hierarchy
+ retryNode = true; // stay within the node and walk down the tree
+ } else { // type matches
+ boolean predicateRejected = !nodePredicate.apply(out);
+ if (selected.contains(out) || predicateRejected) {
+ if (predicateRejected) {
+ logger.trace("{} was rejected by the node predicate for data {}: rejecting and " +
+ "increasing rPrime", out, input);
+ rejected.add(out);
+ } else { // already selected
+ logger.trace("{} was already selected for data {}: rejecting and increasing rPrime",
+ out, input);
+ }
+
+ // we need to see if we have selected all possible nodes from this parent, in which
+ // case we should loop back to the origin and start over
+ if (allChildNodesEliminated(in, selected, rejected)) {
+ logger.trace("all child nodes of {} have been eliminated", in);
+ if (loopbackCount == MAX_LOOPBACK_COUNT) {
+ // we looped back the maximum times we specified; we give up search, and exit
+ escape = true;
+ break;
+ }
+ loopbackCount++;
+ logger.trace("looping back to the original parent node ({})", parent);
+ retryOrigin = true;
+ } else {
+ retryNode = true; // go back and reselect on the same parent
+ }
+ failure++;
+ } else if (nodeIsOut(out)) {
+ logger.trace("{} is marked as out (failed or over the maximum assignment) for data " +
+ "{}! looping back to the original parent node", out, input);
+ failure++;
+ if (loopbackCount == MAX_LOOPBACK_COUNT) {
+ // we looped back the maximum times we specified; we give up search, and exit
+ escape = true;
+ break;
+ }
+ loopbackCount++;
+ // re-selection on the same parent is detrimental in case of node failure: loop back
+ // to the origin
+ retryOrigin = true;
+ } else {
+ // we got a successful selection
+ break;
+ }
+ }
+ } while (retryNode);
+ } while (retryOrigin);
+
+ if (escape) {
+ // cannot find a node under this parent; return a smaller set than was intended
+ logger.debug("we could not select a node for data {} under parent {}; a smaller node set " +
+ "than requested will be returned", input, parent);
+ continue;
+ }
+
+ logger.trace("{} was selected for data {}", out, input);
+ selected.add(out);
+ }
+ if (keepOffset) {
+ roundOffset.put(input, rPrime);
+ }
+ return selected;
+ }
+
+
+ private boolean nodeIsOut(Node node) {
+ if (node.isLeaf() && node.isFailed()) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Examines the immediate child nodes of the given parent node, and sees if all of the children
+ * that can be selected (i.e. not failed) are already selected. This is used to determine whether
+ * this parent node should no longer be used in the selection.
+ */
+ private boolean allChildNodesEliminated(Node parent, List<Node> selected, Set<Node> rejected) {
+ List<Node> children = parent.getChildren();
+ if (children != null) {
+ for (Node child: children) {
+ if (!nodeIsOut(child) && !selected.contains(child) && !rejected.contains(child)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Selection algorithm based on the "straw" bucket type as described in the CRUSH algorithm.
+ */
+ private class Selector {
+ private final Map<Node,Long> straws = new HashMap<Node,Long>();
+ private final JenkinsHash hashFunction;
+
+ public Selector(Node node) {
+ if (!node.isLeaf()) {
+ // create a map from the nodes to their values
+ List<Node> sortedNodes = sortNodes(node.getChildren()); // do a reverse sort by weight
+
+ int numLeft = sortedNodes.size();
+ float straw = 1.0f;
+ float wbelow = 0.0f;
+ float lastw = 0.0f;
+ int i = 0;
+ final int length = sortedNodes.size();
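+ // Each child gets a "straw" scaling factor derived from the weight distribution. At
+ // selection time every child draws a pseudo-random score and multiplies it by its straw;
+ // the highest product wins, so over many inputs children are chosen roughly in proportion
+ // to their weights (the straw bucket construction from the CRUSH paper).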
+ while (i < length) {
+ Node current = sortedNodes.get(i);
+ if (current.getWeight() == 0) {
+ straws.put(current, 0L);
+ i++;
+ continue;
+ }
+ straws.put(current, (long)(straw*0x10000));
+ i++;
+ if (i == length) {
+ break;
+ }
+
+ current = sortedNodes.get(i);
+ Node previous = sortedNodes.get(i-1);
+ if (current.getWeight() == previous.getWeight()) {
+ continue;
+ }
+ wbelow += (float)(previous.getWeight() - lastw)*numLeft;
+ for (int j = i; j < length; j++) {
+ if (sortedNodes.get(j).getWeight() == current.getWeight()) {
+ numLeft--;
+ } else {
+ break;
+ }
+ }
+ float wnext = (float)(numLeft * (current.getWeight() - previous.getWeight()));
+ float pbelow = wbelow/(wbelow + wnext);
+ straw *= Math.pow(1.0/pbelow, 1.0/numLeft);
+ lastw = previous.getWeight();
+ }
+ }
+ hashFunction = new JenkinsHash();
+ }
+
+ /**
+ * Returns a new list that's sorted in the reverse order of the weight.
+ */
+ private List<Node> sortNodes(List<Node> nodes) {
+ List<Node> ret = new ArrayList<Node>(nodes);
+ sortNodesInPlace(ret);
+ return ret;
+ }
+
+ /**
+ * Sorts the list in place in the reverse order of the weight.
+ */
+ private void sortNodesInPlace(List<Node> nodes) {
+ Collections.sort(nodes, new Comparator<Node>() {
+ public int compare(Node n1, Node n2) {
+ if (n2.getWeight() == n1.getWeight()) {
+ return 0;
+ }
+ return (n2.getWeight() - n1.getWeight() > 0) ? 1 : -1;
+ // sort by weight only in the reverse order
+ }
+ });
+ }
+
+ public Node select(long input, long round) {
+ Node selected = null;
+ long hiScore = -1;
+ for (Map.Entry<Node,Long> e: straws.entrySet()) {
+ Node child = e.getKey();
+ long straw = e.getValue();
+ long score = weightedScore(child, straw, input, round);
+ if (score > hiScore) {
+ selected = child;
+ hiScore = score;
+ }
+ }
+ if (selected == null) {
+ throw new IllegalStateException();
+ }
+ return selected;
+ }
+
+ private long weightedScore(Node child, long straw, long input, long round) {
+ long hash = hashFunction.hash(input, child.getId(), round);
+ hash = hash&0xffff;
+ long weightedScore = hash*straw;
+ return weightedScore;
+ }
+ }
+}
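For a concrete (purely illustrative) invocation of the placement algorithm, a tiny flat topology can be built directly from Node objects; the names, types and weights below are made up:

    // imports from java.util and org.apache.helix.controller.rebalancer.topology omitted
    Node root = new Node();
    root.setName("root");
    root.setType("root");
    root.setId(0);
    for (int i = 1; i <= 3; i++) {
      Node instance = new Node();
      instance.setName("host" + i);
      instance.setType("instance");
      instance.setId(i);
      instance.setWeight(1000);
      instance.setParent(root);
      root.addChild(instance);
    }

    CRUSHPlacementAlgorithm crush = new CRUSHPlacementAlgorithm();
    long data = "MyDB_0".hashCode(); // partition name hash, as in CrushRebalanceStrategy
    // deterministically selects two distinct instances for this partition
    List<Node> selected = crush.select(root, data, 2, "instance");

The same data value always maps to the same instances as long as the tree (names, ids and weights) is unchanged, which is what keeps the resulting mapping stable across pipeline runs.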
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
new file mode 100644
index 0000000..66566f8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+public class JenkinsHash {
+ // max value to limit it to 4 bytes
+ private static final long MAX_VALUE = 0xFFFFFFFFL;
+ private static final long CRUSH_HASH_SEED = 1315423911L;
+
+ /**
+ * Convert a byte into a long value without making it negative.
+ */
+ private static long byteToLong(byte b) {
+ long val = b & 0x7F;
+ if ((b & 0x80) != 0) {
+ val += 128;
+ }
+ return val;
+ }
+
+ /**
+ * Do addition and turn into 4 bytes.
+ */
+ private static long add(long val, long add) {
+ return (val + add) & MAX_VALUE;
+ }
+
+ /**
+ * Do subtraction and turn into 4 bytes.
+ */
+ private static long subtract(long val, long subtract) {
+ return (val - subtract) & MAX_VALUE;
+ }
+
+ /**
+ * XOR the two values and truncate to 4 bytes.
+ */
+ private static long xor(long val, long xor) {
+ return (val ^ xor) & MAX_VALUE;
+ }
+
+ /**
+ * Left shift val by shift bits. Cut down to 4 bytes.
+ */
+ private static long leftShift(long val, int shift) {
+ return (val << shift) & MAX_VALUE;
+ }
+
+ /**
+ * Convert 4 bytes from the buffer at offset into a long value.
+ */
+ private static long fourByteToLong(byte[] bytes, int offset) {
+ return (byteToLong(bytes[offset + 0])
+ + (byteToLong(bytes[offset + 1]) << 8)
+ + (byteToLong(bytes[offset + 2]) << 16)
+ + (byteToLong(bytes[offset + 3]) << 24));
+ }
+
+ /**
+ * Mix up the values in the hash function.
+ */
+ private static Triple hashMix(Triple t) {
+ long a = t.a; long b = t.b; long c = t.c;
+ a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13);
+ b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8));
+ c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13));
+ a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12));
+ b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16));
+ c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5));
+ a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3));
+ b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10));
+ c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15));
+ return new Triple(a, b, c);
+ }
+
+ private static class Triple {
+ long a;
+ long b;
+ long c;
+
+ public Triple(long a, long b, long c) {
+ this.a = a; this.b = b; this.c = c;
+ }
+ }
+
+ public long hash(long a) {
+ long hash = xor(CRUSH_HASH_SEED, a);
+ long b = a;
+ long x = 231232L;
+ long y = 1232L;
+ Triple val = hashMix(new Triple(b, x, hash));
+ b = val.a; x = val.b; hash = val.c;
+ val = hashMix(new Triple(y, a, hash));
+ hash = val.c;
+ return hash;
+ }
+
+ public long hash(long a, long b) {
+ long hash = xor(xor(CRUSH_HASH_SEED, a), b);
+ long x = 231232L;
+ long y = 1232L;
+ Triple val = hashMix(new Triple(a, b, hash));
+ a = val.a; b = val.b; hash = val.c;
+ val = hashMix(new Triple(x, a, hash));
+ x = val.a; a = val.b; hash = val.c;
+ val = hashMix(new Triple(b, y, hash));
+ hash = val.c;
+ return hash;
+ }
+
+ public long hash(long a, long b, long c) {
+ long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
+ long x = 231232L;
+ long y = 1232L;
+ Triple val = hashMix(new Triple(a, b, hash));
+ a = val.a; b = val.b; hash = val.c;
+ val = hashMix(new Triple(c, x, hash));
+ c = val.a; x = val.b; hash = val.c;
+ val = hashMix(new Triple(y, a, hash));
+ y = val.a; a = val.b; hash = val.c;
+ val = hashMix(new Triple(b, x, hash));
+ b = val.a; x = val.b; hash = val.c;
+ val = hashMix(new Triple(y, c, hash));
+ hash = val.c;
+ return hash;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
new file mode 100644
index 0000000..3a52a21
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
@@ -0,0 +1,208 @@
+package org.apache.helix.controller.rebalancer.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+public class Node implements Comparable<Node> {
+ private String _name;
+ private String _type;
+ private long _id;
+ private long _weight;
+
+ private LinkedHashMap<String, Node> _children = new LinkedHashMap<String, Node>();
+ private Node _parent;
+
+ private boolean _failed;
+
+ public Node() {
+
+ }
+
+ public Node(Node node) {
+ _name = node.getName();
+ _type = node.getType();
+ _id = node.getId();
+ _weight = node.getWeight();
+ _failed = node.isFailed();
+ }
+
+ public String getName() {
+ return _name;
+ }
+
+ public void setName(String name) {
+ _name = name;
+ }
+
+ public String getType() {
+ return _type;
+ }
+
+ public void setType(String type) {
+ _type = type;
+ }
+
+ public long getId() {
+ return _id;
+ }
+
+ public void setId(long id) {
+ _id = id;
+ }
+
+ public long getWeight() {
+ return _weight;
+ }
+
+ public void setWeight(long weight) {
+ _weight = weight;
+ }
+
+ public void addWeight(long weight) { _weight += weight; }
+
+ public boolean isFailed() {
+ return _failed;
+ }
+
+ public void setFailed(boolean failed) {
+ if (!isLeaf()) {
+ throw new UnsupportedOperationException("you cannot set failed on a non-leaf!");
+ }
+ _failed = failed;
+ }
+
+ public List<Node> getChildren() {
+ return new ArrayList<Node>(_children.values());
+ }
+
+ /**
+ * Add a child node; if a child with the same name already exists, it is replaced.
+ *
+ * @param child the child node to add
+ */
+ public void addChild(Node child) {
+ _children.put(child.getName(), child);
+ }
+
+ /**
+ * Check whether this node has a child with the given name.
+ * @param name the child node name
+ * @return true if such a child exists, false otherwise
+ */
+ public boolean hasChild(String name) {
+ return _children.containsKey(name);
+ }
+
+ /**
+ * Get the child node with the given name.
+ *
+ * @param name the child node name
+ * @return the child node, or null if no such child exists
+ */
+ public Node getChild(String name) {
+ return _children.get(name);
+ }
+
+ public boolean isLeaf() {
+ return _children == null || _children.isEmpty();
+ }
+
+ public Node getParent() {
+ return _parent;
+ }
+
+ public void setParent(Node parent) {
+ _parent = parent;
+ }
+
+ /**
+ * Returns all child nodes that match the type. Returns itself if this node matches it. If no
+ * child matches the type, an empty list is returned.
+ */
+ protected List<Node> findChildren(String type) {
+ List<Node> nodes = new ArrayList<Node>();
+ if (_type.equalsIgnoreCase(type)) {
+ nodes.add(this);
+ } else if (!isLeaf()) {
+ for (Node child: _children.values()) {
+ nodes.addAll(child.findChildren(type));
+ }
+ }
+ return nodes;
+ }
+
+ /**
+ * Returns the number of descendant nodes that match the given type. Returns 1 if this node
+ * itself matches, and 0 if nothing matches.
+ */
+ public int getChildrenCount(String type) {
+ int count = 0;
+ if (_type.equalsIgnoreCase(type)) {
+ count++;
+ } else if (!isLeaf()) {
+ for (Node child: _children.values()) {
+ count += child.getChildrenCount(type);
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Returns the top-most ("root") node from this node. If this node itself does not have a parent,
+ * returns itself.
+ */
+ public Node getRoot() {
+ Node node = this;
+ while (node.getParent() != null) {
+ node = node.getParent();
+ }
+ return node;
+ }
+
+ @Override
+ public String toString() {
+ return _name + ":" + _id;
+ }
+
+ @Override
+ public int hashCode() {
+ return _name.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof Node)) {
+ return false;
+ }
+ Node that = (Node)obj;
+ return _name.equals(that.getName());
+ }
+
+ @Override
+ public int compareTo(Node o) {
+ return _name.compareTo(o.getName());
+ }
+}
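
For illustration only (this snippet is not part of the patch): a minimal sketch of how the
public Node API above composes into a small zone/instance tree. The node names, types and
the weight value are made up for the example.

import org.apache.helix.controller.rebalancer.topology.Node;

public class NodeTreeSketch {
  public static void main(String[] args) {
    Node root = new Node();
    root.setName("root");
    root.setType("ROOT");

    Node zone = new Node();
    zone.setName("zone-1");
    zone.setType("ZONE");
    zone.setParent(root);
    root.addChild(zone);

    Node instance = new Node();
    instance.setName("host-1_12918");
    instance.setType("INSTANCE");
    instance.setWeight(1000);
    instance.setParent(zone);
    zone.addChild(instance);

    // the caller is responsible for accumulating leaf weight up the
    // ancestor chain (see Topology.addEndNode below)
    zone.addWeight(instance.getWeight());
    root.addWeight(instance.getWeight());

    System.out.println(instance.isLeaf());                 // true
    System.out.println(instance.getRoot() == root);        // true
    System.out.println(root.getChildrenCount("INSTANCE")); // 1
  }
}
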
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
new file mode 100644
index 0000000..1057fad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -0,0 +1,295 @@
+package org.apache.helix.controller.rebalancer.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Topology represents the structure of a cluster (the hierarchy of its nodes, fault boundaries, etc.).
+ * This class is intended for topology-aware partition placement.
+ */
+public class Topology {
+ private static Logger logger = Logger.getLogger(Topology.class);
+ public enum Types {
+ ROOT,
+ ZONE,
+ INSTANCE
+ }
+ private static final int DEFAULT_NODE_WEIGHT = 1000;
+
+ private final MessageDigest _md;
+ private Node _root; // root of the tree structure of all nodes;
+ private List<String> _allInstances;
+ private List<String> _liveInstances;
+ private Map<String, InstanceConfig> _instanceConfigMap;
+ private HelixProperty _clusterConfig;
+ private String _faultZoneType;
+ private String _endNodeType;
+ private boolean _useDefaultTopologyDef;
+ private LinkedHashSet<String> _types;
+
+ /* Default names for domain paths; if no value is specified for a domain path, the default is used. */
+ // TODO: default values can be defined in clusterConfig.
+ private Map<String, String> _defaultDomainPathValues = new HashMap<String, String>();
+
+ public Topology(final List<String> allNodes, final List<String> liveNodes,
+ final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+ try {
+ _md = MessageDigest.getInstance("SHA-1");
+ _allInstances = allNodes;
+ _liveInstances = liveNodes;
+ _instanceConfigMap = instanceConfigMap;
+ _clusterConfig = clusterConfig;
+ _types = new LinkedHashSet<String>();
+
+ String topologyDef = _clusterConfig.getRecord()
+ .getSimpleField(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name());
+ if (topologyDef != null) {
+ // Customized cluster topology definition is configured.
+ String[] types = topologyDef.trim().split("/");
+ for (int i = 0; i < types.length; i++) {
+ if (types[i].length() != 0) {
+ _types.add(types[i]);
+ }
+ }
+ if (_types.size() == 0) {
+ logger.error("Invalid cluster topology definition " + topologyDef);
+ throw new HelixException("Invalid cluster topology definition " + topologyDef);
+ } else {
+ String lastType = null;
+ for (String type : _types) {
+ _defaultDomainPathValues.put(type, "Helix_default_" + type);
+ lastType = type;
+ }
+ _endNodeType = lastType;
+ _faultZoneType = _clusterConfig.getRecord()
+ .getStringField(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(),
+ _endNodeType);
+ if (!_types.contains(_faultZoneType)) {
+ throw new HelixException(String
+ .format("Invalid fault zone type %s, not present in topology definition %s.",
+ _faultZoneType, topologyDef));
+ }
+ _useDefaultTopologyDef = false;
+ }
+ } else {
+ // Use the default cluster topology definition, i.e. /root/zone/instance
+ _types.add(Types.ZONE.name());
+ _types.add(Types.INSTANCE.name());
+ _endNodeType = Types.INSTANCE.name();
+ _faultZoneType = Types.ZONE.name();
+ _useDefaultTopologyDef = true;
+ }
+ } catch (NoSuchAlgorithmException ex) {
+ throw new IllegalArgumentException(ex);
+ }
+ if (_useDefaultTopologyDef) {
+ _root = createClusterTreeWithDefaultTopologyDef();
+ } else {
+ _root = createClusterTreeWithCustomizedTopology();
+ }
+ }
+
+ public String getEndNodeType() {
+ return _endNodeType;
+ }
+
+ public String getFaultZoneType() {
+ return _faultZoneType;
+ }
+
+ public Node getRootNode() {
+ return _root;
+ }
+
+ public List<Node> getFaultZones() {
+ if (_root != null) {
+ return _root.findChildren(getFaultZoneType());
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Creates a tree representing the cluster structure using the default cluster topology definition
+ * (i.e., no TOPOLOGY is configured in the cluster config; each instance's ZONE_ID is used).
+ */
+ private Node createClusterTreeWithDefaultTopologyDef() {
+ // root
+ Node root = new Node();
+ root.setName("root");
+ root.setId(computeId("root"));
+ root.setType(Types.ROOT.name());
+
+ for (String ins : _allInstances) {
+ InstanceConfig config = _instanceConfigMap.get(ins);
+ if (config == null) {
+ throw new HelixException(String.format("Config for instance %s is not found!", ins));
+ }
+ String zone = config.getZoneId();
+ if (zone == null) {
+ //TODO: we should allow non-rack clusters for backward compatibility. This should be
+ // addressed once we have the hierarchical domain id for instances.
+ throw new HelixException(String
+ .format("ZONE_ID for instance %s is not set, failed the topology-aware placement!",
+ ins));
+ }
+ Map<String, String> pathValueMap = new HashMap<String, String>();
+ pathValueMap.put(Types.ZONE.name(), zone);
+ pathValueMap.put(Types.INSTANCE.name(), ins);
+
+ int weight = config.getWeight();
+ if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
+ weight = DEFAULT_NODE_WEIGHT;
+ }
+ addEndNode(root, ins, pathValueMap, weight, _liveInstances);
+ }
+
+ return root;
+ }
+
+ /**
+ * Creates a tree representing the cluster structure using the customized cluster topology
+ * definition (i.e., TOPOLOGY is configured and each instance's DOMAIN describes its path in the tree).
+ */
+ private Node createClusterTreeWithCustomizedTopology() {
+ // root
+ Node root = new Node();
+ root.setName("root");
+ root.setId(computeId("root"));
+ root.setType(Types.ROOT.name());
+
+ for (String ins : _allInstances) {
+ InstanceConfig insConfig = _instanceConfigMap.get(ins);
+ if (insConfig == null) {
+ throw new HelixException(String.format("Config for instance %s is not found!", ins));
+ }
+ String domain = insConfig.getDomain();
+ if (domain == null) {
+ throw new HelixException(String
+ .format("Domain for instance %s is not set, failed the topology-aware placement!",
+ ins));
+ }
+
+ String[] pathPairs = domain.trim().split(",");
+ Map<String, String> pathValueMap = new HashMap<String, String>();
+ for (String pair : pathPairs) {
+ String[] values = pair.trim().split("=");
+ if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) {
+ throw new HelixException(String.format(
+ "Domain-Value pair %s for instance %s is not valid, failed the topology-aware placement!",
+ pair, ins));
+ }
+ String type = values[0];
+ String value = values[1];
+
+ if (!_types.contains(type)) {
+ logger.warn(String
+ .format("Path %s defined in domain of instance %s not recognized, ignored!", pair,
+ ins));
+ continue;
+ }
+ pathValueMap.put(type, value);
+ }
+
+ int weight = insConfig.getWeight();
+ if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
+ weight = DEFAULT_NODE_WEIGHT;
+ }
+
+ root = addEndNode(root, ins, pathValueMap, weight, _liveInstances);
+ }
+
+ return root;
+ }
+
+
+ /**
+ * Add an end node to the tree, create all the paths to the leaf node if not present.
+ */
+ private Node addEndNode(Node root, String instanceName, Map<String, String> pathNameMap,
+ int instanceWeight, List<String> liveInstances) {
+ Node current = root;
+ List<Node> pathNodes = new ArrayList<Node>();
+ for (String path : _types) {
+ String pathValue = pathNameMap.get(path);
+ if (pathValue == null || pathValue.isEmpty()) {
+ pathValue = _defaultDomainPathValues.get(path);
+ }
+ pathNodes.add(current);
+ if (!current.hasChild(pathValue)) {
+ Node n = new Node();
+ n.setName(pathValue);
+ n.setId(computeId(pathValue));
+ n.setType(path);
+ n.setParent(current);
+
+ // if it is leaf node.
+ if (path.equals(_endNodeType)) {
+ if (liveInstances.contains(instanceName)) {
+ // node is alive
+ n.setWeight(instanceWeight);
+ // add instance weight to all of its parent nodes.
+ for (Node node : pathNodes) {
+ node.addWeight(instanceWeight);
+ }
+ } else {
+ n.setFailed(true);
+ n.setWeight(0);
+ }
+ }
+ current.addChild(n);
+ }
+ current = current.getChild(pathValue);
+ }
+ return root;
+ }
+
+ private long computeId(String name) {
+ byte[] h = _md.digest(name.getBytes());
+ return bstrTo32bit(h);
+ }
+
+ private long bstrTo32bit(byte[] bstr) {
+ if (bstr.length < 4) {
+ throw new IllegalArgumentException("hashed value is less than 4 bytes!");
+ }
+ // need to "simulate" unsigned int
+ return (long) (((ord(bstr[0]) << 24) | (ord(bstr[1]) << 16) | (ord(bstr[2]) << 8) | (ord(
+ bstr[3])))) & 0xffffffffL;
+ }
+
+ private int ord(byte b) {
+ return b & 0xff;
+ }
+}
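
For illustration only (this snippet is not part of the patch): a minimal sketch of wiring a
customized topology together, assuming the ClusterConfig and InstanceConfig additions that
appear later in this patch and the existing InstanceConfig(String) constructor. The cluster
name, instance name, domain values and weight are made up.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.topology.Node;
import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;

public class TopologySketch {
  public static void main(String[] args) {
    // cluster-level definition: three levels, isolation applied at the zone level
    ZNRecord clusterRecord = new ZNRecord("myCluster");
    clusterRecord.setSimpleField(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(),
        "/zone/rack/instance");
    clusterRecord.setSimpleField(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(),
        "zone");
    ClusterConfig clusterConfig = new ClusterConfig(clusterRecord);

    // instance-level domain: one key=value pair per level declared in TOPOLOGY
    InstanceConfig insConfig = new InstanceConfig("host-1_12918");
    insConfig.setDomain("zone=zone-1,rack=rack-A,instance=host-1_12918");
    insConfig.setWeight(1000);

    List<String> allInstances = Collections.singletonList("host-1_12918");
    Map<String, InstanceConfig> configs =
        Collections.singletonMap("host-1_12918", insConfig);

    Topology topology = new Topology(allInstances, allInstances, configs, clusterConfig);
    List<Node> faultZones = topology.getFaultZones(); // the nodes at the "zone" level
    System.out.println(faultZones);
  }
}
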
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index cb5bda8..dacf98d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.CurrentState;
@@ -56,6 +57,7 @@ public class ClusterDataCache {
private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
+ private ClusterConfig _clusterConfig;
Map<String, LiveInstance> _liveInstanceMap;
Map<String, LiveInstance> _liveInstanceCacheMap;
Map<String, IdealState> _idealStateMap;
@@ -200,11 +202,11 @@ public class ClusterDataCache {
_currentStateMap = Collections.unmodifiableMap(allCurStateMap);
_idealStateRuleMap = Maps.newHashMap();
- HelixProperty clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
- if (clusterConfig != null) {
- for (String simpleKey : clusterConfig.getRecord().getSimpleFields().keySet()) {
+ _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
+ if (_clusterConfig != null) {
+ for (String simpleKey : _clusterConfig.getRecord().getSimpleFields().keySet()) {
if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) {
- String simpleValue = clusterConfig.getRecord().getSimpleField(simpleKey);
+ String simpleValue = _clusterConfig.getRecord().getSimpleField(simpleKey);
String[] rules = simpleValue.split("(?<!\\\\),");
Map<String, String> singleRule = Maps.newHashMap();
for (String rule : rules) {
@@ -232,6 +234,10 @@ public class ClusterDataCache {
return true;
}
+ public ClusterConfig getClusterConfig() {
+ return _clusterConfig;
+ }
+
/**
* Retrieves the idealstates for all resources
* @return
[2/5] helix git commit: [HELIX-568] Add new topology aware
(rack-aware) rebalance strategy based on CRUSH algorithm. Design doc is
available at:
https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy
Posted by lx...@apache.org.
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
deleted file mode 100644
index 959609f..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ /dev/null
@@ -1,753 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.log4j.Logger;
-
-public class AutoRebalanceStrategy implements RebalanceStrategy {
- private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
- private final ReplicaPlacementScheme _placementScheme;
-
- private String _resourceName;
- private List<String> _partitions;
- private LinkedHashMap<String, Integer> _states;
- private int _maximumPerNode;
-
- private Map<String, Node> _nodeMap;
- private List<Node> _liveNodesList;
- private Map<Integer, String> _stateMap;
-
- private Map<Replica, Node> _preferredAssignment;
- private Map<Replica, Node> _existingPreferredAssignment;
- private Map<Replica, Node> _existingNonPreferredAssignment;
- private Set<Replica> _orphaned;
-
- public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states, int maximumPerNode) {
- init(resourceName, partitions, states, maximumPerNode);
- _placementScheme = new DefaultPlacementScheme();
- }
-
- public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states) {
- this(resourceName, partitions, states, Integer.MAX_VALUE);
- }
-
- @Override
- public void init(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states, int maximumPerNode) {
- _resourceName = resourceName;
- _partitions = partitions;
- _states = states;
- _maximumPerNode = maximumPerNode;
- }
-
- @Override
- public ZNRecord computePartitionAssignment(final List<String> liveNodes,
- final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
- int numReplicas = countStateReplicas();
- ZNRecord znRecord = new ZNRecord(_resourceName);
- if (liveNodes.size() == 0) {
- return znRecord;
- }
- int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
- int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
- _nodeMap = new HashMap<String, Node>();
- _liveNodesList = new ArrayList<Node>();
-
- for (String id : allNodes) {
- Node node = new Node(id);
- node.capacity = 0;
- node.hasCeilingCapacity = false;
- _nodeMap.put(id, node);
- }
- for (int i = 0; i < liveNodes.size(); i++) {
- boolean usingCeiling = false;
- int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
- if (distRemainder > 0 && targetSize < _maximumPerNode) {
- targetSize += 1;
- distRemainder = distRemainder - 1;
- usingCeiling = true;
- }
- Node node = _nodeMap.get(liveNodes.get(i));
- node.isAlive = true;
- node.capacity = targetSize;
- node.hasCeilingCapacity = usingCeiling;
- _liveNodesList.add(node);
- }
-
- // compute states for all replica ids
- _stateMap = generateStateMap();
-
- // compute the preferred mapping if all nodes were up
- _preferredAssignment = computePreferredPlacement(allNodes);
-
- // logger.info("preferred mapping:"+ preferredAssignment);
- // from current mapping derive the ones in preferred location
- // this will update the nodes with their current fill status
- _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
-
- // from current mapping derive the ones not in preferred location
- _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
-
- // compute orphaned replicas that are not assigned to any node
- _orphaned = computeOrphaned();
- if (logger.isInfoEnabled()) {
- logger.info("orphan = " + _orphaned);
- }
-
- moveNonPreferredReplicasToPreferred();
-
- assignOrphans();
-
- moveExcessReplicas();
-
- prepareResult(znRecord);
- return znRecord;
- }
-
- /**
- * Move replicas assigned to non-preferred nodes if their current node is at capacity
- * and its preferred node is under capacity.
- */
- private void moveNonPreferredReplicasToPreferred() {
- // iterate through non preferred and see if we can move them to the
- // preferred location if the donor has more than it should and stealer has
- // enough capacity
- Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<Replica, Node> entry = iterator.next();
- Replica replica = entry.getKey();
- Node donor = entry.getValue();
- Node receiver = _preferredAssignment.get(replica);
- if (donor.capacity < donor.currentlyAssigned
- && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
- donor.currentlyAssigned = donor.currentlyAssigned - 1;
- receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
- donor.nonPreferred.remove(replica);
- receiver.preferred.add(replica);
- donor.newReplicas.remove(replica);
- receiver.newReplicas.add(replica);
- iterator.remove();
- }
- }
- }
-
- /**
- * Slot in orphaned partitions randomly so as to maintain even load on live nodes.
- */
- private void assignOrphans() {
- // now iterate over nodes and remaining orphaned partitions and assign
- // partitions randomly
- // Better to iterate over orphaned partitions first
- Iterator<Replica> it = _orphaned.iterator();
- while (it.hasNext()) {
- Replica replica = it.next();
- boolean added = false;
- int startIndex = computeRandomStartIndex(replica);
- for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
- Node receiver = _liveNodesList.get(index % _liveNodesList.size());
- if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
- receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
- receiver.nonPreferred.add(replica);
- receiver.newReplicas.add(replica);
- added = true;
- break;
- }
- }
- if (!added) {
- // try adding the replica by making room for it
- added = assignOrphanByMakingRoom(replica);
- }
- if (added) {
- it.remove();
- }
- }
- if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
- logger.info("could not assign nodes to partitions: " + _orphaned);
- }
- }
-
- /**
- * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it
- * @param replica The replica to assign
- * @return true if the assignment succeeded, false otherwise
- */
- private boolean assignOrphanByMakingRoom(Replica replica) {
- Node capacityDonor = null;
- Node capacityAcceptor = null;
- int startIndex = computeRandomStartIndex(replica);
- for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
- Node current = _liveNodesList.get(index % _liveNodesList.size());
- if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned
- && !current.canAddIfCapacity(replica) && capacityDonor == null) {
- // this node has space but cannot accept the node
- capacityDonor = current;
- } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned
- && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
- // this node would be able to accept the replica if it has ceiling capacity
- capacityAcceptor = current;
- }
- if (capacityDonor != null && capacityAcceptor != null) {
- break;
- }
- }
- if (capacityDonor != null && capacityAcceptor != null) {
- // transfer ceiling capacity and add the node
- capacityAcceptor.steal(capacityDonor, replica);
- return true;
- }
- return false;
- }
-
- /**
- * Move replicas from too-full nodes to nodes that can accept the replicas
- */
- private void moveExcessReplicas() {
- // iterate over nodes and move extra load
- Iterator<Replica> it;
- for (Node donor : _liveNodesList) {
- if (donor.capacity < donor.currentlyAssigned) {
- Collections.sort(donor.nonPreferred);
- it = donor.nonPreferred.iterator();
- while (it.hasNext()) {
- Replica replica = it.next();
- int startIndex = computeRandomStartIndex(replica);
- for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
- Node receiver = _liveNodesList.get(index % _liveNodesList.size());
- if (receiver.canAdd(replica)) {
- receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
- receiver.nonPreferred.add(replica);
- donor.currentlyAssigned = donor.currentlyAssigned - 1;
- it.remove();
- break;
- }
- }
- if (donor.capacity >= donor.currentlyAssigned) {
- break;
- }
- }
- if (donor.capacity < donor.currentlyAssigned) {
- logger.warn("Could not take partitions out of node:" + donor.id);
- }
- }
- }
- }
-
- /**
- * Update a ZNRecord with the results of the rebalancing.
- * @param znRecord
- */
- private void prepareResult(ZNRecord znRecord) {
- // The map fields are keyed on partition name to a pair of node and state, i.e. it
- // indicates that the partition with given state is served by that node
- //
- // The list fields are also keyed on partition and list all the nodes serving that partition.
- // This is useful to verify that there is no node serving multiple replicas of the same
- // partition.
- Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
- for (String partition : _partitions) {
- znRecord.setMapField(partition, new TreeMap<String, String>());
- znRecord.setListField(partition, new ArrayList<String>());
- newPreferences.put(partition, new ArrayList<String>());
- }
-
- // for preference lists, the rough priority that we want is:
- // [existing preferred, existing non-preferred, non-existing preferred, non-existing
- // non-preferred]
- for (Node node : _liveNodesList) {
- for (Replica replica : node.preferred) {
- if (node.newReplicas.contains(replica)) {
- newPreferences.get(replica.partition).add(node.id);
- } else {
- znRecord.getListField(replica.partition).add(node.id);
- }
- }
- }
- for (Node node : _liveNodesList) {
- for (Replica replica : node.nonPreferred) {
- if (node.newReplicas.contains(replica)) {
- newPreferences.get(replica.partition).add(node.id);
- } else {
- znRecord.getListField(replica.partition).add(node.id);
- }
- }
- }
- normalizePreferenceLists(znRecord.getListFields(), newPreferences);
-
- // generate preference maps based on the preference lists
- for (String partition : _partitions) {
- List<String> preferenceList = znRecord.getListField(partition);
- int i = 0;
- for (String participant : preferenceList) {
- znRecord.getMapField(partition).put(participant, _stateMap.get(i));
- i++;
- }
- }
- }
-
- /**
- * Adjust preference lists to reduce the number of same replicas on an instance. This will
- * separately normalize two sets of preference lists, and then append the results of the second
- * set to those of the first. This basically ensures that existing replicas are automatically
- * preferred.
- * @param preferenceLists map of (partition --> list of nodes)
- * @param newPreferences map containing node preferences not consistent with the current
- * assignment
- */
- private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
- Map<String, List<String>> newPreferences) {
-
- Map<String, Map<String, Integer>> nodeReplicaCounts =
- new HashMap<String, Map<String, Integer>>();
- for (String partition : preferenceLists.keySet()) {
- normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
- }
- for (String partition : newPreferences.keySet()) {
- normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
- preferenceLists.get(partition).addAll(newPreferences.get(partition));
- }
- }
-
- /**
- * Adjust a single preference list for replica assignment imbalance
- * @param preferenceList list of node names
- * @param nodeReplicaCounts map of (node --> state --> count)
- */
- private void normalizePreferenceList(List<String> preferenceList,
- Map<String, Map<String, Integer>> nodeReplicaCounts) {
- List<String> newPreferenceList = new ArrayList<String>();
- int replicas = Math.min(countStateReplicas(), preferenceList.size());
-
- // make this a LinkedHashSet to preserve iteration order
- Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
- for (int i = 0; i < replicas; i++) {
- String state = _stateMap.get(i);
- String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
- newPreferenceList.add(node);
- notAssigned.remove(node);
- Map<String, Integer> counts = nodeReplicaCounts.get(node);
- counts.put(state, counts.get(state) + 1);
- }
- preferenceList.clear();
- preferenceList.addAll(newPreferenceList);
- }
-
- /**
- * Get the node which hosts the fewest of a given replica
- * @param state the state
- * @param nodes nodes to check
- * @param nodeReplicaCounts current assignment of replicas
- * @return the node most willing to accept the replica
- */
- private String getMinimumNodeForReplica(String state, Set<String> nodes,
- Map<String, Map<String, Integer>> nodeReplicaCounts) {
- String minimalNode = null;
- int minimalCount = Integer.MAX_VALUE;
- for (String node : nodes) {
- int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
- if (count < minimalCount) {
- minimalCount = count;
- minimalNode = node;
- }
- }
- return minimalNode;
- }
-
- /**
- * Safe check for the number of replicas of a given id assiged to a node
- * @param state the state to assign
- * @param node the node to check
- * @param nodeReplicaCounts a map of node to replica id and counts
- * @return the number of currently assigned replicas of the given id
- */
- private int getReplicaCountForNode(String state, String node,
- Map<String, Map<String, Integer>> nodeReplicaCounts) {
- if (!nodeReplicaCounts.containsKey(node)) {
- Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
- replicaCounts.put(state, 0);
- nodeReplicaCounts.put(node, replicaCounts);
- return 0;
- }
- Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
- if (!replicaCounts.containsKey(state)) {
- replicaCounts.put(state, 0);
- return 0;
- }
- return replicaCounts.get(state);
- }
-
- /**
- * Compute the subset of the current mapping where replicas are not mapped according to their
- * preferred assignment.
- * @param currentMapping Current mapping of replicas to nodes
- * @return The current assignments that do not conform to the preferred assignment
- */
- private Map<Replica, Node> computeExistingNonPreferredPlacement(
- Map<String, Map<String, String>> currentMapping) {
- Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
- int count = countStateReplicas();
- for (String partition : currentMapping.keySet()) {
- Map<String, String> nodeStateMap = currentMapping.get(partition);
- nodeStateMap.keySet().retainAll(_nodeMap.keySet());
- for (String nodeId : nodeStateMap.keySet()) {
- Node node = _nodeMap.get(nodeId);
- boolean skip = false;
- for (Replica replica : node.preferred) {
- if (replica.partition.equals(partition)) {
- skip = true;
- break;
- }
- }
- if (skip) {
- continue;
- }
- // check if its in one of the preferred position
- for (int replicaId = 0; replicaId < count; replicaId++) {
- Replica replica = new Replica(partition, replicaId);
- if (!_preferredAssignment.containsKey(replica)) {
-
- logger.info("partitions: " + _partitions);
- logger.info("currentMapping.keySet: " + currentMapping.keySet());
- throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions");
- }
-
- if (_preferredAssignment.get(replica).id != node.id
- && !_existingPreferredAssignment.containsKey(replica)
- && !existingNonPreferredAssignment.containsKey(replica)) {
- existingNonPreferredAssignment.put(replica, node);
- node.nonPreferred.add(replica);
-
- break;
- }
- }
- }
- }
- return existingNonPreferredAssignment;
- }
-
- /**
- * Get a live node index to try first for a replica so that each possible start index is
- * roughly uniformly assigned.
- * @param replica The replica to assign
- * @return The starting node index to try
- */
- private int computeRandomStartIndex(final Replica replica) {
- return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
- }
-
- /**
- * Get a set of replicas not currently assigned to any node
- * @return Unassigned replicas
- */
- private Set<Replica> computeOrphaned() {
- Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
- for (Replica r : _existingPreferredAssignment.keySet()) {
- if (orphanedPartitions.contains(r)) {
- orphanedPartitions.remove(r);
- }
- }
- for (Replica r : _existingNonPreferredAssignment.keySet()) {
- if (orphanedPartitions.contains(r)) {
- orphanedPartitions.remove(r);
- }
- }
-
- return orphanedPartitions;
- }
-
- /**
- * Determine the replicas already assigned to their preferred nodes
- * @param currentMapping Current assignment of replicas to nodes
- * @return Assignments that conform to the preferred placement
- */
- private Map<Replica, Node> computeExistingPreferredPlacement(
- final Map<String, Map<String, String>> currentMapping) {
- Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
- int count = countStateReplicas();
- for (String partition : currentMapping.keySet()) {
- Map<String, String> nodeStateMap = currentMapping.get(partition);
- nodeStateMap.keySet().retainAll(_nodeMap.keySet());
- for (String nodeId : nodeStateMap.keySet()) {
- Node node = _nodeMap.get(nodeId);
- node.currentlyAssigned = node.currentlyAssigned + 1;
- // check if its in one of the preferred position
- for (int replicaId = 0; replicaId < count; replicaId++) {
- Replica replica = new Replica(partition, replicaId);
- if (_preferredAssignment.containsKey(replica)
- && !existingPreferredAssignment.containsKey(replica)
- && _preferredAssignment.get(replica).id == node.id) {
- existingPreferredAssignment.put(replica, node);
- node.preferred.add(replica);
- break;
- }
- }
- }
- }
-
- return existingPreferredAssignment;
- }
-
- /**
- * Given a predefined set of all possible nodes, compute an assignment of replicas to
- * nodes that evenly assigns all replicas to nodes.
- * @param allNodes Identifiers to all nodes, live and non-live
- * @return Preferred assignment of replicas
- */
- private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
- Map<Replica, Node> preferredMapping;
- preferredMapping = new HashMap<Replica, Node>();
- int partitionId = 0;
- int numReplicas = countStateReplicas();
- int count = countStateReplicas();
- for (String partition : _partitions) {
- for (int replicaId = 0; replicaId < count; replicaId++) {
- Replica replica = new Replica(partition, replicaId);
- String nodeName =
- _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
- allNodes);
- preferredMapping.put(replica, _nodeMap.get(nodeName));
- }
- partitionId = partitionId + 1;
- }
- return preferredMapping;
- }
-
- /**
- * Counts the total number of replicas given a state-count mapping
- * @return
- */
- private int countStateReplicas() {
- int total = 0;
- for (Integer count : _states.values()) {
- total += count;
- }
- return total;
- }
-
- /**
- * Compute a map of replica ids to state names
- * @return Map: replica id -> state name
- */
- private Map<Integer, String> generateStateMap() {
- int replicaId = 0;
- Map<Integer, String> stateMap = new HashMap<Integer, String>();
- for (String state : _states.keySet()) {
- Integer count = _states.get(state);
- for (int i = 0; i < count; i++) {
- stateMap.put(replicaId, state);
- replicaId++;
- }
- }
- return stateMap;
- }
-
- /**
- * A Node is an entity that can serve replicas. It has a capacity and knowledge
- * of replicas assigned to it, so it can decide if it can receive additional replicas.
- */
- class Node {
- public int currentlyAssigned;
- public int capacity;
- public boolean hasCeilingCapacity;
- private final String id;
- boolean isAlive;
- private final List<Replica> preferred;
- private final List<Replica> nonPreferred;
- private final Set<Replica> newReplicas;
-
- public Node(String id) {
- preferred = new ArrayList<Replica>();
- nonPreferred = new ArrayList<Replica>();
- newReplicas = new TreeSet<Replica>();
- currentlyAssigned = 0;
- isAlive = false;
- this.id = id;
- }
-
- /**
- * Check if this replica can be legally added to this node
- * @param replica The replica to test
- * @return true if the assignment can be made, false otherwise
- */
- public boolean canAdd(Replica replica) {
- if (currentlyAssigned >= capacity) {
- return false;
- }
- return canAddIfCapacity(replica);
- }
-
- /**
- * Check if this replica can be legally added to this node, provided that it has enough
- * capacity.
- * @param replica The replica to test
- * @return true if the assignment can be made, false otherwise
- */
- public boolean canAddIfCapacity(Replica replica) {
- if (!isAlive) {
- return false;
- }
- for (Replica r : preferred) {
- if (r.partition.equals(replica.partition)) {
- return false;
- }
- }
- for (Replica r : nonPreferred) {
- if (r.partition.equals(replica.partition)) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Receive a replica by stealing capacity from another Node
- * @param donor The node that has excess capacity
- * @param replica The replica to receive
- */
- public void steal(Node donor, Replica replica) {
- donor.hasCeilingCapacity = false;
- donor.capacity--;
- hasCeilingCapacity = true;
- capacity++;
- currentlyAssigned++;
- nonPreferred.add(replica);
- newReplicas.add(replica);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
- .append("\nnonpreferred:").append(nonPreferred.size());
- return sb.toString();
- }
- }
-
- /**
- * A Replica is a combination of a partition of the resource, the state the replica is in
- * and an identifier signifying a specific replica of a given partition and state.
- */
- class Replica implements Comparable<Replica> {
- private String partition;
- private int replicaId; // this is a partition-relative id
- private String format;
-
- public Replica(String partition, int replicaId) {
- this.partition = partition;
- this.replicaId = replicaId;
- this.format = this.partition + "|" + this.replicaId;
- }
-
- @Override
- public String toString() {
- return format;
- }
-
- @Override
- public boolean equals(Object that) {
- if (that instanceof Replica) {
- return this.format.equals(((Replica) that).format);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return this.format.hashCode();
- }
-
- @Override
- public int compareTo(Replica that) {
- if (that instanceof Replica) {
- return this.format.compareTo(that.format);
- }
- return -1;
- }
- }
-
- /**
- * Interface for providing a custom approach to computing a replica's affinity to a node.
- */
- public interface ReplicaPlacementScheme {
- /**
- * Initialize global state
- * @param manager The instance to which this placement is associated
- */
- public void init(final HelixManager manager);
-
- /**
- * Given properties of this replica, determine the node it would prefer to be served by
- * @param partitionId The current partition
- * @param replicaId The current replica with respect to the current partition
- * @param numPartitions The total number of partitions
- * @param numReplicas The total number of replicas per partition
- * @param nodeNames A list of identifiers of all nodes, live and non-live
- * @return The name of the node that would prefer to serve this replica
- */
- public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
- final List<String> nodeNames);
- }
-
- /**
- * Compute preferred placements based on a default strategy that assigns replicas to nodes as
- * evenly as possible while avoiding placing two replicas of the same partition on any node.
- */
- public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
- @Override
- public void init(final HelixManager manager) {
- // do nothing since this is independent of the manager
- }
-
- @Override
- public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
- final List<String> nodeNames) {
- int index;
- if (nodeNames.size() > numPartitions) {
- // assign replicas in partition order in case there are more nodes than partitions
- index = (partitionId + replicaId * numPartitions) % nodeNames.size();
- } else if (nodeNames.size() == numPartitions) {
- // need a replica offset in case the sizes of these sets are the same
- index =
- ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
- % nodeNames.size();
- } else {
- // in all other cases, assigning a replica at a time for each partition is reasonable
- index = (partitionId + replicaId) % nodeNames.size();
- }
- return nodeNames.get(index);
- }
- }
-}
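
For illustration only (this snippet is not part of the patch): a worked example of the index
arithmetic in the DefaultPlacementScheme.getLocation() method shown above, for the common case
where there are fewer nodes than partitions. The node names and partition counts are made up.

import java.util.Arrays;
import java.util.List;

public class PlacementSketch {
  public static void main(String[] args) {
    // 4 nodes, 8 partitions, 2 replicas per partition: since there are fewer
    // nodes than partitions, the scheme uses
    //   index = (partitionId + replicaId) % nodeNames.size()
    List<String> nodeNames = Arrays.asList("n0", "n1", "n2", "n3");

    int partitionId = 5;
    int replica0 = (partitionId + 0) % nodeNames.size(); // 5 % 4 = 1
    int replica1 = (partitionId + 1) % nodeNames.size(); // 6 % 4 = 2

    System.out.println(nodeNames.get(replica0)); // n1
    System.out.println(nodeNames.get(replica1)); // n2
  }
}
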
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
deleted file mode 100644
index 4daae82..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.helix.controller.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.ZNRecord;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Assignment strategy interface that computes the assignment of partition->instance.
- */
-public interface RebalanceStrategy {
- /**
- * Perform the necessary initialization for the rebalance strategy object.
- * @param resourceName
- * @param partitions
- * @param states
- * @param maximumPerNode
- */
- void init(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states, int maximumPerNode);
-
- /**
- * Compute the preference lists and (optional partition-state mapping) for the given resource.
- *
- * @param liveNodes
- * @param currentMapping
- * @param allNodes
- * @return
- */
- ZNRecord computePartitionAssignment(final List<String> liveNodes,
- final Map<String, Map<String, String>> currentMapping, final List<String> allNodes);
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index e97ac9b..73f2cbb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -52,6 +52,7 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
@@ -607,6 +608,13 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
+ public void addResource(String clusterName, String resourceName, int partitions,
+ String stateModelRef, String rebalancerMode, String rebalanceStrategy) {
+ addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode,
+ rebalanceStrategy, 0, -1);
+ }
+
+ @Override
public void addResource(String clusterName, String resourceName, IdealState idealstate) {
String stateModelRef = idealstate.getStateModelDefRef();
String stateModelDefPath =
@@ -629,14 +637,21 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void addResource(String clusterName, String resourceName, int partitions,
String stateModelRef, String rebalancerMode, int bucketSize) {
- addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize,
- -1);
+ addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, -1);
}
@Override
public void addResource(String clusterName, String resourceName, int partitions,
String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance) {
+ addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode,
+ RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY, bucketSize, maxPartitionsPerInstance);
+ }
+
+ @Override
+ public void addResource(String clusterName, String resourceName, int partitions,
+ String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize,
+ int maxPartitionsPerInstance) {
if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
throw new HelixException("cluster " + clusterName + " is not setup yet");
}
@@ -647,6 +662,7 @@ public class ZKHelixAdmin implements HelixAdmin {
RebalanceMode mode =
idealState.rebalanceModeFromString(rebalancerMode, RebalanceMode.SEMI_AUTO);
idealState.setRebalanceMode(mode);
+ idealState.setRebalanceStrategy(rebalanceStrategy);
idealState.setReplicas("" + 0);
idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
if (maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) {
@@ -1014,8 +1030,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public ZNRecord update(ZNRecord currentData) {
ClusterConstraints constraints =
- currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(
- currentData);
+ currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(currentData);
constraints.addConstraintItem(constraintId, constraintItem);
return constraints.getRecord();
@@ -1153,6 +1168,26 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
+ public void setInstanceZoneId(String clusterName, String instanceName, String zoneId) {
+ if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
+ throw new HelixException("cluster " + clusterName + " is not setup yet");
+ }
+
+ if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
+ throw new HelixException("cluster " + clusterName + " instance " + instanceName
+ + " is not setup yet");
+ }
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+
+ PropertyKey configKey = keyBuilder.instanceConfig(instanceName);
+ InstanceConfig config = accessor.getProperty(configKey);
+ config.setZoneId(zoneId);
+ accessor.setProperty(configKey, config);
+ }
+
+ @Override
public void close() {
if (_zkClient != null) {
_zkClient.close();
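
For illustration only (this snippet is not part of the patch): a hypothetical sketch of the new
admin calls added above. The ZooKeeper address, cluster, instance and resource names are made up,
it assumes the cluster and instance are already set up, and the strategy class name is the
relocated AutoRebalanceStrategy referenced elsewhere in this patch.

import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState.RebalanceMode;

public class AdminSketch {
  public static void main(String[] args) {
    ZKHelixAdmin admin = new ZKHelixAdmin("localhost:2181");

    // tag a participant with the zone it lives in (default topology)
    admin.setInstanceZoneId("myCluster", "host-1_12918", "zone-1");

    // create a resource with an explicitly chosen rebalance strategy class
    admin.addResource("myCluster", "myDB", 64, "MasterSlave",
        RebalanceMode.FULL_AUTO.name(),
        "org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy");

    admin.close();
  }
}
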
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
new file mode 100644
index 0000000..25a16d1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -0,0 +1,92 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+/**
+ * Cluster configurations
+ */
+public class ClusterConfig extends HelixProperty {
+ /**
+ * Configurable characteristics of a cluster
+ */
+ public enum ClusterConfigProperty {
+ HELIX_DISABLE_PIPELINE_TRIGGERS,
+ TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance"
+ FAULT_ZONE_TYPE // the node type within which isolation is applied when Helix places replicas of the same partition.
+ }
+
+ /**
+ * Instantiate for a specific cluster
+ *
+ * @param cluster the cluster identifier
+ */
+ public ClusterConfig(String cluster) {
+ super(cluster);
+ }
+
+ /**
+ * Instantiate with a pre-populated record
+ *
+ * @param record a ZNRecord corresponding to a cluster configuration
+ */
+ public ClusterConfig(ZNRecord record) {
+ super(record);
+ }
+
+ /**
+ * Whether the controller pipeline triggers are disabled for this cluster.
+ *
+ * @return true if pipeline triggers are disabled, false otherwise
+ */
+ public Boolean isPipelineTriggersDisabled() {
+ return _record
+ .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ClusterConfig) {
+ ClusterConfig that = (ClusterConfig) obj;
+
+ if (this.getId().equals(that.getId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getId().hashCode();
+ }
+
+ /**
+ * Get the name of this cluster
+ *
+ * @return the cluster name
+ */
+ public String getClusterName() {
+ return _record.getId();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 7c4cf54..55d4734 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -167,7 +167,7 @@ public class IdealState extends HelixProperty {
/**
* Specify the strategy for Helix to use to compute the partition-instance assignment,
- * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy}
+ * i.e., the custom rebalance strategy that implements {@link org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy}
*
* @param rebalanceStrategy
* @return
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index eb1c652..ecf2900 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -39,10 +39,14 @@ public class InstanceConfig extends HelixProperty {
public enum InstanceConfigProperty {
HELIX_HOST,
HELIX_PORT,
+ HELIX_ZONE_ID,
HELIX_ENABLED,
HELIX_DISABLED_PARTITION,
- TAG_LIST
+ TAG_LIST,
+ INSTANCE_WEIGHT,
+ DOMAIN
}
+ public static final int WEIGHT_NOT_SET = -1;
private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName());
@@ -94,6 +98,50 @@ public class InstanceConfig extends HelixProperty {
_record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port);
}
+ public String getZoneId() {
+ return _record.getSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name());
+ }
+
+ public void setZoneId(String zoneId) {
+ _record.setSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name(), zoneId);
+ }
+
+ /**
+ * Domain represents a hierarchy identifier for an instance.
+ * @return the domain string, or null if not set
+ */
+ public String getDomain() {
+ return _record.getSimpleField(InstanceConfigProperty.DOMAIN.name());
+ }
+
+ /**
+ * Domain represents a hierarchy identifier for an instance.
+ * Example: "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001".
+ * @param domain the domain string for this instance
+ */
+ public void setDomain(String domain) {
+ _record.setSimpleField(InstanceConfigProperty.DOMAIN.name(), domain);
+ }
+
+ public int getWeight() {
+ String w = _record.getSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name());
+ if (w != null) {
+ try {
+ int weight = Integer.valueOf(w);
+ return weight;
+ } catch (NumberFormatException e) {
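+ // invalid weight value in the config; fall through and return WEIGHT_NOT_SET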
+ }
+ }
+ return WEIGHT_NOT_SET;
+ }
+
+ public void setWeight(int weight) {
+ if (weight <= 0) {
+ throw new IllegalArgumentException("Instance weight cannot be less than or equal to 0!");
+ }
+ _record.setSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name(), String.valueOf(weight));
+ }
+
/**
* Get arbitrary tags associated with the instance
* @return a list of tags
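
For illustration only (this snippet is not part of the patch): a small sketch of the new
instance-level fields added above, assuming the existing InstanceConfig(String) constructor.
The instance name, zone, domain and weight values are illustrative.

import org.apache.helix.model.InstanceConfig;

public class InstanceConfigSketch {
  public static void main(String[] args) {
    InstanceConfig config = new InstanceConfig("host-1_12918");

    // default topology: only the zone id is needed
    config.setZoneId("zone-1");

    // customized topology: one key=value pair per level declared in TOPOLOGY
    config.setDomain("zone=zone-1,rack=rack-A,instance=host-1_12918");

    // optional capacity hint; when unset, getWeight() returns WEIGHT_NOT_SET (-1)
    // and the topology falls back to its default node weight
    config.setWeight(2000);
    System.out.println(config.getWeight()); // 2000
  }
}
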
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index 623357f..ac96768 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -32,8 +32,8 @@ import java.util.TreeSet;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
@@ -126,7 +126,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
List<String> allNodes =
Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
Collections.sort(allNodes);
- ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes);
+ ZNRecord record =
+ strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache);
Map<String, List<String>> preferenceLists = record.getListFields();
// Convert to an assignment keyed on participant
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 9d411bb..08ccbdc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -362,6 +362,12 @@ public class ClusterSetup {
}
public void addResourceToCluster(String clusterName, String resourceName, int numResources,
+ String stateModelRef, String rebalancerMode, String rebalanceStrategy) {
+ _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode,
+ rebalanceStrategy);
+ }
+
+ public void addResourceToCluster(String clusterName, String resourceName, int numResources,
String stateModelRef, String rebalancerMode, int bucketSize) {
_admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode,
bucketSize);
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
new file mode 100644
index 0000000..a4e38a1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java
@@ -0,0 +1,766 @@
+package org.apache.helix.controller.Strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.Mocks.MockAccessor;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.AutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TestAutoRebalanceStrategy {
+ private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
+
+ /**
+ * Sanity test for a basic Master-Slave model
+ */
+ @Test
+ public void simpleMasterSlaveTest() {
+ final int NUM_ITERATIONS = 10;
+ final int NUM_PARTITIONS = 10;
+ final int NUM_LIVE_NODES = 12;
+ final int NUM_TOTAL_NODES = 20;
+ final int MAX_PER_NODE = 5;
+
+ final String[] STATE_NAMES = {
+ "MASTER", "SLAVE"
+ };
+ final int[] STATE_COUNTS = {
+ 1, 2
+ };
+
+ runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES,
+ MAX_PER_NODE, STATE_NAMES, STATE_COUNTS);
+ }
+
+ /**
+ * Run a test for an arbitrary state model.
+ * @param name Name of the test state model
+ * @param numIterations Number of rebalance tasks to run
+ * @param numPartitions Number of partitions for the resource
+ * @param numLiveNodes Number of live nodes in the cluster
+ * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to
+ * numLiveNodes
+ * @param maxPerNode Maximum number of replicas a node can serve
+ * @param stateNames States ordered by preference
+ * @param stateCounts Number of replicas that should be in each state
+ */
+ private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes,
+ int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) {
+ List<String> partitions = new ArrayList<String>();
+ for (int i = 0; i < numPartitions; i++) {
+ partitions.add("p_" + i);
+ }
+
+ List<String> liveNodes = new ArrayList<String>();
+ List<String> allNodes = new ArrayList<String>();
+ for (int i = 0; i < numTotalNodes; i++) {
+ allNodes.add("n_" + i);
+ if (i < numLiveNodes) {
+ liveNodes.add("n_" + i);
+ }
+ }
+
+ Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>();
+
+ LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
+ for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) {
+ states.put(stateNames[i], stateCounts[i]);
+ }
+
+ StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
+
+ new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
+ stateModelDef).runRepeatedly(numIterations);
+ }
+
+ /**
+ * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions
+ * into account when computing mappings, so this is acceptable.
+ * @param modelName name to give the model
+ * @param initialState initial state for all nodes
+ * @param states ordered map of state to count
+ * @return incomplete StateModelDefinition for rebalancing
+ */
+ private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
+ LinkedHashMap<String, Integer> states) {
+ StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
+ builder.initialState(initialState);
+ int i = states.size();
+ for (String state : states.keySet()) {
+ builder.addState(state, i);
+ builder.upperBound(state, states.get(state));
+ i--;
+ }
+ return builder.build();
+ }
+
+ class AutoRebalanceTester {
+ private static final double P_KILL = 0.45;
+ private static final double P_ADD = 0.1;
+ private static final double P_RESURRECT = 0.45;
+ private static final String RESOURCE_NAME = "resource";
+
+ private List<String> _partitions;
+ private LinkedHashMap<String, Integer> _states;
+ private List<String> _liveNodes;
+ private Set<String> _liveSet;
+ private Set<String> _removedSet;
+ private Set<String> _nonLiveSet;
+ private Map<String, Map<String, String>> _currentMapping;
+ private List<String> _allNodes;
+ private int _maxPerNode;
+ private StateModelDefinition _stateModelDef;
+ private Random _random;
+
+ public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
+ List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
+ List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
+ _partitions = partitions;
+ _states = states;
+ _liveNodes = liveNodes;
+ _liveSet = new TreeSet<String>();
+ for (String node : _liveNodes) {
+ _liveSet.add(node);
+ }
+ _removedSet = new TreeSet<String>();
+ _nonLiveSet = new TreeSet<String>();
+ _currentMapping = currentMapping;
+ _allNodes = allNodes;
+ for (String node : allNodes) {
+ if (!_liveSet.contains(node)) {
+ _nonLiveSet.add(node);
+ }
+ }
+ _maxPerNode = maxPerNode;
+ _stateModelDef = stateModelDef;
+ _random = new Random();
+ }
+
+ /**
+ * Repeatedly randomly select a task to run and report the result
+ * @param numIterations
+ * Number of random tasks to run in sequence
+ */
+ public void runRepeatedly(int numIterations) {
+ logger.info("~~~~ Initial State ~~~~~");
+ RebalanceStrategy strategy =
+ new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
+ ZNRecord initialResult =
+ strategy.computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+ _currentMapping = getMapping(initialResult.getListFields());
+ logger.info(_currentMapping);
+ getRunResult(_currentMapping, initialResult.getListFields());
+ for (int i = 0; i < numIterations; i++) {
+ logger.info("~~~~ Iteration " + i + " ~~~~~");
+ ZNRecord znRecord = runOnceRandomly();
+ if (znRecord != null) {
+ final Map<String, List<String>> listResult = znRecord.getListFields();
+ final Map<String, Map<String, String>> mapResult = getMapping(listResult);
+ logger.info(mapResult);
+ logger.info(listResult);
+ getRunResult(mapResult, listResult);
+ _currentMapping = mapResult;
+ }
+ }
+ }
+
+ private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
+ final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>();
+ ClusterDataCache cache = new ClusterDataCache();
+ MockAccessor accessor = new MockAccessor();
+ Builder keyBuilder = accessor.keyBuilder();
+ for (String node : _liveNodes) {
+ LiveInstance liveInstance = new LiveInstance(node);
+ liveInstance.setSessionId("testSession");
+ accessor.setProperty(keyBuilder.liveInstance(node), liveInstance);
+ }
+ cache.refresh(accessor);
+ for (String partition : _partitions) {
+ List<String> preferenceList = listResult.get(partition);
+ Map<String, String> currentStateMap = _currentMapping.get(partition);
+ Set<String> disabled = Collections.emptySet();
+ Map<String, String> assignment =
+ ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef,
+ preferenceList, currentStateMap, disabled, true);
+ mapResult.put(partition, assignment);
+ }
+ return mapResult;
+ }
+
+ /**
+ * Output various statistics and correctness check results
+ * @param mapFields
+ * The map-map assignment generated by the rebalancer
+ * @param listFields
+ * The map-list assignment generated by the rebalancer
+ */
+ public void getRunResult(final Map<String, Map<String, String>> mapFields,
+ final Map<String, List<String>> listFields) {
+ logger.info("***** Statistics *****");
+ dumpStatistics(mapFields);
+ verifyCorrectness(mapFields, listFields);
+ }
+
+ /**
+ * Output statistics about the assignment
+ * @param mapFields
+ * The map-map assignment generated by the rebalancer
+ */
+ public void dumpStatistics(final Map<String, Map<String, String>> mapFields) {
+ Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+ int nodeCount = _liveNodes.size();
+ logger.info("Total number of nodes: " + nodeCount);
+ logger.info("Nodes: " + _liveNodes);
+ int sumPartitions = getSum(partitionsPerNode.values());
+ logger.info("Total number of partitions: " + sumPartitions);
+ double averagePartitions = getAverage(partitionsPerNode.values());
+ logger.info("Average number of partitions per node: " + averagePartitions);
+ double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions);
+ logger.info("Standard deviation of partitions: " + stdevPartitions);
+
+ // Statistics about each state
+ Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields);
+ for (String state : _states.keySet()) {
+ Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>();
+ for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) {
+ Map<String, Integer> stateCounts = nodeStates.getValue();
+ if (stateCounts.containsKey(state)) {
+ nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state));
+ } else {
+ nodeStateCounts.put(nodeStates.getKey(), 0);
+ }
+ }
+ int sumStates = getSum(nodeStateCounts.values());
+ logger.info("Total number of state " + state + ": " + sumStates);
+ double averageStates = getAverage(nodeStateCounts.values());
+ logger.info("Average number of state " + state + " per node: " + averageStates);
+ double stdevStates = getStdev(nodeStateCounts.values(), averageStates);
+ logger.info("Standard deviation of state " + state + " per node: " + stdevStates);
+ }
+ }
+
+ /**
+ * Run a set of correctness tests, reporting success or failure
+ * @param mapFields
+ * The map-map assignment generated by the rebalancer
+ * @param listFields
+ * The map-list assignment generated by the rebalancer
+ */
+ public void verifyCorrectness(final Map<String, Map<String, String>> mapFields,
+ final Map<String, List<String>> listFields) {
+ final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields);
+ boolean maxConstraintMet = maxNotExceeded(partitionsPerNode);
+ assert maxConstraintMet : "Max per node constraint: FAIL";
+ logger.info("Max per node constraint: PASS");
+
+ boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode);
+ assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL";
+ logger.info("Only live nodes have partitions constraint: PASS");
+
+ boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields);
+ assert stateAssignmentPossible : "State replica constraint: FAIL";
+ logger.info("State replica constraint: PASS");
+
+ boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields);
+ assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL";
+ logger.info("Node uniqueness per partition constraint: PASS");
+ }
+
+ private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) {
+ for (String node : partitionsPerNode.keySet()) {
+ Integer value = partitionsPerNode.get(node);
+ if (value > _maxPerNode) {
+ logger.error("ERROR: Node " + node + " has " + value
+ + " partitions despite a maximum of " + _maxPerNode);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) {
+ for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) {
+ boolean isLive = _liveSet.contains(nodeState.getKey());
+ boolean isEmpty = nodeState.getValue() == 0;
+ if (!isLive && !isEmpty) {
+ logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has "
+ + nodeState.getValue() + " replicas!");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) {
+ for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+ final Map<String, String> nodeMap = partitionEntry.getValue();
+ final Map<String, Integer> stateCounts = new TreeMap<String, Integer>();
+ for (String state : nodeMap.values()) {
+ if (!stateCounts.containsKey(state)) {
+ stateCounts.put(state, 1);
+ } else {
+ stateCounts.put(state, stateCounts.get(state) + 1);
+ }
+ }
+ for (String state : stateCounts.keySet()) {
+ if (state.equals(HelixDefinedState.DROPPED.toString())) {
+ continue;
+ }
+ int count = stateCounts.get(state);
+ int maximumCount = _states.get(state);
+ if (count > maximumCount) {
+ logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey()
+ + " has " + count + " replicas when " + maximumCount + " is allowed!");
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) {
+ for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) {
+ Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue());
+ int numUniques = nodeSet.size();
+ int total = partitionEntry.getValue().size();
+ if (numUniques < total) {
+ logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total
+ + " nodes, but only " + numUniques + " are unique!");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private double getAverage(final Collection<Integer> values) {
+ double sum = 0.0;
+ for (Integer value : values) {
+ sum += value;
+ }
+ if (values.size() != 0) {
+ return sum / values.size();
+ } else {
+ return -1.0;
+ }
+ }
+
+ private int getSum(final Collection<Integer> values) {
+ int sum = 0;
+ for (Integer value : values) {
+ sum += value;
+ }
+ return sum;
+ }
+
+ private double getStdev(final Collection<Integer> values, double mean) {
+ double sum = 0.0;
+ for (Integer value : values) {
+ double deviation = mean - value;
+ sum += Math.pow(deviation, 2.0);
+ }
+ if (values.size() != 0) {
+ sum /= values.size();
+ return Math.pow(sum, 0.5);
+ } else {
+ return -1.0;
+ }
+ }
+
+ private Map<String, Integer> getPartitionBucketsForNode(
+ final Map<String, Map<String, String>> assignment) {
+ Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>();
+ for (String node : _liveNodes) {
+ partitionsPerNode.put(node, 0);
+ }
+ for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) {
+ final Map<String, String> nodeMap = partitionEntry.getValue();
+ for (String node : nodeMap.keySet()) {
+ String state = nodeMap.get(node);
+ if (state.equals(HelixDefinedState.DROPPED.toString())) {
+ continue;
+ }
+ // add 1 for every occurrence of a node
+ if (!partitionsPerNode.containsKey(node)) {
+ partitionsPerNode.put(node, 1);
+ } else {
+ partitionsPerNode.put(node, partitionsPerNode.get(node) + 1);
+ }
+ }
+ }
+ return partitionsPerNode;
+ }
+
+ private Map<String, Map<String, Integer>> getStateBucketsForNode(
+ final Map<String, Map<String, String>> assignment) {
+ Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>();
+ for (String n : _liveNodes) {
+ result.put(n, new TreeMap<String, Integer>());
+ }
+ for (Map<String, String> nodeStateMap : assignment.values()) {
+ for (Entry<String, String> nodeState : nodeStateMap.entrySet()) {
+ if (!result.containsKey(nodeState.getKey())) {
+ result.put(nodeState.getKey(), new TreeMap<String, Integer>());
+ }
+ Map<String, Integer> stateMap = result.get(nodeState.getKey());
+ if (!stateMap.containsKey(nodeState.getValue())) {
+ stateMap.put(nodeState.getValue(), 1);
+ } else {
+ stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Randomly choose between killing, adding, or resurrecting a single node
+ * @return (Partition -> (Node -> State)) ZNRecord
+ */
+ public ZNRecord runOnceRandomly() {
+ double choose = _random.nextDouble();
+ ZNRecord result = null;
+ if (choose < P_KILL) {
+ result = removeSingleNode(null);
+ } else if (choose < P_KILL + P_ADD) {
+ result = addSingleNode(null);
+ } else if (choose < P_KILL + P_ADD + P_RESURRECT) {
+ result = resurrectSingleNode(null);
+ }
+ return result;
+ }
+
+ /**
+ * Run rebalancer trying to add a never-live node
+ * @param node
+ * Optional String to add
+ * @return ZNRecord result returned by the rebalancer
+ */
+ public ZNRecord addSingleNode(String node) {
+ logger.info("=================== add node =================");
+ if (_nonLiveSet.size() == 0) {
+ logger.warn("Cannot add node because there are no nodes left to add.");
+ return null;
+ }
+
+ // Get a random never-live node
+ if (node == null || !_nonLiveSet.contains(node)) {
+ node = getRandomSetElement(_nonLiveSet);
+ }
+ logger.info("Adding " + node);
+ _liveNodes.add(node);
+ _liveSet.add(node);
+ _nonLiveSet.remove(node);
+
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
+ computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+ }
+
+ /**
+ * Run rebalancer trying to remove a live node
+ * @param node
+ * Optional String to remove
+ * @return ZNRecord result returned by the rebalancer
+ */
+ public ZNRecord removeSingleNode(String node) {
+ logger.info("=================== remove node =================");
+ if (_liveSet.size() == 0) {
+ logger.warn("Cannot remove node because there are no nodes left to remove.");
+ return null;
+ }
+
+ // Get a random live node
+ if (node == null || !_liveSet.contains(node)) {
+ node = getRandomSetElement(_liveSet);
+ }
+ logger.info("Removing " + node);
+ _removedSet.add(node);
+ _liveNodes.remove(node);
+ _liveSet.remove(node);
+
+ // the rebalancer expects that the current mapping doesn't contain deleted
+ // nodes
+ for (Map<String, String> nodeMap : _currentMapping.values()) {
+ if (nodeMap.containsKey(node)) {
+ nodeMap.remove(node);
+ }
+ }
+
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+ .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+ }
+
+ /**
+ * Run rebalancer trying to add back a removed node
+ * @param node
+ * Optional String to resurrect
+ * @return ZNRecord result returned by the rebalancer
+ */
+ public ZNRecord resurrectSingleNode(String node) {
+ logger.info("=================== resurrect node =================");
+ if (_removedSet.size() == 0) {
+ logger.warn("Cannot resurrect node because there are no nodes left to resurrect.");
+ return null;
+ }
+
+ // Get a random removed node
+ if (node == null || !_removedSet.contains(node)) {
+ node = getRandomSetElement(_removedSet);
+ }
+ logger.info("Resurrecting " + node);
+ _removedSet.remove(node);
+ _liveNodes.add(node);
+ _liveSet.add(node);
+
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+ .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+ }
+
+ private <T> T getRandomSetElement(Set<T> source) {
+ int element = _random.nextInt(source.size());
+ int i = 0;
+ for (T node : source) {
+ if (i == element) {
+ return node;
+ }
+ i++;
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
+ * lists should prefer nodes in the current mapping at all times, but when all nodes are in the
+ * current mapping, then it should distribute states as evenly as possible.
+ */
+ @Test
+ public void testOrphansNotPreferred() {
+ final String RESOURCE_NAME = "resource";
+ final String[] PARTITIONS = {
+ "resource_0", "resource_1", "resource_2"
+ };
+ final StateModelDefinition STATE_MODEL =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ final int REPLICA_COUNT = 2;
+ final String[] NODES = {
+ "n0", "n1", "n2"
+ };
+
+ // initial state, one node, no mapping
+ List<String> allNodes = Lists.newArrayList(NODES[0]);
+ List<String> liveNodes = Lists.newArrayList(NODES[0]);
+ Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+ for (String partition : PARTITIONS) {
+ currentMapping.put(partition, new HashMap<String, String>());
+ }
+
+ // make sure that when the first node joins, a single replica is assigned fairly
+ List<String> partitions = ImmutableList.copyOf(PARTITIONS);
+ LinkedHashMap<String, Integer> stateCount =
+ AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ ZNRecord znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ Map<String, List<String>> preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ // make sure these are all MASTER
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+ }
+
+ // now assign a replica to the first node in the current mapping, and add a second node
+ allNodes.add(NODES[1]);
+ liveNodes.add(NODES[1]);
+ stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).put(NODES[0], "MASTER");
+ }
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
+ + partition);
+ Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
+ + partition);
+ }
+
+ // now set the current mapping to reflect this update and make sure that it distributes masters
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).put(NODES[1], "SLAVE");
+ }
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ Set<String> firstNodes = Sets.newHashSet();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ }
+ Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+
+ // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
+ // new node is never the most preferred
+ allNodes.add(NODES[2]);
+ liveNodes.add(NODES[2]);
+ stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+ // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
+ currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ boolean newNodeUsed = false;
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ if (preferenceList.contains(NODES[2])) {
+ newNodeUsed = true;
+ Assert.assertEquals(preferenceList.get(1), NODES[2],
+ "newly added node not at preference list tail for " + partition);
+ }
+ }
+ Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
+
+ // now remap this to take the new node into account, should go back to balancing masters, slaves
+ // evenly across all nodes
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ firstNodes.clear();
+ Set<String> secondNodes = Sets.newHashSet();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ secondNodes.add(preferenceList.get(1));
+ }
+ Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
+ Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
+
+ // remove a node now, but use the current mapping with everything balanced just prior
+ liveNodes.remove(0);
+ stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+ // remove all references of n0 from the mapping, keep everything else in a legal state
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ Map<String, String> stateMap = currentMapping.get(partition);
+ for (String participant : stateMap.keySet()) {
+ Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
+ + partition);
+ }
+ for (String participant : preferenceList) {
+ if (!stateMap.containsKey(participant)) {
+ Assert.assertNotSame(preferenceList.get(0), participant,
+ "newly moved replica should not be master for " + partition);
+ }
+ }
+ }
+
+ // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
+ currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+ preferenceLists = znRecord.getListFields();
+ firstNodes.clear();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ }
+ Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+ }
+}
[5/5] helix git commit: [HELIX-633] AutoRebalancer should ignore
disabled instances, and all partitions on disabled instances should be dropped
in FULL_AUTO rebalance mode
Posted by lx...@apache.org.
[HELIX-633] AutoRebalancer should ignore disabled instances, and all partitions on disabled instances should be dropped in FULL_AUTO rebalance mode
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bc0aa76a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bc0aa76a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bc0aa76a
Branch: refs/heads/helix-0.6.x
Commit: bc0aa76a9de6243928e53e1a1d01e7502ff8267c
Parents: f5ac8f8
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue May 31 19:17:39 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700
----------------------------------------------------------------------
.../controller/rebalancer/AutoRebalancer.java | 3 +
.../util/ConstraintBasedAssignment.java | 22 +--
.../controller/stages/ClusterDataCache.java | 16 +++
.../TestAutoRebalanceWithDisabledInstance.java | 142 +++++++++++++++++++
.../integration/TestStateTransitionTimeout.java | 28 ----
.../integration/ZkStandAloneCMTestBase.java | 2 +
.../mock/participant/MockMSStateModel.java | 65 +--------
7 files changed, 180 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index a8d83a2..e47297f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -82,6 +82,9 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas));
List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
+ allNodes.removeAll(clusterData.getDisabledInstances());
+ liveNodes.retainAll(allNodes);
+
Map<String, Map<String, String>> currentMapping =
currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);
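The fix amounts to two collection operations before any mapping is computed: disabled instances are removed from the candidate node list, and the live list is narrowed to whatever survives the filter. A standalone sketch of that filtering, with placeholder node names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DisabledFilterSketch {
  public static void main(String[] args) {
    List<String> allNodes = new ArrayList<String>(Arrays.asList("n0", "n1", "n2"));
    List<String> liveNodes = new ArrayList<String>(Arrays.asList("n0", "n2"));
    Set<String> disabled = new HashSet<String>(Arrays.asList("n2"));

    allNodes.removeAll(disabled);  // disabled instances never receive new assignments
    liveNodes.retainAll(allNodes); // live list keeps only enabled, known instances

    System.out.println(allNodes);  // [n0, n1]
    System.out.println(liveNodes); // [n0]
  }
}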
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index a520803..9366bcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -75,24 +75,26 @@ public class ConstraintBasedAssignment {
boolean isResourceEnabled) {
Map<String, String> instanceStateMap = new HashMap<String, String>();
- // if the ideal state is deleted, instancePreferenceList will be empty and
- // we should drop all resources.
if (currentStateMap != null) {
for (String instance : currentStateMap.keySet()) {
- if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
- && !disabledInstancesForPartition.contains(instance)) {
- // if dropped (whether disabled or not), transit to DROPPED
+ if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) {
+ // The partition is dropped from preference list.
+ // Transit to DROPPED no matter the instance is disabled or not.
instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
- } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals(
- HelixDefinedState.ERROR.name()))
- && (disabledInstancesForPartition.contains(instance) || !isResourceEnabled)) {
+ } else {
// if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
- instanceStateMap.put(instance, stateModelDef.getInitialState());
+ if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) {
+ if (currentStateMap.get(instance) == null || !currentStateMap.get(instance)
+ .equals(HelixDefinedState.ERROR.name())) {
+ instanceStateMap.put(instance, stateModelDef.getInitialState());
+ }
+ }
}
}
}
- // ideal state is deleted
+ // if the ideal state is deleted, instancePreferenceList will be empty and
+ // we should drop all resources.
if (instancePreferenceList == null) {
return instanceStateMap;
}
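Restated outside the diff: an instance that is no longer in the preference list always transitions to DROPPED, even if it is disabled; an instance that is still preferred but disabled (or whose resource is disabled) and not in ERROR is reset to the state model's initial state. A hedged, self-contained restatement of just that branch logic, with state names passed as plain strings for illustration:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DropOrResetSketch {
  static Map<String, String> decide(List<String> preferenceList, Map<String, String> currentStateMap,
      Set<String> disabledInstances, boolean isResourceEnabled, String initialState) {
    Map<String, String> result = new HashMap<String, String>();
    for (Map.Entry<String, String> entry : currentStateMap.entrySet()) {
      String instance = entry.getKey();
      String currentState = entry.getValue();
      if (preferenceList == null || !preferenceList.contains(instance)) {
        // Dropped from the preference list: always transition to DROPPED.
        result.put(instance, "DROPPED");
      } else if ((disabledInstances.contains(instance) || !isResourceEnabled)
          && !"ERROR".equals(currentState)) {
        // Disabled but still preferred: fall back to the initial state (e.g. OFFLINE).
        result.put(instance, initialState);
      }
    }
    return result;
  }
}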
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index b77ce0d..cb5bda8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -390,6 +390,22 @@ public class ClusterDataCache {
return disabledInstancesSet;
}
+
+ /**
+ * This method allows one to fetch the set of nodes that are disabled
+ * @return
+ */
+ public Set<String> getDisabledInstances() {
+ Set<String> disabledInstancesSet = new HashSet<String>();
+ for (String instance : _instanceConfigMap.keySet()) {
+ InstanceConfig config = _instanceConfigMap.get(instance);
+ if (config.getInstanceEnabled() == false) {
+ disabledInstancesSet.add(instance);
+ }
+ }
+ return disabledInstancesSet;
+ }
+
/**
* Returns the number of replicas for a given resource.
* @param resourceName
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
new file mode 100644
index 0000000..84eca6b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java
@@ -0,0 +1,142 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestAutoRebalanceWithDisabledInstance extends ZkStandAloneCMTestBase {
+ private static String TEST_DB_2 = "TestDB2";
+
+ @BeforeClass
+ @Override
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB_2, _PARTITIONS, STATE_MODEL,
+ RebalanceMode.FULL_AUTO + "");
+ _setupTool.rebalanceResource(CLUSTER_NAME, TEST_DB_2, _replica);
+
+ Thread.sleep(200);
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
+ Assert.assertTrue(result);
+ }
+
+ @Test()
+ public void testDisableEnableInstanceAutoRebalance() throws Exception {
+ String disabledInstance = _participants[0].getInstanceName();
+
+ Set<String> assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2,
+ disabledInstance);
+ Assert.assertFalse(assignedPartitions.isEmpty());
+ Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2,
+ disabledInstance);
+ Assert.assertFalse(currentPartitions.isEmpty());
+
+ // disable instance
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, false);
+ Thread.sleep(400);
+ assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+ Assert.assertTrue(assignedPartitions.isEmpty());
+ currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+ Assert.assertTrue(currentPartitions.isEmpty());
+
+ //enable instance
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, true);
+ Thread.sleep(400);
+ assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+ Assert.assertFalse(assignedPartitions.isEmpty());
+ currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance);
+ Assert.assertFalse(currentPartitions.isEmpty());
+ }
+
+ @Test()
+ public void testAddDisabledInstanceAutoRebalance() throws Exception {
+ // add disabled instance.
+ String nodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + NODE_NR);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, nodeName);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, nodeName);
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, false);
+
+ participant.syncStart();
+
+ Thread.sleep(400);
+ Set<String> assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+ Assert.assertTrue(assignedPartitions.isEmpty());
+ Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2,
+ nodeName);
+ Assert.assertTrue(currentPartitions.isEmpty());
+
+ //enable instance
+ _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, true);
+ Thread.sleep(400);
+ assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+ Assert.assertFalse(assignedPartitions.isEmpty());
+ currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, nodeName);
+ Assert.assertFalse(currentPartitions.isEmpty());
+ }
+
+ private Set<String> getPartitionsAssignedtoInstance(String cluster, String dbName, String instance) {
+ HelixAdmin admin = _setupTool.getClusterManagementTool();
+ Set<String> partitionSet = new HashSet<String>();
+ IdealState is = admin.getResourceIdealState(cluster, dbName);
+ for (String partition : is.getRecord().getListFields().keySet()) {
+ List<String> assignments = is.getRecord().getListField(partition);
+ for (String ins : assignments) {
+ if (ins.equals(instance)) {
+ partitionSet.add(partition);
+ }
+ }
+ }
+
+ return partitionSet;
+ }
+
+ private Set<String> getCurrentPartitionsOnInstance(String cluster, String dbName, String instance) {
+ HelixAdmin admin = _setupTool.getClusterManagementTool();
+ Set<String> partitionSet = new HashSet<String>();
+
+ ExternalView ev = admin.getResourceExternalView(cluster, dbName);
+ for (String partition : ev.getRecord().getMapFields().keySet()) {
+ Map<String, String> assignments = ev.getRecord().getMapField(partition);
+ for (String ins : assignments.keySet()) {
+ if (ins.equals(instance)) {
+ partitionSet.add(partition);
+ }
+ }
+ }
+ return partitionSet;
+ }
+}
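The same toggle the test performs can be driven from any admin client. A hedged sketch with placeholder ZooKeeper, cluster, and instance names; the sleep simply gives the controller time to rebalance, as the test above does:

import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;

public class ToggleInstanceSketch {
  public static void main(String[] args) throws Exception {
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    // Disable: FULL_AUTO resources drop this instance's partitions.
    admin.enableInstance("MyCluster", "localhost_12918", false);
    Thread.sleep(400);
    // Re-enable: partitions are assigned back to the instance.
    admin.enableInstance("MyCluster", "localhost_12918", true);
  }
}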
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
index 443d484..fb534fd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java
@@ -99,14 +99,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
_sleep = sleep;
}
- @Override
- @Transition(to = "SLAVE", from = "OFFLINE")
- public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
- LOG.info("Become SLAVE from OFFLINE");
-
- }
-
- @Override
@Transition(to = "MASTER", from = "SLAVE")
public void onBecomeMasterFromSlave(Message message, NotificationContext context)
throws InterruptedException {
@@ -117,26 +109,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
}
@Override
- @Transition(to = "SLAVE", from = "MASTER")
- public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
- LOG.info("Become SLAVE from MASTER");
- }
-
- @Override
- @Transition(to = "OFFLINE", from = "SLAVE")
- public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
- LOG.info("Become OFFLINE from SLAVE");
-
- }
-
- @Override
- @Transition(to = "DROPPED", from = "OFFLINE")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
- LOG.info("Become DROPPED from OFFLINE");
-
- }
-
- @Override
public void rollbackOnError(Message message, NotificationContext context,
StateTransitionError error) {
_error = error;
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 5d169d5..f694618 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -91,6 +91,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
ClusterStateVerifier
.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME));
+ Assert.assertTrue(result);
+
result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
CLUSTER_NAME));
http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
index 61733ba..7d90063 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java
@@ -43,67 +43,12 @@ public class MockMSStateModel extends StateModel {
_transition = transition;
}
- // overwrite default error->dropped transition
- @Transition(to = "DROPPED", from = "ERROR")
- public void onBecomeDroppedFromError(Message message, NotificationContext context)
+ @Transition(to = "*", from = "*")
+ public void generalTransitionHandle(Message message, NotificationContext context)
throws InterruptedException {
- LOG.info("Become DROPPED from ERROR");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "SLAVE", from = "OFFLINE")
- public void onBecomeSlaveFromOffline(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become SLAVE from OFFLINE");
- if (_transition != null) {
- _transition.doTransition(message, context);
-
- }
- }
-
- @Transition(to = "MASTER", from = "SLAVE")
- public void onBecomeMasterFromSlave(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become MASTER from SLAVE");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "SLAVE", from = "MASTER")
- public void onBecomeSlaveFromMaster(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become SLAVE from MASTER");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "OFFLINE", from = "SLAVE")
- public void onBecomeOfflineFromSlave(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become OFFLINE from SLAVE");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "DROPPED", from = "OFFLINE")
- public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become DROPPED from OFFLINE");
- if (_transition != null) {
- _transition.doTransition(message, context);
- }
- }
-
- @Transition(to = "OFFLINE", from = "ERROR")
- public void onBecomeOfflineFromError(Message message, NotificationContext context)
- throws InterruptedException {
- LOG.info("Become OFFLINE from ERROR");
- // System.err.println("Become OFFLINE from ERROR");
+ LOG.info(String
+ .format("Resource %s partition %s becomes %s from %s", message.getResourceName(),
+ message.getPartitionName(), message.getToState(), message.getFromState()));
if (_transition != null) {
_transition.doTransition(message, context);
}
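The wildcard handler above replaces seven near-identical per-transition methods. A minimal sketch, not from the patch, of the same pattern for any test state model; factory registration and the @StateModelInfo annotation are omitted for brevity:

import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.Transition;

public class WildcardStateModel extends StateModel {
  // A single method annotated with "*" -> "*" receives every state transition message.
  @Transition(to = "*", from = "*")
  public void onAnyTransition(Message message, NotificationContext context) {
    System.out.println(String.format("Resource %s partition %s: %s -> %s",
        message.getResourceName(), message.getPartitionName(),
        message.getFromState(), message.getToState()));
  }
}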
[4/5] helix git commit: [HELIX-634] Refactor AutoRebalancer to allow
configurable placement strategy.
Posted by lx...@apache.org.
[HELIX-634] Refactor AutoRebalancer to allow configurable placement strategy.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ea0fbbbc
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ea0fbbbc
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ea0fbbbc
Branch: refs/heads/helix-0.6.x
Commit: ea0fbbbce302974b88a2b8253bf06616fd91aa5b
Parents: bc0aa76
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Jun 7 14:42:43 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Sep 12 10:06:33 2016 -0700
----------------------------------------------------------------------
.../controller/rebalancer/AutoRebalancer.java | 40 +++++++---
.../strategy/AutoRebalanceStrategy.java | 40 +++++-----
.../controller/strategy/RebalanceStrategy.java | 52 +++++++++++++
.../java/org/apache/helix/model/IdealState.java | 23 +++++-
.../helix/model/builder/IdealStateBuilder.java | 80 ++++++++++++++++++++
.../task/GenericTaskAssignmentCalculator.java | 5 +-
.../strategy/TestAutoRebalanceStrategy.java | 25 +++---
7 files changed, 217 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index e47297f..6682426 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
@@ -35,8 +36,7 @@ import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.DefaultPlacementScheme;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
+import org.apache.helix.controller.strategy.RebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.LiveInstance;
@@ -44,6 +44,7 @@ import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
/**
@@ -59,14 +60,14 @@ import org.apache.log4j.Logger;
public class AutoRebalancer implements Rebalancer, MappingCalculator {
// These should be final, but are initialized in init rather than a constructor
private HelixManager _manager;
- private AutoRebalanceStrategy _algorithm;
+ private RebalanceStrategy _rebalanceStrategy;
private static final Logger LOG = Logger.getLogger(AutoRebalancer.class);
@Override
public void init(HelixManager manager) {
this._manager = manager;
- this._algorithm = null;
+ this._rebalanceStrategy = null;
}
@Override
@@ -127,13 +128,32 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator {
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
- ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme();
- placementScheme.init(_manager);
- _algorithm =
- new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition,
- placementScheme);
+ String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
+ if (rebalanceStrategyName == null || rebalanceStrategyName.equalsIgnoreCase("default")) {
+ _rebalanceStrategy =
+ new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, maxPartition);
+ } else {
+ try {
+ _rebalanceStrategy = RebalanceStrategy.class
+ .cast(HelixUtil.loadClass(getClass(), rebalanceStrategyName).newInstance());
+ _rebalanceStrategy.init(resourceName, partitions, stateCountMap, maxPartition);
+ } catch (ClassNotFoundException ex) {
+ throw new HelixException(
+ "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+ ex);
+ } catch (InstantiationException ex) {
+ throw new HelixException(
+ "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+ ex);
+ } catch (IllegalAccessException ex) {
+ throw new HelixException(
+ "Exception while invoking custom rebalance strategy class: " + rebalanceStrategyName,
+ ex);
+ }
+ }
+
ZNRecord newMapping =
- _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ _rebalanceStrategy.computePartitionAssignment(liveNodes, currentMapping, allNodes);
if (LOG.isDebugEnabled()) {
LOG.debug("currentMapping: " + currentMapping);
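The loading path added here is plain reflection: resolve the class named in the IdealState, instantiate it, and call init() before asking it for an assignment. A stripped-down sketch of that pattern, using Class.forName directly instead of the HelixUtil helper and letting reflection failures propagate to the caller:

import java.util.LinkedHashMap;
import java.util.List;

import org.apache.helix.controller.strategy.RebalanceStrategy;

public class StrategyLoaderSketch {
  static RebalanceStrategy load(String className, String resourceName, List<String> partitions,
      LinkedHashMap<String, Integer> states, int maxPerNode) throws Exception {
    // Instantiate the configured strategy class and initialize it with the resource's parameters.
    RebalanceStrategy strategy = RebalanceStrategy.class.cast(Class.forName(className).newInstance());
    strategy.init(resourceName, partitions, states, maxPerNode);
    return strategy;
  }
}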
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 11b5b0d..959609f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -36,16 +36,15 @@ import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.log4j.Logger;
-public class AutoRebalanceStrategy {
-
+public class AutoRebalanceStrategy implements RebalanceStrategy {
private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
-
- private final String _resourceName;
- private final List<String> _partitions;
- private final LinkedHashMap<String, Integer> _states;
- private final int _maximumPerNode;
private final ReplicaPlacementScheme _placementScheme;
+ private String _resourceName;
+ private List<String> _partitions;
+ private LinkedHashMap<String, Integer> _states;
+ private int _maximumPerNode;
+
private Map<String, Node> _nodeMap;
private List<Node> _liveNodesList;
private Map<Integer, String> _stateMap;
@@ -56,24 +55,26 @@ public class AutoRebalanceStrategy {
private Set<Replica> _orphaned;
public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
- final LinkedHashMap<String, Integer> states, int maximumPerNode,
- ReplicaPlacementScheme placementScheme) {
- _resourceName = resourceName;
- _partitions = partitions;
- _states = states;
- _maximumPerNode = maximumPerNode;
- if (placementScheme != null) {
- _placementScheme = placementScheme;
- } else {
- _placementScheme = new DefaultPlacementScheme();
- }
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ init(resourceName, partitions, states, maximumPerNode);
+ _placementScheme = new DefaultPlacementScheme();
}
public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states) {
- this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme());
+ this(resourceName, partitions, states, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+ _resourceName = resourceName;
+ _partitions = partitions;
+ _states = states;
+ _maximumPerNode = maximumPerNode;
}
+ @Override
public ZNRecord computePartitionAssignment(final List<String> liveNodes,
final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
int numReplicas = countStateReplicas();
@@ -546,7 +547,6 @@ public class AutoRebalanceStrategy {
/**
* Counts the total number of replicas given a state-count mapping
- * @param states
* @return
*/
private int countStateReplicas() {
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
new file mode 100644
index 0000000..4daae82
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java
@@ -0,0 +1,52 @@
+package org.apache.helix.controller.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Assignment strategy interface that computes the assignment of partition->instance.
+ */
+public interface RebalanceStrategy {
+ /**
+ * Perform the necessary initialization for the rebalance strategy object.
+ * @param resourceName
+ * @param partitions
+ * @param states
+ * @param maximumPerNode
+ */
+ void init(String resourceName, final List<String> partitions,
+ final LinkedHashMap<String, Integer> states, int maximumPerNode);
+
+ /**
+ * Compute the preference lists and, optionally, the partition-state mapping for the given resource.
+ *
+ * @param liveNodes
+ * @param currentMapping
+ * @param allNodes
+ * @return
+ */
+ ZNRecord computePartitionAssignment(final List<String> liveNodes,
+ final Map<String, Map<String, String>> currentMapping, final List<String> allNodes);
+}
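
For readers of the new interface: init() carries the resource metadata, and computePartitionAssignment() returns a ZNRecord whose list fields are the per-partition preference lists. A minimal hypothetical implementation is sketched below; the class name and the round-robin policy are illustrative only and not part of this patch (it also ignores maximumPerNode and currentMapping for brevity):

  package org.example;  // hypothetical package

  import java.util.ArrayList;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  import org.apache.helix.ZNRecord;
  import org.apache.helix.controller.strategy.RebalanceStrategy;

  public class RoundRobinRebalanceStrategy implements RebalanceStrategy {
    private String _resourceName;
    private List<String> _partitions;
    private LinkedHashMap<String, Integer> _states;

    @Override
    public void init(String resourceName, final List<String> partitions,
        final LinkedHashMap<String, Integer> states, int maximumPerNode) {
      _resourceName = resourceName;
      _partitions = partitions;
      _states = states;
      // maximumPerNode is ignored in this simplified sketch.
    }

    @Override
    public ZNRecord computePartitionAssignment(final List<String> liveNodes,
        final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
      // Total replicas per partition is the sum of the per-state counts.
      int replicas = 0;
      for (int count : _states.values()) {
        replicas += count;
      }
      ZNRecord result = new ZNRecord(_resourceName);
      int offset = 0;
      for (String partition : _partitions) {
        // Spread each partition's replicas round-robin over the live nodes.
        List<String> preferenceList = new ArrayList<String>();
        for (int i = 0; i < replicas && i < liveNodes.size(); i++) {
          preferenceList.add(liveNodes.get((offset + i) % liveNodes.size()));
        }
        result.setListField(partition, preferenceList);
        offset++;
      }
      return result;
    }
  }
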
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 44f4219..7c4cf54 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -53,10 +53,11 @@ public class IdealState extends HelixProperty {
@Deprecated
IDEAL_STATE_MODE,
REBALANCE_MODE,
+ REBALANCER_CLASS_NAME,
REBALANCE_TIMER_PERIOD,
+ REBALANCE_STRATEGY,
MAX_PARTITIONS_PER_INSTANCE,
INSTANCE_GROUP_TAG,
- REBALANCER_CLASS_NAME,
HELIX_ENABLED,
RESOURCE_GROUP_NAME,
GROUP_ROUTING_ENABLED,
@@ -165,6 +166,26 @@ public class IdealState extends HelixProperty {
}
/**
+ * Specify the strategy for Helix to use to compute the partition-instance assignment,
+ * i.e., the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy}
+ *
+ * @param rebalanceStrategy
+ * @return
+ */
+ public void setRebalanceStrategy(String rebalanceStrategy) {
+ _record.setSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name(), rebalanceStrategy);
+ }
+
+ /**
+ * Get the rebalance strategy for this resource.
+ *
+ * @return rebalance strategy, or null if not specified.
+ */
+ public String getRebalanceStrategy() {
+ return _record.getSimpleField(IdealStateProperty.REBALANCE_STRATEGY.name());
+ }
+
+ /**
* Set the resource group name
* @param resourceGroupName
*/
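
The new REBALANCE_STRATEGY simple field is read and written through the getter/setter above. A small usage sketch (the resource name and strategy class name are placeholders):

  IdealState idealState = new IdealState("MyDB");
  idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
  // Store the fully-qualified name of a class implementing RebalanceStrategy.
  idealState.setRebalanceStrategy("org.example.RoundRobinRebalanceStrategy");

  String strategy = idealState.getRebalanceStrategy();  // null if never set
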
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
index d3bc3f2..9ad3023 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/IdealStateBuilder.java
@@ -52,6 +52,17 @@ public abstract class IdealStateBuilder {
* Helix rebalancer strategies. AUTO, SEMI_AUTO, CUSTOMIZED
*/
protected IdealState.RebalanceMode rebalancerMode;
+
+ /**
+ * Customized rebalancer class.
+ */
+ private String rebalancerClassName;
+
+ /**
+ * Custom rebalance strategy
+ */
+ private String rebalanceStrategy;
+
/**
* A constraint that limits the maximum number of partitions per Node.
*/
@@ -68,6 +79,16 @@ public abstract class IdealStateBuilder {
*/
private Boolean disableExternalView = null;
+ /**
+ * Resource group name.
+ */
+ private String resourceGroupName;
+
+ /**
+ * Whether the resource group routing should be enabled in routingProvider.
+ */
+ private Boolean enableGroupRouting;
+
protected ZNRecord _record;
/**
@@ -144,6 +165,44 @@ public abstract class IdealStateBuilder {
}
/**
+ * Set custom rebalancer class name.
+ * @return IdealStateBuilder
+ */
+ public IdealStateBuilder setRebalancerClass(String rebalancerClassName) {
+ this.rebalancerClassName = rebalancerClassName;
+ return this;
+ }
+
+ /**
+ * Set custom rebalance strategy name.
+ * @param rebalanceStrategy
+ * @return
+ */
+ public IdealStateBuilder setRebalanceStrategy(String rebalanceStrategy) {
+ this.rebalanceStrategy = rebalanceStrategy;
+ return this;
+ }
+
+ /**
+ * Set the resource group name for this resource.
+ * @param resourceGroupName
+ * @return
+ */
+ public IdealStateBuilder setResourceGroupName(String resourceGroupName) {
+ this.resourceGroupName = resourceGroupName;
+ return this;
+ }
+
+ /**
+ * Enable Group Routing for this resource.
+ * @return
+ */
+ public IdealStateBuilder enableGroupRouting() {
+ this.enableGroupRouting = true;
+ return this;
+ }
+
+ /**
* @return
*/
public IdealState build() {
@@ -154,10 +213,31 @@ public abstract class IdealStateBuilder {
idealstate.setStateModelFactoryName(stateModelFactoryName);
idealstate.setRebalanceMode(rebalancerMode);
idealstate.setReplicas("" + numReplica);
+
+ if (rebalancerClassName != null) {
+ idealstate.setRebalancerClassName(rebalancerClassName);
+ }
+
+ if (rebalanceStrategy != null) {
+ idealstate.setRebalanceStrategy(rebalanceStrategy);
+ }
+
+ if (maxPartitionsPerNode > 0) {
+ idealstate.setMaxPartitionsPerInstance(maxPartitionsPerNode);
+ }
+
if (disableExternalView != null) {
idealstate.setDisableExternalView(disableExternalView);
}
+ if (resourceGroupName != null) {
+ idealstate.setResourceGroupName(resourceGroupName);
+ }
+
+ if (enableGroupRouting != null) {
+ idealstate.enableGroupRouting(enableGroupRouting);
+ }
+
if (!idealstate.isValid()) {
throw new HelixException("invalid ideal-state: " + idealstate);
}
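
The new builder setters chain with the existing ones. A sketch assuming the FullAutoModeISBuilder subclass that ships elsewhere in helix-core (not part of this diff) and its existing setters; all names below are placeholders:

  IdealState idealState = new FullAutoModeISBuilder("MyDB")
      .setNumPartitions(8)
      .setNumReplica(3)
      .setStateModel("MasterSlave")
      .setRebalanceStrategy("org.example.RoundRobinRebalanceStrategy")
      .setResourceGroupName("MyResourceGroup")
      .enableGroupRouting()
      .build();
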
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index b0a1a33..623357f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -33,6 +33,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.strategy.RebalanceStrategy;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
@@ -121,9 +122,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
}
// Get the assignment keyed on partition
- AutoRebalanceStrategy strategy =
- new AutoRebalanceStrategy(resourceId, partitions, states, Integer.MAX_VALUE,
- new AutoRebalanceStrategy.DefaultPlacementScheme());
+ RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, partitions, states);
List<String> allNodes =
Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache));
Collections.sort(allNodes);
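
Since the three-argument AutoRebalanceStrategy constructor defaults to Integer.MAX_VALUE and the DefaultPlacementScheme, the shorter call above is behaviorally equivalent to the removed five-argument one. A sketch of driving the strategy through the interface (all names made up; java.util imports assumed):

  List<String> partitions = Arrays.asList("TaskResource_0", "TaskResource_1");
  LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
  states.put("ONLINE", 1);
  List<String> nodes = Arrays.asList("localhost_12918", "localhost_12919");

  RebalanceStrategy strategy = new AutoRebalanceStrategy("TaskResource", partitions, states);
  // Empty current mapping: compute an initial assignment from scratch.
  ZNRecord assignment = strategy.computePartitionAssignment(nodes,
      new HashMap<String, Map<String, String>>(), nodes);
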
http://git-wip-us.apache.org/repos/asf/helix/blob/ea0fbbbc/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 985d0c8..adc92d6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -116,8 +116,7 @@ public class TestAutoRebalanceStrategy {
StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states);
new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode,
- stateModelDef, new AutoRebalanceStrategy.DefaultPlacementScheme())
- .runRepeatedly(numIterations);
+ stateModelDef).runRepeatedly(numIterations);
}
/**
@@ -157,13 +156,11 @@ public class TestAutoRebalanceStrategy {
private List<String> _allNodes;
private int _maxPerNode;
private StateModelDefinition _stateModelDef;
- private ReplicaPlacementScheme _placementScheme;
private Random _random;
public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states,
List<String> liveNodes, Map<String, Map<String, String>> currentMapping,
- List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef,
- ReplicaPlacementScheme placementScheme) {
+ List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) {
_partitions = partitions;
_states = states;
_liveNodes = liveNodes;
@@ -182,7 +179,6 @@ public class TestAutoRebalanceStrategy {
}
_maxPerNode = maxPerNode;
_stateModelDef = stateModelDef;
- _placementScheme = placementScheme;
_random = new Random();
}
@@ -193,9 +189,10 @@ public class TestAutoRebalanceStrategy {
*/
public void runRepeatedly(int numIterations) {
logger.info("~~~~ Initial State ~~~~~");
+ RebalanceStrategy strategy =
+ new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode);
ZNRecord initialResult =
- new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
- _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+ strategy.computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
_currentMapping = getMapping(initialResult.getListFields());
logger.info(_currentMapping);
getRunResult(_currentMapping, initialResult.getListFields());
@@ -500,8 +497,8 @@ public class TestAutoRebalanceStrategy {
_liveSet.add(node);
_nonLiveSet.remove(node);
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
- _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
+ computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
}
/**
@@ -534,8 +531,8 @@ public class TestAutoRebalanceStrategy {
}
}
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
- _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+ .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
}
/**
@@ -560,8 +557,8 @@ public class TestAutoRebalanceStrategy {
_liveNodes.add(node);
_liveSet.add(node);
- return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode,
- _placementScheme).computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
+ return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+ .computePartitionAssignment(_liveNodes, _currentMapping, _allNodes);
}
private <T> T getRandomSetElement(Set<T> source) {