You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/11/12 09:23:13 UTC
helix git commit: HELIX-543 RB-27808 Avoid moving partitions
unnecessarily when auto-rebalancing
Repository: helix
Updated Branches:
refs/heads/master e96ea8e20 -> dc9f129b6
HELIX-543 RB-27808 Avoid moving partitions unnecessarily when auto-rebalancing
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dc9f129b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dc9f129b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dc9f129b
Branch: refs/heads/master
Commit: dc9f129b67f8cacdf0cd22288f166b56fc5654a0
Parents: e96ea8e
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Wed Nov 12 00:23:03 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Wed Nov 12 00:23:03 2014 -0800
----------------------------------------------------------------------
helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy | 18 ++++
.../strategy/AutoRebalanceStrategy.java | 61 +++++++++--
.../strategy/TestAutoRebalanceStrategy.java | 62 +++++++++++
.../SinglePartitionLeaderStandByTest.java | 108 +++++++++++++++++++
4 files changed, 242 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy
----------------------------------------------------------------------
diff --git a/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy
index f59be07..ef1f57e 100644
--- a/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy
+++ b/helix-agent/helix-agent-0.7.2-SNAPSHOT.ivy
@@ -1,4 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
<ivy-module version="2.0" xmlns:m="http://ant.apache.org/ivy/maven">
<info organisation="org.apache.helix"
module="helix-agent"
http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 09b66c1..6e0e226 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -33,6 +33,9 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
@@ -141,8 +144,12 @@ public class AutoRebalanceStrategy {
final Map<PartitionId, Map<ParticipantId, State>> currentMapping,
final List<ParticipantId> allNodes) {
Comparator<ParticipantId> nodeComparator = new NodeComparator();
+ Comparator<ParticipantId> currentStateNodeComparator =
+ new CurrentStateNodeComparator(currentMapping);
+
List<ParticipantId> sortedLiveNodes = new ArrayList<ParticipantId>(liveNodes);
- Collections.sort(sortedLiveNodes, nodeComparator);
+ Collections.sort(sortedLiveNodes, currentStateNodeComparator);
+
List<ParticipantId> sortedAllNodes = new ArrayList<ParticipantId>(allNodes);
Collections.sort(sortedAllNodes, nodeComparator);
List<String> sortedNodeNames =
@@ -481,7 +488,7 @@ public class AutoRebalanceStrategy {
}
/**
- * Safe check for the number of replicas of a given id assiged to a node
+ * Safe check for the number of replicas of a given id assigned to a node
* @param state the state to assign
* @param node the node to check
* @param nodeReplicaCounts a map of node to replica id and counts
@@ -496,11 +503,11 @@ public class AutoRebalanceStrategy {
return 0;
}
Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
- if (!replicaCounts.containsKey(state)) {
+ if (!replicaCounts.containsKey(state.toString())) {
replicaCounts.put(state.toString(), 0);
return 0;
}
- return replicaCounts.get(state);
+ return replicaCounts.get(state.toString());
}
/**
@@ -609,7 +616,7 @@ public class AutoRebalanceStrategy {
/**
* Given a predefined set of all possible nodes, compute an assignment of replicas to
* nodes that evenly assigns all replicas to nodes.
- * @param allNodes Identifiers to all nodes, live and non-live
+ * @param nodeNames Identifiers to all nodes, live and non-live
* @return Preferred assignment of replicas
*/
private Map<Replica, Node> computePreferredPlacement(final List<String> nodeNames) {
@@ -633,8 +640,7 @@ public class AutoRebalanceStrategy {
/**
* Counts the total number of replicas given a state-count mapping
- * @param states
- * @return
+ * @return The number
*/
private int countStateReplicas() {
int total = 0;
@@ -844,4 +850,45 @@ public class AutoRebalanceStrategy {
return o1.toString().compareTo(o2.toString());
}
}
+
+ /**
+ * Orders live nodes by how many partitions are currently mapped to them (nodes holding more
+ * partitions sort earlier), breaking ties by the node's string form.
+ * Putting nodes that already hold partitions first is intended to keep existing placements
+ * rather than moving partitions because later nodes were assigned reduced capacity.
+ */
+ private static class CurrentStateNodeComparator implements Comparator<ParticipantId> {
+
+ /**
+ * Number of partitions in the current mapping that each participant appears in.
+ */
+ private final Map<ParticipantId, Integer> partitionCounts;
+
+ /**
+ * Builds the per-participant partition counts from the current mapping.
+ * @param currentMapping The current mapping of partitions to participants.
+ */
+ public CurrentStateNodeComparator(Map<PartitionId, Map<ParticipantId, State>> currentMapping) {
+ partitionCounts = new HashMap<ParticipantId, Integer>();
+ for (Entry<PartitionId, Map<ParticipantId, State>> entry : currentMapping.entrySet()) {
+ for (ParticipantId participantId : entry.getValue().keySet()) { // counted regardless of state
+ Integer existing = partitionCounts.get(participantId);
+ partitionCounts.put(participantId, existing != null ? existing + 1 : 1);
+ }
+ }
+ }
+
+ @Override
+ public int compare(ParticipantId o1, ParticipantId o2) { // higher count first, then by name
+ Integer c1 = partitionCounts.get(o1);
+ if (c1 == null) {
+ c1 = 0;
+ }
+ Integer c2 = partitionCounts.get(o2);
+ if (c2 == null) {
+ c2 = 0;
+ }
+ return c1 < c2 ? 1 : (c1 > c2 ? -1 : o1.toString().compareTo(o2.toString()));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 1322b40..25c550d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -788,4 +788,66 @@ public class TestAutoRebalanceStrategy {
}
Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
}
+
+ /**
+ * Tests the following scenario: a resource has a single partition and two nodes are up, so the
+ * partition is assigned to one of them. Take that node down and the partition moves; bring it
+ * back up and the partition should not move back unnecessarily. The outage is simulated by
+ * recording the partition on the other node in the current mapping while both nodes are live.
+ */
+ @Test
+ public void testWontMoveSinglePartitionUnnecessarily() {
+ final ResourceId RESOURCE = ResourceId.from("resource");
+ final PartitionId partition = PartitionId.from("resource_0");
+ final StateModelDefinition STATE_MODEL =
+ new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
+ LinkedHashMap<State, Integer> stateCount = Maps.newLinkedHashMap();
+ stateCount.put(State.from("ONLINE"), 1);
+ final ParticipantId[] NODES = {
+ ParticipantId.from("n0"), ParticipantId.from("n1")
+ };
+
+ ReplicaPlacementScheme scheme = new AutoRebalanceStrategy.DefaultPlacementScheme();
+ // initial state: two live nodes, empty current mapping
+ List<ParticipantId> allNodes = Lists.newArrayList(NODES);
+ List<ParticipantId> liveNodes = Lists.newArrayList(NODES);
+ Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
+ currentMapping.put(partition, new HashMap<ParticipantId, State>());
+
+ // Both nodes there
+ List<PartitionId> partitions = Lists.newArrayList(partition);
+ Map<State, String> upperBounds = Maps.newHashMap(); // NOTE(review): never read below
+ for (State state : STATE_MODEL.getTypedStatesPriorityList()) {
+ upperBounds.put(state, STATE_MODEL.getNumParticipantsPerState(state));
+ }
+
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
+ .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
+ Map<String, List<String>> preferenceLists = znRecord.getListFields();
+ List<String> preferenceList = preferenceLists.get(partition.toString());
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+ String state = znRecord.getMapField(partition.toString()).get(preferenceList.get(0));
+ Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition);
+ ParticipantId preferredNode = ParticipantId.from(preferenceList.get(0));
+ ParticipantId otherNode = preferredNode.equals(NODES[0]) ? NODES[1] : NODES[0];
+ // ok, see what happens if we've got the partition on the other node (e.g. due to the preferred
+ // node being down).
+ currentMapping.get(partition).put(otherNode, State.from(state));
+
+ znRecord =
+ new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE, scheme)
+ .typedComputePartitionAssignment(liveNodes, currentMapping, allNodes);
+
+ preferenceLists = znRecord.getListFields();
+ preferenceList = preferenceLists.get(partition.toString());
+ Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+ Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+ state = znRecord.getMapField(partition.toString()).get(preferenceList.get(0));
+ Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition);
+ ParticipantId finalPreferredNode = ParticipantId.from(preferenceList.get(0));
+ // finally, make sure we haven't moved it.
+ Assert.assertEquals(finalPreferredNode, otherNode);
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/dc9f129b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
new file mode 100644
index 0000000..ec8de0a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java
@@ -0,0 +1,108 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Date;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.testutil.TestUtil;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for a LeaderStandby resource (TestDB0) in FULL_AUTO mode on four participants:
+ * checks best-possible state matches the external view, then stops and restarts one participant.
+ */
+
+public class SinglePartitionLeaderStandByTest extends ZkTestBase {
+ @Test
+ public void test() throws Exception {
+ String clusterName = TestUtil.getTestName();
+ int n = 4;
+
+ System.out.println("START " + clusterName +" at " + new Date(System.currentTimeMillis()));
+
+ // create 1 resource with 2 partitions on n nodes, without an initial rebalance
+ TestHelper.setupCluster(clusterName, _zkaddr, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 2, // partitions per resource
+ n, // number of nodes
+ 0, // replicas
+ "LeaderStandby",
+ RebalanceMode.FULL_AUTO,
+ false); // don't rebalance
+
+ // point the ideal state at ANY_LIVEINSTANCE (replica count and TestDB0_0 preference list)
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, _baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey key = keyBuilder.idealStates("TestDB0");
+ IdealState idealState = accessor.getProperty(key);
+ idealState.setReplicas(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString());
+ idealState.getRecord().setListField("TestDB0_0",
+ Arrays.asList(HelixConstants.StateModelToken.ANY_LIVEINSTANCE.toString()));
+ accessor.setProperty(key, idealState);
+
+ MockController controller =
+ new MockController(_zkaddr, clusterName, "controller_0");
+ controller.syncStart();
+
+ // start participants
+ MockParticipant[] participants = new MockParticipant[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
+ clusterName));
+
+ Assert.assertTrue(result);
+ // stop the first participant and wait 10s before restarting it
+ participants[0].syncStop();
+ Thread.sleep(10000);
+ for (int i = 0; i < 1; i++) { // restarts only participants[0]
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipant(_zkaddr, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+ // NOTE(review): nothing is asserted after the restart; clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}