You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/23 02:22:21 UTC
git commit: [HELIX-274] Verify FULL_AUTO tagged node behavior,
fix bug in AutoRebalancer, rb=14858
Updated Branches:
refs/heads/master 422434240 -> f6e4c87e2
[HELIX-274] Verify FULL_AUTO tagged node behavior, fix bug in AutoRebalancer, rb=14858
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/f6e4c87e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/f6e4c87e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/f6e4c87e
Branch: refs/heads/master
Commit: f6e4c87e2cb146282ebf83003e784012ffc08ce5
Parents: 4224342
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 22 14:17:03 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 22 17:21:45 2013 -0700
----------------------------------------------------------------------
.../controller/rebalancer/AutoRebalancer.java | 13 +-
.../integration/TestFullAutoNodeTagging.java | 314 +++++++++++++++++++
2 files changed, 323 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f6e4c87e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 9564e35..3eb258b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -82,28 +82,33 @@ public class AutoRebalancer implements Rebalancer {
ConstraintBasedAssignment.stateCount(stateModelDef, liveInstance.size(),
Integer.parseInt(replicas));
List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
+ List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
Map<String, Map<String, String>> currentMapping =
currentMapping(currentStateOutput, resource.getResourceName(), partitions, stateCountMap);
// If there are nodes tagged with resource name, use only those nodes
Set<String> taggedNodes = new HashSet<String>();
+ Set<String> taggedLiveNodes = new HashSet<String>();
if (currentIdealState.getInstanceGroupTag() != null) {
- for (String instanceName : liveNodes) {
+ for (String instanceName : allNodes) {
if (clusterData.getInstanceConfigMap().get(instanceName)
.containsTag(currentIdealState.getInstanceGroupTag())) {
taggedNodes.add(instanceName);
+ if (liveInstance.containsKey(instanceName)) {
+ taggedLiveNodes.add(instanceName);
+ }
}
}
}
if (taggedNodes.size() > 0) {
if (LOG.isInfoEnabled()) {
LOG.info("found the following instances with tag " + currentIdealState.getResourceName()
- + " " + taggedNodes);
+ + " " + taggedLiveNodes);
}
- liveNodes = new ArrayList<String>(taggedNodes);
+ allNodes = new ArrayList<String>(taggedNodes);
+ liveNodes = new ArrayList<String>(taggedLiveNodes);
}
- List<String> allNodes = new ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f6e4c87e/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
new file mode 100644
index 0000000..d815c80
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java
@@ -0,0 +1,314 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Test that node tagging behaves correctly in FULL_AUTO mode
+ */
+public class TestFullAutoNodeTagging extends ZkUnitTestBase {
+ private static final Logger LOG = Logger.getLogger(TestFullAutoNodeTagging.class);
+
+ /**
+ * Basic test for tagging behavior. 10 participants, of which 4 are tagged. Launch all 10,
+ * checking external view every time a tagged node is started. Then shut down all 10, checking
+ * external view every time a tagged node is killed.
+ */
+ @Test
+ public void testSafeAssignment() throws Exception {
+ final int NUM_PARTICIPANTS = 10;
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 2;
+ final String RESOURCE_NAME = "TestDB0";
+ final String TAG = "ASSIGNABLE";
+
+ final String[] TAGGED_NODES = {
+ "localhost_12920", "localhost_12922", "localhost_12924", "localhost_12925"
+ };
+ Set<String> taggedNodes = Sets.newHashSet(TAGGED_NODES);
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
+ true); // do rebalance
+
+ // tag the resource and participants
+ HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR);
+ for (String taggedNode : TAGGED_NODES) {
+ helixAdmin.addInstanceTag(clusterName, taggedNode, TAG);
+ }
+ IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME);
+ idealState.setInstanceGroupTag(TAG);
+ helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+
+ // start controller
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ final String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+
+ // ensure that everything is valid if this is a tagged node that is starting
+ if (taggedNodes.contains(instanceName)) {
+ // make sure that the best possible matches the external view
+ Thread.sleep(500);
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // make sure that the tagged state of the nodes is still balanced
+ result =
+ ClusterStateVerifier.verifyByZkCallback(new TaggedZkVerifier(clusterName,
+ RESOURCE_NAME, TAGGED_NODES, false));
+ Assert.assertTrue(result, "initial assignment with all tagged nodes live is invalid");
+ }
+ }
+
+ // cleanup
+ for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+ String participantName = participants[i].getInstanceName();
+ participants[i].syncStop();
+ if (taggedNodes.contains(participantName)) {
+ // check that the external view is still correct even after removing tagged nodes
+ taggedNodes.remove(participantName);
+ Thread.sleep(500);
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new TaggedZkVerifier(clusterName,
+ RESOURCE_NAME, TAGGED_NODES, taggedNodes.isEmpty()));
+ Assert.assertTrue(result, "incorrect state after removing " + participantName + ", "
+ + taggedNodes + " remain");
+ }
+ }
+ controller.syncStop();
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * Checker for basic validity of the external view given node tagging requirements
+ */
+ private static class TaggedZkVerifier implements ZkVerifier {
+ private final String _clusterName;
+ private final String _resourceName;
+ private final String[] _taggedNodes;
+ private final boolean _isEmptyAllowed;
+ private final ZkClient _zkClient;
+
+ /**
+ * Create a verifier for a specific cluster and resource
+ * @param clusterName the cluster to verify
+ * @param resourceName the resource within the cluster to verify
+ * @param taggedNodes nodes tagged with the resource tag
+ * @param isEmptyAllowed true if empty assignments are legal
+ */
+ public TaggedZkVerifier(String clusterName, String resourceName, String[] taggedNodes,
+ boolean isEmptyAllowed) {
+ _clusterName = clusterName;
+ _resourceName = resourceName;
+ _taggedNodes = taggedNodes;
+ _isEmptyAllowed = isEmptyAllowed;
+ _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+ }
+
+ @Override
+ public boolean verify() {
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));
+
+ Set<String> taggedNodeSet = ImmutableSet.copyOf(_taggedNodes);
+
+ // set up counts of partitions, masters, and slaves per node
+ Map<String, Integer> partitionCount = Maps.newHashMap();
+ int partitionSum = 0;
+ Map<String, Integer> masterCount = Maps.newHashMap();
+ int masterSum = 0;
+ Map<String, Integer> slaveCount = Maps.newHashMap();
+ int slaveSum = 0;
+
+ for (String partitionName : externalView.getPartitionSet()) {
+ Map<String, String> stateMap = externalView.getStateMap(partitionName);
+ for (String participantName : stateMap.keySet()) {
+ String state = stateMap.get(participantName);
+ if (state.equalsIgnoreCase("MASTER") || state.equalsIgnoreCase("SLAVE")) {
+ partitionSum++;
+ incrementCount(partitionCount, participantName);
+ if (!taggedNodeSet.contains(participantName)) {
+ // not allowed to have a non-tagged node assigned
+ LOG.error("Participant " + participantName + " is not tag, but has an assigned node");
+ return false;
+ } else if (state.equalsIgnoreCase("MASTER")) {
+ masterSum++;
+ incrementCount(masterCount, participantName);
+ } else if (state.equalsIgnoreCase("SLAVE")) {
+ slaveSum++;
+ incrementCount(slaveCount, participantName);
+ }
+ }
+ }
+ }
+
+ // check balance in partitions per node
+ if (partitionCount.size() > 0) {
+ boolean partitionMapDividesEvenly = partitionSum % partitionCount.size() == 0;
+ boolean withinAverage =
+ withinAverage(partitionCount, _isEmptyAllowed, partitionMapDividesEvenly);
+ if (!withinAverage) {
+ LOG.error("partition counts deviate from average");
+ return false;
+ }
+ } else {
+ if (!_isEmptyAllowed) {
+ LOG.error("partition assignments are empty");
+ return false;
+ }
+ }
+
+ // check balance in masters per node
+ if (masterCount.size() > 0) {
+ boolean masterMapDividesEvenly = masterSum % masterCount.size() == 0;
+ boolean withinAverage = withinAverage(masterCount, _isEmptyAllowed, masterMapDividesEvenly);
+ if (!withinAverage) {
+ LOG.error("master counts deviate from average");
+ return false;
+ }
+ } else {
+ if (!_isEmptyAllowed) {
+ LOG.error("master assignments are empty");
+ return false;
+ }
+ }
+
+ // check balance in slaves per node
+ if (slaveCount.size() > 0) {
+ boolean slaveMapDividesEvenly = slaveSum % slaveCount.size() == 0;
+ boolean withinAverage = withinAverage(slaveCount, true, slaveMapDividesEvenly);
+ if (!withinAverage) {
+ LOG.error("slave counts deviate from average");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void incrementCount(Map<String, Integer> countMap, String key) {
+ if (!countMap.containsKey(key)) {
+ countMap.put(key, 0);
+ }
+ countMap.put(key, countMap.get(key) + 1);
+ }
+
+ private boolean withinAverage(Map<String, Integer> countMap, boolean isEmptyAllowed,
+ boolean dividesEvenly) {
+ if (countMap.size() == 0) {
+ if (!isEmptyAllowed) {
+ LOG.error("Map not allowed to be empty");
+ return false;
+ }
+ return true;
+ }
+ int upperBound = 1;
+ if (!dividesEvenly) {
+ upperBound = 2;
+ }
+ int average = computeAverage(countMap);
+ for (String participantName : countMap.keySet()) {
+ int count = countMap.get(participantName);
+ if (count < average - 1 || count > average + upperBound) {
+ LOG.error("Count " + count + " for " + participantName + " too far from average of "
+ + average);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private int computeAverage(Map<String, Integer> countMap) {
+ if (countMap.size() == 0) {
+ return -1;
+ }
+ int total = 0;
+ for (int value : countMap.values()) {
+ total += value;
+ }
+ return total / countMap.size();
+ }
+
+ @Override
+ public ZkClient getZkClient() {
+ return _zkClient;
+ }
+
+ @Override
+ public String getClusterName() {
+ return _clusterName;
+ }
+ }
+}