You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/26 22:25:48 UTC
git commit: [HELIX-277] FULL_AUTO rebalancer should not prefer nodes
that are just coming up
Updated Branches:
refs/heads/master f6e4c87e2 -> 90faf91bb
[HELIX-277] FULL_AUTO rebalancer should not prefer nodes that are just coming up
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/90faf91b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/90faf91b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/90faf91b
Branch: refs/heads/master
Commit: 90faf91bb87594c67be23a1e4547e81f58ea7c8e
Parents: f6e4c87
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Oct 23 17:36:15 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Oct 23 17:36:15 2013 -0700
----------------------------------------------------------------------
.../controller/rebalancer/AutoRebalancer.java | 5 +
.../strategy/AutoRebalanceStrategy.java | 118 ++++++-----
.../strategy/TestAutoRebalanceStrategy.java | 196 +++++++++++++++++++
3 files changed, 273 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/90faf91b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 3eb258b..946dd5e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -109,6 +110,10 @@ public class AutoRebalancer implements Rebalancer {
liveNodes = new ArrayList<String>(taggedLiveNodes);
}
+ // sort node lists to ensure consistent preferred assignments
+ Collections.sort(allNodes);
+ Collections.sort(liveNodes);
+
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/90faf91b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 72046bf..cc5acb7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -27,10 +27,10 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.Map.Entry;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
@@ -162,8 +162,8 @@ public class AutoRebalanceStrategy {
* and its preferred node is under capacity.
*/
private void moveNonPreferredReplicasToPreferred() {
- // iterate through non preferred and see if we can move them to
- // preferredlocation if the donor has more than it should and stealer has
+ // iterate through non preferred and see if we can move them to the
+ // preferred location if the donor has more than it should and stealer has
// enough capacity
Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator();
while (iterator.hasNext()) {
@@ -177,6 +177,8 @@ public class AutoRebalanceStrategy {
receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
donor.nonPreferred.remove(replica);
receiver.preferred.add(replica);
+ donor.newReplicas.remove(replica);
+ receiver.newReplicas.add(replica);
iterator.remove();
}
}
@@ -199,6 +201,7 @@ public class AutoRebalanceStrategy {
if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
receiver.nonPreferred.add(replica);
+ receiver.newReplicas.add(replica);
added = true;
break;
}
@@ -293,65 +296,87 @@ public class AutoRebalanceStrategy {
// The list fields are also keyed on partition and list all the nodes serving that partition.
// This is useful to verify that there is no node serving multiple replicas of the same
// partition.
+ Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
for (String partition : _partitions) {
znRecord.setMapField(partition, new TreeMap<String, String>());
znRecord.setListField(partition, new ArrayList<String>());
+ newPreferences.put(partition, new ArrayList<String>());
}
- int count = countStateReplicas();
- for (int replicaId = 0; replicaId < count; replicaId++) {
- for (Node node : _liveNodesList) {
- for (Replica replica : node.preferred) {
- if (replicaId == replica.replicaId) {
- znRecord.getListField(replica.partition).add(node.id);
- }
- }
- for (Replica replica : node.nonPreferred) {
- if (replicaId == replica.replicaId) {
- znRecord.getListField(replica.partition).add(node.id);
- }
- }
- }
- }
- normalizePreferenceLists(znRecord.getListFields());
+ // for preference lists, the rough priority that we want is:
+ // [existing preferred, existing non-preferred, non-existing preferred, non-existing
+ // non-preferred]
for (Node node : _liveNodesList) {
for (Replica replica : node.preferred) {
- znRecord.getMapField(replica.partition).put(node.id, _stateMap.get(replica.replicaId));
+ if (node.newReplicas.contains(replica)) {
+ newPreferences.get(replica.partition).add(node.id);
+ } else {
+ znRecord.getListField(replica.partition).add(node.id);
+ }
}
+ }
+ for (Node node : _liveNodesList) {
for (Replica replica : node.nonPreferred) {
- znRecord.getMapField(replica.partition).put(node.id, _stateMap.get(replica.replicaId));
+ if (node.newReplicas.contains(replica)) {
+ newPreferences.get(replica.partition).add(node.id);
+ } else {
+ znRecord.getListField(replica.partition).add(node.id);
+ }
+ }
+ }
+ normalizePreferenceLists(znRecord.getListFields(), newPreferences);
+
+ // generate preference maps based on the preference lists
+ for (String partition : _partitions) {
+ List<String> preferenceList = znRecord.getListField(partition);
+ int i = 0;
+ for (String participant : preferenceList) {
+ znRecord.getMapField(partition).put(participant, _stateMap.get(i));
+ i++;
}
}
}
/**
- * Adjust preference lists to reduce the number of same replicas on an instance
+ * Adjust preference lists to reduce the number of same replicas on an instance. This will
+ * separately normalize two sets of preference lists, and then append the results of the second
+ * set to those of the first. This basically ensures that existing replicas are automatically
+ * preferred.
* @param preferenceLists map of (partition --> list of nodes)
+ * @param newPreferences map containing node preferences not consistent with the current
+ * assignment
*/
- private void normalizePreferenceLists(Map<String, List<String>> preferenceLists) {
- Map<String, Map<Integer, Integer>> nodeReplicaCounts =
- new HashMap<String, Map<Integer, Integer>>();
+ private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
+ Map<String, List<String>> newPreferences) {
+ Map<String, Map<String, Integer>> nodeReplicaCounts =
+ new HashMap<String, Map<String, Integer>>();
for (String partition : preferenceLists.keySet()) {
normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
}
+ for (String partition : newPreferences.keySet()) {
+ normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
+ preferenceLists.get(partition).addAll(newPreferences.get(partition));
+ }
}
/**
* Adjust a single preference list for replica assignment imbalance
* @param preferenceList list of node names
- * @param nodeReplicaCounts map of (node --> replica id --> count)
+ * @param nodeReplicaCounts map of (node --> state --> count)
*/
private void normalizePreferenceList(List<String> preferenceList,
- Map<String, Map<Integer, Integer>> nodeReplicaCounts) {
+ Map<String, Map<String, Integer>> nodeReplicaCounts) {
+ // make this a LinkedHashSet to preserve iteration order
Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
List<String> newPreferenceList = new ArrayList<String>();
int replicas = Math.min(countStateReplicas(), preferenceList.size());
for (int i = 0; i < replicas; i++) {
- String node = getMinimumNodeForReplica(i, notAssigned, nodeReplicaCounts);
+ String state = _stateMap.get(i);
+ String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
newPreferenceList.add(node);
notAssigned.remove(node);
- Map<Integer, Integer> counts = nodeReplicaCounts.get(node);
- counts.put(i, counts.get(i) + 1);
+ Map<String, Integer> counts = nodeReplicaCounts.get(node);
+ counts.put(state, counts.get(state) + 1);
}
preferenceList.clear();
preferenceList.addAll(newPreferenceList);
@@ -359,17 +384,17 @@ public class AutoRebalanceStrategy {
/**
* Get the node which hosts the fewest of a given replica
- * @param replicaId the replica
+ * @param state the state
* @param nodes nodes to check
* @param nodeReplicaCounts current assignment of replicas
* @return the node most willing to accept the replica
*/
- private String getMinimumNodeForReplica(int replicaId, Set<String> nodes,
- Map<String, Map<Integer, Integer>> nodeReplicaCounts) {
+ private String getMinimumNodeForReplica(String state, Set<String> nodes,
+ Map<String, Map<String, Integer>> nodeReplicaCounts) {
String minimalNode = null;
int minimalCount = Integer.MAX_VALUE;
for (String node : nodes) {
- int count = getReplicaCountForNode(replicaId, node, nodeReplicaCounts);
+ int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
if (count < minimalCount) {
minimalCount = count;
minimalNode = node;
@@ -380,25 +405,25 @@ public class AutoRebalanceStrategy {
/**
* Safe check for the number of replicas of a given id assiged to a node
- * @param replicaId the replica to assign
+ * @param state the state to assign
* @param node the node to check
* @param nodeReplicaCounts a map of node to replica id and counts
* @return the number of currently assigned replicas of the given id
*/
- private int getReplicaCountForNode(int replicaId, String node,
- Map<String, Map<Integer, Integer>> nodeReplicaCounts) {
+ private int getReplicaCountForNode(String state, String node,
+ Map<String, Map<String, Integer>> nodeReplicaCounts) {
if (!nodeReplicaCounts.containsKey(node)) {
- Map<Integer, Integer> replicaCounts = new HashMap<Integer, Integer>();
- replicaCounts.put(replicaId, 0);
+ Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
+ replicaCounts.put(state, 0);
nodeReplicaCounts.put(node, replicaCounts);
return 0;
}
- Map<Integer, Integer> replicaCounts = nodeReplicaCounts.get(node);
- if (!replicaCounts.containsKey(replicaId)) {
- replicaCounts.put(replicaId, 0);
+ Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
+ if (!replicaCounts.containsKey(state)) {
+ replicaCounts.put(state, 0);
return 0;
}
- return replicaCounts.get(replicaId);
+ return replicaCounts.get(state);
}
/**
@@ -562,7 +587,6 @@ public class AutoRebalanceStrategy {
* of replicas assigned to it, so it can decide if it can receive additional replicas.
*/
class Node {
-
public int currentlyAssigned;
public int capacity;
public boolean hasCeilingCapacity;
@@ -570,10 +594,12 @@ public class AutoRebalanceStrategy {
boolean isAlive;
private List<Replica> preferred;
private List<Replica> nonPreferred;
+ private Set<Replica> newReplicas;
public Node(String id) {
preferred = new ArrayList<Replica>();
nonPreferred = new ArrayList<Replica>();
+ newReplicas = new TreeSet<Replica>();
currentlyAssigned = 0;
isAlive = false;
this.id = id;
@@ -626,6 +652,7 @@ public class AutoRebalanceStrategy {
capacity++;
currentlyAssigned++;
nonPreferred.add(replica);
+ newReplicas.add(replica);
}
@Override
@@ -642,7 +669,6 @@ public class AutoRebalanceStrategy {
* and an identifier signifying a specific replica of a given partition and state.
*/
class Replica implements Comparable<Replica> {
-
private String partition;
private int replicaId; // this is a partition-relative id
private String format;
@@ -650,7 +676,7 @@ public class AutoRebalanceStrategy {
public Replica(String partition, int replicaId) {
this.partition = partition;
this.replicaId = replicaId;
- this.format = partition + "|" + replicaId;
+ this.format = this.partition + "|" + this.replicaId;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/90faf91b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 9fb7ba9..e47032e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -42,9 +42,16 @@ import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
import org.apache.log4j.Logger;
+import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
public class TestAutoRebalanceStrategy {
private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class);
@@ -568,4 +575,193 @@ public class TestAutoRebalanceStrategy {
return null;
}
}
+
+ /**
+ * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
+ * lists should prefer nodes in the current mapping at all times, but when all nodes are in the
+ * current mapping, then it should distribute states as evenly as possible.
+ */
+ @Test
+ public void testOrphansNotPreferred() {
+ final String RESOURCE_NAME = "resource";
+ final String[] PARTITIONS = {
+ "resource_0", "resource_1", "resource_2"
+ };
+ final StateModelDefinition STATE_MODEL =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+ final int REPLICA_COUNT = 2;
+ final String[] NODES = {
+ "n0", "n1", "n2"
+ };
+
+ // initial state, one node, no mapping
+ List<String> allNodes = Lists.newArrayList(NODES[0]);
+ List<String> liveNodes = Lists.newArrayList(NODES[0]);
+ Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+ for (String partition : PARTITIONS) {
+ currentMapping.put(partition, new HashMap<String, String>());
+ }
+
+ // make sure that when the first node joins, a single replica is assigned fairly
+ List<String> partitions = ImmutableList.copyOf(PARTITIONS);
+ LinkedHashMap<String, Integer> stateCount =
+ ConstraintBasedAssignment.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ ZNRecord znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ Map<String, List<String>> preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ // make sure these are all MASTER
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+ }
+
+ // now assign a replica to the first node in the current mapping, and add a second node
+ allNodes.add(NODES[1]);
+ liveNodes.add(NODES[1]);
+ stateCount = ConstraintBasedAssignment.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).put(NODES[0], "MASTER");
+ }
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
+ + partition);
+ Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
+ + partition);
+ }
+
+ // now set the current mapping to reflect this update and make sure that it distributes masters
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).put(NODES[1], "SLAVE");
+ }
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ preferenceLists = znRecord.getListFields();
+ Set<String> firstNodes = Sets.newHashSet();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ }
+ Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+
+ // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
+ // new node is never the most preferred
+ allNodes.add(NODES[2]);
+ liveNodes.add(NODES[2]);
+ stateCount = ConstraintBasedAssignment.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+ // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
+ currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ preferenceLists = znRecord.getListFields();
+ boolean newNodeUsed = false;
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ if (preferenceList.contains(NODES[2])) {
+ newNodeUsed = true;
+ Assert.assertEquals(preferenceList.get(1), NODES[2],
+ "newly added node not at preference list tail for " + partition);
+ }
+ }
+ Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
+
+ // now remap this to take the new node into account, should go back to balancing masters, slaves
+ // evenly across all nodes
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ preferenceLists = znRecord.getListFields();
+ firstNodes.clear();
+ Set<String> secondNodes = Sets.newHashSet();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ secondNodes.add(preferenceList.get(1));
+ }
+ Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
+ Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
+
+ // remove a node now, but use the current mapping with everything balanced just prior
+ liveNodes.remove(0);
+ stateCount = ConstraintBasedAssignment.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+ // remove all references of n0 from the mapping, keep everything else in a legal state
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ preferenceLists = znRecord.getListFields();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ Map<String, String> stateMap = currentMapping.get(partition);
+ for (String participant : stateMap.keySet()) {
+ Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
+ + partition);
+ }
+ for (String participant : preferenceList) {
+ if (!stateMap.containsKey(participant)) {
+ Assert.assertNotSame(preferenceList.get(0), participant,
+ "newly moved replica should not be master for " + partition);
+ }
+ }
+ }
+
+ // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
+ for (String partition : PARTITIONS) {
+ currentMapping.get(partition).clear();
+ }
+ currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+ currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
+ currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
+ currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+ .computePartitionAssignment(liveNodes, currentMapping, allNodes);
+ preferenceLists = znRecord.getListFields();
+ firstNodes.clear();
+ for (String partition : currentMapping.keySet()) {
+ List<String> preferenceList = preferenceLists.get(partition);
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+ firstNodes.add(preferenceList.get(0));
+ }
+ Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+ }
}