You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/07/09 00:42:13 UTC
svn commit: r961968 - in /hbase/branches/0.90_master_rewrite: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/master/
src/test/java/org/apache/hadoop/hbase/master/
Author: jgray
Date: Thu Jul 8 22:42:13 2010
New Revision: 961968
URL: http://svn.apache.org/viewvc?rev=961968&view=rev
Log:
HBASE-2699 [LoadBalancer-v5] Reimplement load balancing to be a background process and to not use heartbeats
Added:
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
Modified:
hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java
Modified: hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt?rev=961968&r1=961967&r2=961968&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt (original)
+++ hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt Thu Jul 8 22:42:13 2010
@@ -12,6 +12,8 @@ Branch 0.90.0 - Master Rewrite Branch
HBASE-2696 [part1-v5-NewClasses_RS_Tested] ZooKeeper cleanup
and refactor
HBASE-2696 Re-enabled TestZooKeeper.testRegionServerSessionExpired
+ HBASE-2699 [LoadBalancer-v5] Reimplement load balancing to be a
+ background process and to not use heartbeats
NEW FEATURES
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java?rev=961968&r1=961967&r2=961968&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java Thu Jul 8 22:42:13 2010
@@ -23,6 +23,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Comparator;
import java.util.Set;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -243,6 +244,17 @@ public class HServerInfo implements Writ
}
/**
+ * Orders HServerInfos by load then name. Natural/ascending order.
+ */
+  public static class LoadComparator implements Comparator<HServerInfo> {
+    @Override
+    public int compare(HServerInfo left, HServerInfo right) {
+      // Primary key is the reported load; ties fall back to HServerInfo's
+      // natural ordering so servers with equal load are still ordered.
+      int byLoad = left.getLoad().compareTo(right.getLoad());
+      if (byLoad != 0) {
+        return byLoad;
+      }
+      return left.compareTo(right);
+    }
+  }
+
+ /**
* Utility method that does a find of a servername or a hostandport combination
* in the passed Set.
* @param servers Set of server names
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java?rev=961968&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java Thu Jul 8 22:42:13 2010
@@ -0,0 +1,585 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+
+/**
+ * Makes decisions about the placement and movement of Regions across
+ * RegionServers.
+ *
+ * Cluster-wide load balancing will occur only when there are no regions in
+ * transition and at a fixed period, using {@link #balanceCluster(Map)}.
+ *
+ * Inline region placement with {@link #immediateAssignment} can be used when
+ * the Master needs to handle closed regions that it currently does not have
+ * a destination set for. This can happen during master failover.
+ *
+ * On cluster startup, {@link #bulkAssignment} can be used to determine
+ * locations for all Regions in a cluster.
+ */
+public class LoadBalancer {
+  private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
+
+  // Period between runs of the load balancer ("hbase.balancer.period").
+  // NOTE(review): the 300000 default reads as milliseconds (5 minutes) rather
+  // than seconds -- confirm with the code that schedules the balancer. Not
+  // read anywhere inside this class yet.
+  private final long balancerPeriod;
+
+  // Source of randomness for immediateAssignment().
+  private final Random rand;
+
+  /**
+   * Instantiate the load balancer with the specified configuration.
+   *
+   * Reads the configuration parameters used by the balancing algorithms.
+   * No thread is started here; periodic balancing is presumably driven by a
+   * caller invoking {@link #balanceCluster(Map)}.
+   *
+   * @param conf configuration supplying <code>hbase.balancer.period</code>
+   */
+  public LoadBalancer(Configuration conf) {
+    this.balancerPeriod = conf.getLong("hbase.balancer.period", 300000);
+    this.rand = new Random();
+  }
+
+  /**
+   * Generate a global load balancing plan according to the specified map of
+   * server information to the most loaded regions of each server.
+   *
+   * The load balancing invariant is that all servers are within 1 region of the
+   * average number of regions per server. If the average is an integer number,
+   * all servers will be balanced to the average. Otherwise, all servers will
+   * have either floor(average) or ceiling(average) regions.
+   *
+   * The algorithm is currently implemented as such:
+   *
+   * <ol>
+   * <li>Determine the two valid numbers of regions each server should have,
+   *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
+   *
+   * <li>Iterate down the most loaded servers, shedding regions from each so
+   *     each server hosts exactly <b>MAX</b> regions. Stop once you reach a
+   *     server that already has &lt;= <b>MAX</b> regions.
+   *
+   * <li>Iterate down the least loaded servers, assigning regions so each server
+   *     has exactly <b>MIN</b> regions. Stop once you reach a server that
+   *     already has &gt;= <b>MIN</b> regions.
+   *
+   *     Regions being assigned to underloaded servers are those that were shed
+   *     in the previous step. It is possible that there were not enough
+   *     regions shed to fill each underloaded server to <b>MIN</b>. If so we
+   *     end up with a number of regions required to do so, <b>neededRegions</b>.
+   *
+   *     It is also possible that we were able to fill each underloaded server
+   *     but ended up with regions that were unassigned from overloaded servers
+   *     and still have no destination.
+   *
+   *     If neither of these conditions hold (no regions needed to fill the
+   *     underloaded servers, no regions leftover from overloaded servers),
+   *     we are done and return. Otherwise we handle these cases below.
+   *
+   * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers),
+   *     we iterate the most loaded servers again, shedding a single region
+   *     from each (this brings them from having <b>MAX</b> regions to having
+   *     <b>MIN</b> regions).
+   *
+   * <li>We now definitely have more regions that need assignment, either from
+   *     the previous step or from the original shedding from overloaded servers.
+   *
+   *     Iterate the least loaded servers filling each to <b>MIN</b>.
+   *
+   * <li>If we still have more regions that need assignment, again iterate the
+   *     least loaded servers, this time giving each one (filling them to
+   *     <b>MAX</b>) until we run out.
+   *
+   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
+   *
+   *     In addition, any server hosting &gt;= <b>MAX</b> regions is guaranteed
+   *     to end up with <b>MAX</b> regions at the end of the balancing. This
+   *     ensures the minimal number of regions possible are moved.
+   * </ol>
+   *
+   * TODO: We can at-most reassign the number of regions away from a particular
+   *       server to be how many they report as most loaded.
+   *       Should we just keep all assignment in memory?  Any objections?
+   *       Does this mean we need HeapSize on HMaster?  Or just careful monitor?
+   *       (current thinking is we will hold all assignments in memory)
+   *
+   * @param clusterState map of regionservers and their load/region information
+   *   to a list of their most loaded regions
+   * @return a list of regions to be moved, including source and destination,
+   *   or null if cluster is already balanced (including an empty cluster)
+   */
+  public List<RegionPlan> balanceCluster(
+      Map<HServerInfo,List<HRegionInfo>> clusterState) {
+    LOG.debug("Running load balancer");
+
+    if (clusterState == null || clusterState.isEmpty()) {
+      // No servers, nothing to balance. Returning here also avoids a division
+      // by zero and firstKey()/lastKey() on an empty map below.
+      return null;
+    }
+
+    long startTime = System.currentTimeMillis();
+
+    // Make a map sorted by load and count regions
+    TreeMap<HServerInfo,List<HRegionInfo>> serversByLoad =
+      new TreeMap<HServerInfo,List<HRegionInfo>>(
+          new HServerInfo.LoadComparator());
+    int numServers = clusterState.size();
+    int numRegions = 0;
+    // Iterate so we can count regions as we build the map
+    for (Map.Entry<HServerInfo, List<HRegionInfo>> server :
+        clusterState.entrySet()) {
+      numRegions += server.getKey().getLoad().getNumberOfRegions();
+      serversByLoad.put(server.getKey(), server.getValue());
+    }
+
+    // Check if we even need to do any load balancing
+    float average = (float)numRegions / numServers; // for logging
+    int min = numRegions / numServers;
+    int max = numRegions % numServers == 0 ? min : min + 1;
+    int mostLoaded = serversByLoad.lastKey().getLoad().getNumberOfRegions();
+    int leastLoaded = serversByLoad.firstKey().getLoad().getNumberOfRegions();
+    if (mostLoaded <= max && leastLoaded >= min) {
+      // Skipped because no server outside (min,max) range
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping load balancing. servers=" + numServers + " " +
+          "regions=" + numRegions + " average=" + average + " " +
+          "mostloaded=" + mostLoaded + " leastloaded=" + leastLoaded);
+      }
+      return null;
+    }
+
+    // Balance the cluster
+    // TODO: Look at data block locality or a more complex load to do this
+    List<RegionPlan> regionsToMove = new ArrayList<RegionPlan>();
+    // Index of the first plan in regionsToMove that has no destination yet
+    int regionidx = 0;
+
+    // Walk down most loaded, pruning each to the max
+    int serversOverloaded = 0;
+    Map<HServerInfo,BalanceInfo> serverBalanceInfo =
+      new TreeMap<HServerInfo,BalanceInfo>();
+    for (Map.Entry<HServerInfo, List<HRegionInfo>> server :
+        serversByLoad.descendingMap().entrySet()) {
+      HServerInfo serverInfo = server.getKey();
+      int regionCount = serverInfo.getLoad().getNumberOfRegions();
+      if (regionCount <= max) {
+        serverBalanceInfo.put(serverInfo, new BalanceInfo(0, 0));
+        break;
+      }
+      serversOverloaded++;
+      List<HRegionInfo> regions = server.getValue();
+      // Can only shed regions the caller actually listed for this server
+      int numToOffload = Math.min(regionCount - max, regions.size());
+      for (int i = 0; i < numToOffload; i++) {
+        regionsToMove.add(new RegionPlan(regions.get(i), serverInfo, null));
+      }
+      serverBalanceInfo.put(serverInfo,
+          new BalanceInfo(numToOffload, (-1)*numToOffload));
+    }
+
+    // Walk down least loaded, filling each to the min
+    int serversUnderloaded = 0; // number of servers that get new regions
+    int neededRegions = 0; // number of regions needed to bring all up to min
+    for (Map.Entry<HServerInfo, List<HRegionInfo>> server :
+        serversByLoad.entrySet()) {
+      int regionCount = server.getKey().getLoad().getNumberOfRegions();
+      if (regionCount >= min) {
+        break;
+      }
+      serversUnderloaded++;
+      int numToTake = min - regionCount;
+      int numTaken = 0;
+      while (numTaken < numToTake && regionidx < regionsToMove.size()) {
+        regionsToMove.get(regionidx).setDestination(server.getKey());
+        numTaken++;
+        regionidx++;
+      }
+      serverBalanceInfo.put(server.getKey(), new BalanceInfo(0, numTaken));
+      // If we still want to take some, increment needed
+      if (numTaken < numToTake) {
+        neededRegions += (numToTake - numTaken);
+      }
+    }
+
+    // If none needed to fill all to min and none left to drain all to max,
+    // we are done
+    if (neededRegions == 0 && regionidx == regionsToMove.size()) {
+      long endTime = System.currentTimeMillis();
+      LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
+          "Moving " + regionsToMove.size() + " regions off of " +
+          serversOverloaded + " overloaded servers onto " +
+          serversUnderloaded + " less loaded servers");
+      return regionsToMove;
+    }
+
+    // Need to do a second pass.
+    // Either more regions to assign out or servers that are still underloaded
+
+    // If we need more to fill min, grab one from each most loaded until enough
+    if (neededRegions != 0) {
+      // Walk down most loaded, grabbing one from each until we get enough.
+      // Servers are visited from most to least loaded, so once one is down to
+      // min nothing further can be shed without underloading it.
+      for (Map.Entry<HServerInfo, List<HRegionInfo>> server :
+          serversByLoad.descendingMap().entrySet()) {
+        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey());
+        int regionCount = server.getKey().getLoad().getNumberOfRegions();
+        int idx = 0;
+        if (balanceInfo != null) {
+          regionCount += balanceInfo.getNumRegionsAdded();
+          idx = balanceInfo.getNextRegionForUnload();
+        }
+        if (regionCount <= min) {
+          break;
+        }
+        List<HRegionInfo> regions = server.getValue();
+        if (idx >= regions.size()) {
+          // Caller listed no further region this server could shed
+          continue;
+        }
+        regionsToMove.add(
+            new RegionPlan(regions.get(idx), server.getKey(), null));
+        if (--neededRegions == 0) {
+          // No more regions needed, done shedding
+          break;
+        }
+      }
+    }
+
+    // Now we have a set of regions that must be all assigned out
+    // Assign each underloaded up to the min, then if leftovers, assign to max
+
+    // Walk down least loaded, assigning to each to fill up to min
+    for (Map.Entry<HServerInfo, List<HRegionInfo>> server :
+        serversByLoad.entrySet()) {
+      int regionCount = server.getKey().getLoad().getNumberOfRegions();
+      if (regionCount >= min) {
+        // Only servers that started below min can still need regions
+        break;
+      }
+      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey());
+      if (balanceInfo != null) {
+        regionCount += balanceInfo.getNumRegionsAdded();
+      }
+      if (regionCount >= min) {
+        // Already filled to min by the first fill pass; later (less filled)
+        // underloaded servers may still need regions, so keep walking.
+        continue;
+      }
+      int numToTake = min - regionCount;
+      int numTaken = 0;
+      while (numTaken < numToTake && regionidx < regionsToMove.size()) {
+        regionsToMove.get(regionidx).setDestination(server.getKey());
+        numTaken++;
+        regionidx++;
+      }
+    }
+
+    // If we still have regions to dish out, assign underloaded to max
+    if (regionidx != regionsToMove.size()) {
+      for (Map.Entry<HServerInfo, List<HRegionInfo>> server :
+          serversByLoad.entrySet()) {
+        int regionCount = server.getKey().getLoad().getNumberOfRegions();
+        if (regionCount >= max) {
+          break;
+        }
+        regionsToMove.get(regionidx).setDestination(server.getKey());
+        regionidx++;
+        if (regionidx == regionsToMove.size()) {
+          break;
+        }
+      }
+    }
+
+    long endTime = System.currentTimeMillis();
+
+    assert(regionidx == regionsToMove.size());
+    assert(neededRegions == 0);
+
+    // All done!
+    LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
+        "Moving " + regionsToMove.size() + " regions off of " +
+        serversOverloaded + " overloaded servers onto " +
+        serversUnderloaded + " less loaded servers");
+
+    return regionsToMove;
+  }
+
+  /**
+   * Stores additional per-server information about the regions added/removed
+   * during the run of the balancing algorithm.
+   *
+   * For servers that receive additional regions, we are not updating the number
+   * of regions in HServerInfo once we decide to reassign regions to a server,
+   * but we need this information later in the algorithm.  This is stored in
+   * <b>numRegionsAdded</b> (negative for servers that shed regions).
+   *
+   * For servers that shed regions, we need to track which regions we have
+   * already shed.  <b>nextRegionForUnload</b> contains the index in the list
+   * of regions on the server that is the next to be shed.
+   */
+  private static class BalanceInfo {
+
+    private final int nextRegionForUnload;
+    private final int numRegionsAdded;
+
+    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
+      this.nextRegionForUnload = nextRegionForUnload;
+      this.numRegionsAdded = numRegionsAdded;
+    }
+
+    public int getNextRegionForUnload() {
+      return nextRegionForUnload;
+    }
+
+    public int getNumRegionsAdded() {
+      return numRegionsAdded;
+    }
+  }
+
+  /**
+   * Generates a bulk assignment plan to be used on cluster startup.
+   *
+   * Takes a list of all the regions and all the servers in the cluster and
+   * returns a map of each server to the regions that it should be assigned.
+   *
+   * Currently implemented as a round-robin assignment.  Same invariant as
+   * load balancing, all servers holding floor(avg) or ceiling(avg).
+   *
+   * TODO: Use block locations from HDFS to place regions with their blocks
+   *
+   * @param regions all regions
+   * @param servers all servers
+   * @return map of server to the regions it should take, or null if no
+   *   assignment is possible (ie. no regions or no servers)
+   */
+  public Map<HServerInfo,List<HRegionInfo>> bulkAssignment(
+      List<HRegionInfo> regions, List<HServerInfo> servers) {
+    if (regions.isEmpty() || servers.isEmpty()) {
+      return null;
+    }
+    Map<HServerInfo,List<HRegionInfo>> assignments =
+      new TreeMap<HServerInfo,List<HRegionInfo>>();
+    int numRegions = regions.size();
+    int numServers = servers.size();
+    // ceiling(numRegions / numServers) in exact integer arithmetic; only used
+    // to presize each server's list
+    int max = numRegions / numServers + (numRegions % numServers == 0 ? 0 : 1);
+    int serverIdx = 0;
+    for (HServerInfo server : servers) {
+      // Round-robin: server k takes regions k, k + numServers, k + 2*numServers...
+      List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
+      for (int i = serverIdx; i < numRegions; i += numServers) {
+        serverRegions.add(regions.get(i));
+      }
+      assignments.put(server, serverRegions);
+      serverIdx++;
+    }
+    return assignments;
+  }
+
+  /**
+   * Find the block locations for all of the files for the specified region.
+   *
+   * Returns an ordered list of hosts that are hosting the blocks for this
+   * region.  The weight of each host is the sum of the block lengths of all
+   * files on that host, so the first host in the list is the server which
+   * holds the most bytes of the given region's HFiles.
+   *
+   * TODO: Make this work.  Need to figure out how to match hadoop's hostnames
+   *       given for block locations with our HServerAddress.
+   * TODO: Use the right directory for the region
+   * TODO: Use getFileBlockLocations on the files not the directory
+   *
+   * @param fs the filesystem
+   * @param region region
+   * @return ordered list of hosts holding blocks of the specified region
+   * @throws IOException if any filesystem errors
+   */
+  private List<String> getTopBlockLocations(FileSystem fs, HRegionInfo region)
+  throws IOException {
+    String encodedName = region.getEncodedName();
+    Path path = new Path("/hbase/table/" + encodedName);
+    FileStatus status = fs.getFileStatus(path);
+    BlockLocation [] blockLocations =
+      fs.getFileBlockLocations(status, 0, status.getLen());
+    // Keyed by host name. (Looking up a String in a map keyed by HostAndWeight
+    // would throw ClassCastException from the map's comparator.)
+    Map<String,HostAndWeight> hostWeights = new TreeMap<String,HostAndWeight>();
+    for (BlockLocation bl : blockLocations) {
+      long len = bl.getLength();
+      for (String host : bl.getHosts()) {
+        HostAndWeight haw = hostWeights.get(host);
+        if (haw == null) {
+          hostWeights.put(host, new HostAndWeight(host, len));
+        } else {
+          haw.addWeight(len);
+        }
+      }
+    }
+    NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
+        new HostAndWeight.WeightComparator());
+    orderedHosts.addAll(hostWeights.values());
+    List<String> topHosts = new ArrayList<String>(orderedHosts.size());
+    // Heaviest host first
+    for (HostAndWeight haw : orderedHosts.descendingSet()) {
+      topHosts.add(haw.getHost());
+    }
+    return topHosts;
+  }
+
+  /**
+   * Stores the hostname and weight for that hostname.
+   *
+   * This is used when determining the physical locations of the blocks making
+   * up a region.
+   *
+   * To make a prioritized list of the hosts holding the most data of a region,
+   * this class is used to count the total weight for each host.  The weight is
+   * currently just the total length of the blocks on that host.
+   */
+  private static class HostAndWeight {
+
+    private final String host;
+    private long weight;
+
+    public HostAndWeight(String host, long weight) {
+      this.host = host;
+      this.weight = weight;
+    }
+
+    public void addWeight(long weight) {
+      this.weight += weight;
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public long getWeight() {
+      return weight;
+    }
+
+    /** Orders by weight ascending, breaking ties by host name. */
+    private static class WeightComparator implements Comparator<HostAndWeight> {
+      @Override
+      public int compare(HostAndWeight l, HostAndWeight r) {
+        if (l.getWeight() == r.getWeight()) {
+          return l.getHost().compareTo(r.getHost());
+        }
+        return l.getWeight() < r.getWeight() ? -1 : 1;
+      }
+    }
+  }
+
+  /**
+   * Generates an immediate assignment plan to be used by a new master for
+   * regions in transition that do not have an already known destination.
+   *
+   * Takes a list of regions that need immediate assignment and a list of
+   * all available servers.  Returns a map of regions to the server they
+   * should be assigned to.
+   *
+   * This method will return quickly and does not do any intelligent
+   * balancing.  The goal is to make a fast decision not the best decision
+   * possible.
+   *
+   * Currently this is random.
+   *
+   * @param regions regions needing a destination
+   * @param servers candidate destination servers
+   * @return map of regions to the server it should be assigned to
+   * @throws IllegalArgumentException if there are regions but no servers
+   */
+  public Map<HRegionInfo,HServerInfo> immediateAssignment(
+      List<HRegionInfo> regions, List<HServerInfo> servers) {
+    if (servers.isEmpty() && !regions.isEmpty()) {
+      throw new IllegalArgumentException("Cannot assign " + regions.size() +
+          " regions: no servers available");
+    }
+    Map<HRegionInfo,HServerInfo> assignments =
+      new TreeMap<HRegionInfo,HServerInfo>();
+    for (HRegionInfo region : regions) {
+      assignments.put(region, servers.get(rand.nextInt(servers.size())));
+    }
+    return assignments;
+  }
+
+  /**
+   * Stores the plan for the move of an individual region.
+   *
+   * Contains info for the region being moved, info for the server the region
+   * should be moved from, and info for the server the region should be moved
+   * to.
+   *
+   * The comparable implementation of this class compares only the region
+   * information and not the source/dest server info. Note this ordering is
+   * not consistent with equals(), which is inherited from Object.
+   */
+  public static class RegionPlan implements Comparable<RegionPlan> {
+
+    private final HRegionInfo region;
+    private final HServerInfo source;
+    private HServerInfo dest;
+
+    /**
+     * Instantiate a plan for a region move, moving the specified region from
+     * the specified source server to the specified destination server.
+     *
+     * Destination server can be instantiated as null and later set
+     * with {@link #setDestination(HServerInfo)}.
+     *
+     * @param region region to be moved
+     * @param source regionserver region should be moved from
+     * @param dest regionserver region should be moved to
+     */
+    public RegionPlan(HRegionInfo region, HServerInfo source, HServerInfo dest) {
+      this.region = region;
+      this.source = source;
+      this.dest = dest;
+    }
+
+    /**
+     * Set the destination server for the plan for this region.
+     * @param dest regionserver region should be moved to
+     */
+    public void setDestination(HServerInfo dest) {
+      this.dest = dest;
+    }
+
+    /**
+     * Get the source server for the plan for this region.
+     * @return server info for source
+     */
+    public HServerInfo getSource() {
+      return source;
+    }
+
+    /**
+     * Get the destination server for the plan for this region.
+     * @return server info for destination, or null if not yet set
+     */
+    public HServerInfo getDestination() {
+      return dest;
+    }
+
+    /**
+     * Get the region information for the region this plan is for.
+     * @return region info
+     */
+    public HRegionInfo getRegionInfo() {
+      return region;
+    }
+
+    /**
+     * Compare the region info.
+     * @param o region plan you are comparing against
+     * @return result of comparing this plan's region with <code>o</code>'s
+     */
+    @Override
+    public int compareTo(RegionPlan o) {
+      return getRegionInfo().compareTo(o.getRegionInfo());
+    }
+  }
+}
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java?rev=961968&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java Thu Jul 8 22:42:13 2010
@@ -0,0 +1,384 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestLoadBalancer {
+  private static final Log LOG = LogFactory.getLog(TestLoadBalancer.class);
+
+  // Fixtures shared by every test in this class; built once in beforeAllTests.
+  private static Configuration conf;
+  private static LoadBalancer loadBalancer;
+  private static Random rand;
+
+  /**
+   * Creates the random source, configuration and balancer used by all tests.
+   */
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    rand = new Random();
+    conf = HBaseConfiguration.create();
+    loadBalancer = new LoadBalancer(conf);
+  }
+
+  // One row per test case; entry i of a row is the number of regions hosted
+  // by mock server i (int[testnum][servernumber] -> numregions).
+  int[][] clusterStateMocks = {
+      // 1 node
+      { 0 },
+      { 1 },
+      { 10 },
+      // 2 node
+      { 0, 0 },
+      { 2, 0 },
+      { 2, 1 },
+      { 2, 2 },
+      { 2, 3 },
+      { 2, 4 },
+      { 1, 1 },
+      { 0, 1 },
+      { 10, 1 },
+      { 14, 1432 },
+      { 47, 53 },
+      // 3 node
+      { 0, 1, 2 },
+      { 1, 2, 3 },
+      { 0, 2, 2 },
+      { 0, 3, 0 },
+      { 0, 4, 0 },
+      { 20, 20, 0 },
+      // 4 node
+      { 0, 1, 2, 3 },
+      { 4, 0, 0, 0 },
+      { 5, 0, 0, 0 },
+      { 6, 6, 0, 0 },
+      { 6, 2, 0, 0 },
+      { 6, 1, 0, 0 },
+      { 6, 0, 0, 0 },
+      { 4, 4, 4, 7 },
+      { 4, 4, 4, 8 },
+      { 0, 0, 0, 7 },
+      // 5 node
+      { 1, 1, 1, 1, 4 },
+      // more nodes
+      { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 },
+      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 },
+      { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 },
+      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 },
+      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 },
+      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 },
+      { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 },
+      { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 },
+      { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 },
+      { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 },
+      { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 },
+      { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 },
+      { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 },
+      { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 }
+  };
+
+  // Each pair is { num regions, num servers }.
+  int[][] regionsAndServersMocks = {
+      { 0, 0 },
+      { 0, 1 },
+      { 1, 1 },
+      { 2, 1 },
+      { 10, 1 },
+      { 1, 2 },
+      { 2, 2 },
+      { 3, 2 },
+      { 1, 3 },
+      { 2, 3 },
+      { 3, 3 },
+      { 25, 3 },
+      { 2, 10 },
+      { 2, 100 },
+      { 12, 10 },
+      { 12, 100 },
+  };
+
+  /**
+   * Runs the balancer over every mock cluster layout and checks the result.
+   *
+   * After applying the returned plans, every server must host either
+   * floor(average) or ceiling(average) regions.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testBalanceCluster() throws Exception {
+    for (int[] layout : clusterStateMocks) {
+      Map<HServerInfo,List<HRegionInfo>> cluster = mockClusterServers(layout);
+      LOG.info("Mock Cluster : " + printMock(cluster) + " " + printStats(cluster));
+      List<RegionPlan> plans = loadBalancer.balanceCluster(cluster);
+      List<HServerInfo> afterBalance = reconcile(cluster, plans);
+      LOG.info("Mock Balance : " + printMock(afterBalance));
+      assertClusterAsBalanced(afterBalance);
+      // Hand mock objects back for reuse by later iterations
+      for (Map.Entry<HServerInfo, List<HRegionInfo>> e : cluster.entrySet()) {
+        returnRegions(e.getValue());
+        returnServer(e.getKey());
+      }
+    }
+  }
+
+  /**
+   * Asserts the balance invariant: every server hosts between floor(avg) and
+   * ceiling(avg) regions, where avg is the mean number of regions per server.
+   *
+   * @param servers servers whose reported load holds the region counts to check
+   */
+  public void assertClusterAsBalanced(List<HServerInfo> servers) {
+    int numServers = servers.size();
+    int numRegions = 0;
+    int maxRegions = 0;
+    int minRegions = Integer.MAX_VALUE;
+    for (HServerInfo server : servers) {
+      int nr = server.getLoad().getNumberOfRegions();
+      maxRegions = Math.max(maxRegions, nr);
+      minRegions = Math.min(minRegions, nr);
+      numRegions += nr;
+    }
+    if (maxRegions - minRegions < 2) {
+      // Most and least loaded differ by at most one region, so every server
+      // already holds floor(avg) or ceiling(avg) regions: the cluster is
+      // balanced. (An empty list also lands here, avoiding division by zero.)
+      return;
+    }
+    int min = numRegions / numServers;
+    int max = numRegions % numServers == 0 ? min : min + 1;
+
+    for (HServerInfo server : servers) {
+      int nr = server.getLoad().getNumberOfRegions();
+      assertTrue("server " + server + " hosts " + nr +
+          " regions, more than max=" + max, nr <= max);
+      assertTrue("server " + server + " hosts " + nr +
+          " regions, fewer than min=" + min, nr >= min);
+    }
+  }
+
+ /**
+ * Tests immediate assignment.
+ *
+ * Invariant is that all regions have an assignment.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testImmediateAssignment() throws Exception {
+ for(int [] mock : regionsAndServersMocks) {
+ LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
+ List<HRegionInfo> regions = randomRegions(mock[0]);
+ List<HServerInfo> servers = randomServers(mock[1], 0);
+ Map<HRegionInfo,HServerInfo> assignments =
+ loadBalancer.immediateAssignment(regions, servers);
+ assertImmediateAssignment(regions, servers, assignments);
+ returnRegions(regions);
+ returnServers(servers);
+ }
+ }
+
+  /**
+   * Asserts every region has an assignment, and that the assigned server is
+   * one of the candidate servers.
+   * @param regions regions that were passed to the balancer
+   * @param servers candidate servers that were passed to the balancer
+   * @param assignments plan returned by the balancer
+   */
+  private void assertImmediateAssignment(List<HRegionInfo> regions,
+      List<HServerInfo> servers, Map<HRegionInfo,HServerInfo> assignments) {
+    for (HRegionInfo region : regions) {
+      assertTrue("no assignment for region " + region,
+          assignments.containsKey(region));
+      HServerInfo server = assignments.get(region);
+      assertTrue("region " + region + " assigned to unknown server " + server,
+          servers.contains(server));
+    }
+  }
+
+ /**
+ * Tests the bulk assignment used during cluster startup.
+ *
+ * Round-robin. Should yield a balanced cluster so same invariant as the load
+ * balancer holds, all servers holding either floor(avg) or ceiling(avg).
+ * Additionally, no region may be dropped: the per-server lists together must
+ * hold as many regions as were submitted.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBulkAssignment() throws Exception {
+ for(int [] mock : regionsAndServersMocks) {
+ LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
+ List<HRegionInfo> regions = randomRegions(mock[0]);
+ List<HServerInfo> servers = randomServers(mock[1], 0);
+ Map<HServerInfo,List<HRegionInfo>> assignments =
+ loadBalancer.bulkAssignment(regions, servers);
+ if(assignments != null && !assignments.isEmpty()) {
+ // Exact integer floor/ceiling, the same arithmetic as
+ // assertClusterAsBalanced, rather than float rounding.
+ int min = regions.size() / servers.size();
+ int max = regions.size() % servers.size() == 0 ? min : min + 1;
+ int assigned = 0;
+ for(List<HRegionInfo> regionList : assignments.values()) {
+ assertTrue(regionList.size() == min || regionList.size() == max);
+ assigned += regionList.size();
+ }
+ // Every submitted region must end up on some server.
+ assertTrue(assigned == regions.size());
+ }
+ returnRegions(regions);
+ returnServers(servers);
+ }
+ }
+
+ /**
+ * Summarizes a mock cluster on one line: server count, total regions,
+ * average regions per server, and the ceiling/floor of that average.
+ */
+ private String printStats(Map<HServerInfo, List<HRegionInfo>> servers) {
+ int totalRegions = 0;
+ for(HServerInfo server : servers.keySet()) {
+ totalRegions += server.getLoad().getNumberOfRegions();
+ }
+ int numServers = servers.size();
+ float average = (float)totalRegions / numServers;
+ StringBuilder sb = new StringBuilder("[srvr=");
+ sb.append(numServers)
+ .append(" rgns=").append(totalRegions)
+ .append(" avg=").append(average)
+ .append(" max=").append((int)Math.ceil(average))
+ .append(" min=").append((int)Math.floor(average))
+ .append("]");
+ return sb.toString();
+ }
+
+ /**
+ * Renders the region counts of the servers that key a mock cluster map.
+ */
+ private String printMock(Map<HServerInfo, List<HRegionInfo>> servers) {
+ List<HServerInfo> serverList = new ArrayList<HServerInfo>(servers.keySet());
+ return printMock(serverList);
+ }
+
+ /**
+ * Renders each server's region count as "{ a , b , c }", ordered by the
+ * servers' natural ordering. Servers that compare equal under that ordering
+ * are collapsed into one entry by the TreeSet.
+ */
+ private String printMock(List<HServerInfo> balancedCluster) {
+ SortedSet<HServerInfo> ordered = new TreeSet<HServerInfo>(balancedCluster);
+ StringBuilder out = new StringBuilder(ordered.size() * 4 + 4);
+ out.append("{ ");
+ boolean first = true;
+ for(HServerInfo server : ordered) {
+ if(!first) {
+ out.append(" , ");
+ }
+ first = false;
+ out.append(server.getLoad().getNumberOfRegions());
+ }
+ out.append(" }");
+ return out.toString();
+ }
+
+ /**
+ * Applies region plans to a mock cluster by adjusting server load counts:
+ * each plan decrements its source's region count and increments its
+ * destination's.
+ *
+ * This assumes the HServerInfo instances referenced by the plans are the
+ * same instances used as keys in {@code servers}. The map is then only
+ * needed to build the returned list, but passing it keeps the intent clear.
+ *
+ * @param servers mock cluster whose servers are returned
+ * @param plans region moves to apply; null means there is nothing to apply
+ * @return the cluster's servers, with their load counts updated
+ */
+ private List<HServerInfo> reconcile(
+ Map<HServerInfo, List<HRegionInfo>> servers, List<RegionPlan> plans) {
+ if(plans != null) {
+ for(RegionPlan plan : plans) {
+ HServerInfo from = plan.getSource();
+ from.getLoad().setNumberOfRegions(from.getLoad().getNumberOfRegions() - 1);
+ HServerInfo to = plan.getDestination();
+ to.getLoad().setNumberOfRegions(to.getLoad().getNumberOfRegions() + 1);
+ }
+ }
+ HServerInfo [] snapshot = servers.keySet().toArray(new HServerInfo[servers.size()]);
+ return Arrays.asList(snapshot);
+ }
+
+ /**
+ * Builds a mock cluster with one server per entry of {@code mockCluster}.
+ * Each server reports that many regions and is paired with a list of that
+ * many regions.
+ *
+ * @param mockCluster region count for each server
+ * @return map from mock server to its regions, sorted by server
+ */
+ private Map<HServerInfo, List<HRegionInfo>> mockClusterServers(
+ int [] mockCluster) {
+ Map<HServerInfo,List<HRegionInfo>> cluster =
+ new TreeMap<HServerInfo,List<HRegionInfo>>();
+ for(int regionCount : mockCluster) {
+ HServerInfo server = randomServer(regionCount);
+ cluster.put(server, randomRegions(regionCount));
+ }
+ return cluster;
+ }
+
+ /** Pool of previously generated regions, drained first by randomRegions. */
+ private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
+
+ /**
+ * Returns {@code numRegions} regions, reusing pooled regions first and
+ * generating new ones for the remainder.
+ *
+ * Each new region gets its own copies of the start/end key arrays. The
+ * first four bytes of each key encode the region's index {@code i} within
+ * this call: {@code 2*i} for the start key and {@code 2*i+1} for the end
+ * key. Regions generated in one call therefore have distinct keys, and each
+ * start key sorts before its end key. The remaining twelve bytes come from
+ * per-call random data.
+ *
+ * @param numRegions number of regions to return
+ * @return list of {@code numRegions} regions
+ */
+ private List<HRegionInfo> randomRegions(int numRegions) {
+ List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
+ byte [] start = new byte[16];
+ byte [] end = new byte[16];
+ rand.nextBytes(start);
+ rand.nextBytes(end);
+ for(int i=0;i<numRegions;i++) {
+ if(!regionQueue.isEmpty()) {
+ regions.add(regionQueue.poll());
+ continue;
+ }
+ // Fresh copies so no two regions share the same key arrays.
+ byte [] startKey = start.clone();
+ byte [] endKey = end.clone();
+ // Key on the loop index, not numRegions, so keys differ per region.
+ Bytes.putInt(startKey, 0, i << 1);
+ Bytes.putInt(endKey, 0, (i << 1) + 1);
+ HRegionInfo hri = new HRegionInfo(
+ new HTableDescriptor(Bytes.toBytes("table")), startKey, endKey);
+ regions.add(hri);
+ }
+ return regions;
+ }
+
+ /**
+ * Puts regions back into the pool, in list order, for reuse by randomRegions.
+ */
+ private void returnRegions(List<HRegionInfo> regions) {
+ for(HRegionInfo region : regions) {
+ regionQueue.add(region);
+ }
+ }
+
+ /** Pool of previously generated servers, drained first by randomServer. */
+ private Queue<HServerInfo> serverQueue = new LinkedList<HServerInfo>();
+
+ /**
+ * Returns a mock server reporting {@code numRegions} regions. A pooled
+ * server is reused when one is available; its load is reset to the
+ * requested count.
+ *
+ * New servers use the loopback IP literal with a random port and start
+ * code. InetSocketAddress tries to resolve host names, and the previous
+ * random Unicode host names caused a slow, failing DNS lookup per server.
+ * An IP literal needs no lookup.
+ * NOTE(review): server distinctness now relies on the random port and start
+ * code; confirm HServerInfo ordering/equality includes them.
+ *
+ * @param numRegions region count the server should report
+ * @return a pooled or newly created mock server
+ */
+ private HServerInfo randomServer(int numRegions) {
+ if(!serverQueue.isEmpty()) {
+ HServerInfo server = this.serverQueue.poll();
+ server.getLoad().setNumberOfRegions(numRegions);
+ return server;
+ }
+ String host = "127.0.0.1";
+ int port = rand.nextInt(60000);
+ long startCode = rand.nextLong();
+ HServerInfo hsi =
+ new HServerInfo(new HServerAddress(host, port), startCode, port, host);
+ hsi.getLoad().setNumberOfRegions(numRegions);
+ return hsi;
+ }
+
+ /**
+ * Returns {@code numServers} mock servers (pooled ones first), each
+ * reporting {@code numRegionsPerServer} regions.
+ */
+ private List<HServerInfo> randomServers(int numServers, int numRegionsPerServer) {
+ List<HServerInfo> result = new ArrayList<HServerInfo>(numServers);
+ while(result.size() < numServers) {
+ result.add(randomServer(numRegionsPerServer));
+ }
+ return result;
+ }
+
+ /** Puts a server back into the pool for reuse by randomServer. */
+ private void returnServer(HServerInfo server) {
+ this.serverQueue.offer(server);
+ }
+
+ /** Puts servers back into the pool, in list order, for reuse by randomServer. */
+ private void returnServers(List<HServerInfo> servers) {
+ for(HServerInfo server : servers) {
+ returnServer(server);
+ }
+ }
+}