You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/07/09 00:42:13 UTC

svn commit: r961968 - in /hbase/branches/0.90_master_rewrite: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/master/ src/test/java/org/apache/hadoop/hbase/master/

Author: jgray
Date: Thu Jul  8 22:42:13 2010
New Revision: 961968

URL: http://svn.apache.org/viewvc?rev=961968&view=rev
Log:
HBASE-2699  [LoadBalancer-v5] Reimplement load balancing to be a background process and to not use heartbeats

Added:
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
Modified:
    hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java

Modified: hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt?rev=961968&r1=961967&r2=961968&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt (original)
+++ hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt Thu Jul  8 22:42:13 2010
@@ -12,6 +12,8 @@ Branch 0.90.0 - Master Rewrite Branch
     HBASE-2696  [part1-v5-NewClasses_RS_Tested] ZooKeeper cleanup
                 and refactor
     HBASE-2696  Re-enabled TestZooKeeper.testRegionServerSessionExpired
+    HBASE-2699  [LoadBalancer-v5] Reimplement load balancing to be a
+                background process and to not use heartbeats
 
   NEW FEATURES
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java?rev=961968&r1=961967&r2=961968&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/HServerInfo.java Thu Jul  8 22:42:13 2010
@@ -23,6 +23,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Comparator;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -243,6 +244,17 @@ public class HServerInfo implements Writ
   }
 
   /**
+   * Orders HServerInfos by load then name.  Natural/ascending order.
+   */
+  public static class LoadComparator implements Comparator<HServerInfo> {
+    @Override
+    public int compare(HServerInfo left, HServerInfo right) {
+      int loadCompare = left.getLoad().compareTo(right.getLoad());
+      return loadCompare != 0 ? loadCompare : left.compareTo(right);
+    }
+  }
+
+  /**
    * Utility method that does a find of a servername or a hostandport combination
    * in the passed Set.
    * @param servers Set of server names

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java?rev=961968&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java Thu Jul  8 22:42:13 2010
@@ -0,0 +1,585 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+
+/**
+ * Makes decisions about the placement and movement of Regions across
+ * RegionServers.
+ *
+ * Cluster-wide load balancing will occur only when there are no regions in
+ * transition and according to a fixed period of time using {@link #balanceCluster(Map)}.
+ *
+ * Inline region placement with {@link #immediateAssignment} can be used when
+ * the Master needs to handle closed regions that it currently does not have
+ * a destination set for.  This can happen during master failover.
+ *
+ * On cluster startup, {@link #bulkAssignment} can be used to determine
+ * locations for all Regions in a cluster.
+ */
+public class LoadBalancer {
+  private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
+
+  // Period between runs of the load balancer (presumably milliseconds, given the 300000 default)
+  private final long balancerPeriod;
+
+  private Random rand;
+
+  /**
+   * Instantiate the load balancer with the specified configuration.
+   *
+   * This reads the balancer period from the configuration and creates the
+   * random number generator; it does not itself start any background thread.
+   * @param conf configuration supplying <code>hbase.balancer.period</code>
+   */
+  public LoadBalancer(Configuration conf) {
+    balancerPeriod = conf.getLong("hbase.balancer.period", 300000);
+    rand = new Random();
+  }
+
+  /**
+   * Generate a global load balancing plan according to the specified map of
+   * server information to the most loaded regions of each server.
+   *
+   * The load balancing invariant is that all servers are within 1 region of the
+   * average number of regions per server.  If the average is an integer number,
+   * all servers will be balanced to the average.  Otherwise, all servers will
+   * have either floor(average) or ceiling(average) regions.
+   *
+   * The algorithm is currently implemented as such:
+   *
+   * <ol>
+   * <li>Determine the two valid numbers of regions each server should have,
+   *     <b>MIN</b>=floor(average) and <b>MAX</b>=ceiling(average).
+   *
+   * <li>Iterate down the most loaded servers, shedding regions from each so
+   *     each server hosts exactly <b>MAX</b> regions.  Stop once you reach a
+   *     server that already has &lt;= <b>MAX</b> regions.
+   *
+   * <li>Iterate down the least loaded servers, assigning regions so each server
+   *     has exactly <b>MIN</b> regions.  Stop once you reach a server that
+   *     already has &gt;= <b>MIN</b> regions.
+   *
+   *     Regions being assigned to underloaded servers are those that were shed
+   *     in the previous step.  It is possible that there were not enough
+   *     regions shed to fill each underloaded server to <b>MIN</b>.  If so we
+   *     end up with a number of regions required to do so, <b>neededRegions</b>.
+   *
+   *     It is also possible that we were able to fill each underloaded server
+   *     but ended up with regions that were unassigned from overloaded servers
+   *     and that still do not have an assignment.
+   *
+   *     If neither of these conditions holds (no regions needed to fill the
+   *     underloaded servers, no regions leftover from overloaded servers),
+   *     we are done and return.  Otherwise we handle these cases below.
+   *
+   * <li>If <b>neededRegions</b> is non-zero (still have underloaded servers),
+   *     we iterate the most loaded servers again, shedding a single region from
+   *     each (this brings them from having <b>MAX</b> regions to having
+   *     <b>MIN</b> regions).
+   *
+   * <li>We now definitely have more regions that need assignment, either from
+   *     the previous step or from the original shedding from overloaded servers.
+   *
+   *     Iterate the least loaded servers filling each to <b>MIN</b>.
+   *
+   * <li>If we still have more regions that need assignment, again iterate the
+   *     least loaded servers, this time giving each one (filling them to
+   *     <b>MAX</b>) until we run out.
+   *
+   * <li>All servers will now either host <b>MIN</b> or <b>MAX</b> regions.
+   *
+   *     In addition, any server hosting &gt;= <b>MAX</b> regions is guaranteed
+   *     to end up with <b>MAX</b> regions at the end of the balancing.  This
+   *     ensures the minimal number of regions possible are moved.
+   * </ol>
+   *
+   * TODO: We can at-most reassign the number of regions away from a particular
+   *       server to be how many they report as most loaded.
+   *       Should we just keep all assignment in memory?  Any objections?
+   *       Does this mean we need HeapSize on HMaster?  Or just careful monitor?
+   *       (current thinking is we will hold all assignments in memory)
+   *
+   * @param clusterState map of regionservers and their load/region information to
+   *                   a list of their most loaded regions
+   * @return a list of regions to be moved, including source and destination,
+   *         or null if cluster is already balanced
+   */
+  public List<RegionPlan> balanceCluster(
+      Map<HServerInfo,List<HRegionInfo>> clusterState) {
+    LOG.debug("Running load balancer");
+
+    long startTime = System.currentTimeMillis();
+
+    // Make a map sorted by load and count regions
+    TreeMap<HServerInfo,List<HRegionInfo>> serversByLoad =
+      new TreeMap<HServerInfo,List<HRegionInfo>>(
+          new HServerInfo.LoadComparator());
+    int numServers = clusterState.size();
+    int numRegions = 0;
+    // Iterate so we can count regions as we build the map
+    for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
+      clusterState.entrySet()) {
+      numRegions += server.getKey().getLoad().getNumberOfRegions();
+      serversByLoad.put(server.getKey(), server.getValue());
+    }
+
+    // Check if we even need to do any load balancing
+    float average = (float)numRegions / numServers; // for logging
+    int min = numRegions / numServers;
+    int max = numRegions % numServers == 0 ? min : min + 1;
+    if(serversByLoad.lastKey().getLoad().getNumberOfRegions() <= max &&
+       serversByLoad.firstKey().getLoad().getNumberOfRegions() >= min) {
+      // Skipped because no server outside (min,max) range
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Skipping load balancing.  servers=" + numServers + " " +
+            "regions=" + numRegions + " average=" + average + " " +
+            "mostloaded=" + serversByLoad.lastKey().getLoad().getNumberOfRegions() +
+            " leastloaded=" + serversByLoad.lastKey().getLoad().getNumberOfRegions());
+      }
+      return null;
+    }
+
+    // Balance the cluster
+    // TODO: Look at data block locality or a more complex load to do this
+    List<RegionPlan> regionsToMove = new ArrayList<RegionPlan>();
+    int regionidx = 0; // track the index in above list for setting destination
+
+    // Walk down most loaded, pruning each to the max
+    int serversOverloaded = 0;
+    Map<HServerInfo,BalanceInfo> serverBalanceInfo =
+      new TreeMap<HServerInfo,BalanceInfo>();
+    for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
+      serversByLoad.descendingMap().entrySet()) {
+      HServerInfo serverInfo = server.getKey();
+      int regionCount = serverInfo.getLoad().getNumberOfRegions();
+      if(regionCount <= max) {
+        serverBalanceInfo.put(serverInfo, new BalanceInfo(0, 0));
+        break;
+      }
+      serversOverloaded++;
+      List<HRegionInfo> regions = server.getValue();
+      int numToOffload = Math.min(regionCount - max, regions.size());
+      for(int i=0; i<numToOffload; i++) {
+        regionsToMove.add(new RegionPlan(regions.get(i), serverInfo, null));
+      }
+      serverBalanceInfo.put(serverInfo,
+          new BalanceInfo(numToOffload, (-1)*numToOffload));
+    }
+
+    // Walk down least loaded, filling each to the min
+    int serversUnderloaded = 0; // number of servers that get new regions
+    int neededRegions = 0; // number of regions needed to bring all up to min
+    for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
+      serversByLoad.entrySet()) {
+      int regionCount = server.getKey().getLoad().getNumberOfRegions();
+      if(regionCount >= min) {
+        break;
+      }
+      serversUnderloaded++;
+      int numToTake = min - regionCount;
+      int numTaken = 0;
+      while(numTaken < numToTake && regionidx < regionsToMove.size()) {
+        regionsToMove.get(regionidx).setDestination(server.getKey());
+        numTaken++;
+        regionidx++;
+      }
+      serverBalanceInfo.put(server.getKey(), new BalanceInfo(0, numTaken));
+      // If we still want to take some, increment needed
+      if(numTaken < numToTake) {
+        neededRegions += (numToTake - numTaken);
+      }
+    }
+
+    // If none needed to fill all to min and none left to drain all to max,
+    // we are done
+    if(neededRegions == 0 && regionidx == regionsToMove.size()) {
+      long endTime = System.currentTimeMillis();
+      LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
+          "Moving " + regionsToMove.size() + " regions off of " +
+          serversOverloaded + " overloaded servers onto " +
+          serversUnderloaded + " less loaded servers");
+      return regionsToMove;
+    }
+
+    // Need to do a second pass.
+    // Either more regions to assign out or servers that are still underloaded
+
+    // If we need more to fill min, grab one from each most loaded until enough
+    if(neededRegions != 0) {
+      // Walk down most loaded, grabbing one from each until we get enough
+      for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
+        serversByLoad.descendingMap().entrySet()) {
+        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey());
+        int idx =
+          balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
+        HRegionInfo region = server.getValue().get(idx);
+        regionsToMove.add(new RegionPlan(region, server.getKey(), null));
+        if(--neededRegions == 0) {
+          // No more regions needed, done shedding
+          break;
+        }
+      }
+    }
+
+    // Now we have a set of regions that must be all assigned out
+    // Assign each underloaded up to the min, then if leftovers, assign to max
+
+    // Walk down least loaded, assigning to each to fill up to min
+    for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
+      serversByLoad.entrySet()) {
+      int regionCount = server.getKey().getLoad().getNumberOfRegions();
+      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey());
+      if(balanceInfo != null) {
+        regionCount += balanceInfo.getNumRegionsAdded();
+      }
+      if(regionCount >= min) {
+        break;
+      }
+      int numToTake = min - regionCount;
+      int numTaken = 0;
+      while(numTaken < numToTake && regionidx < regionsToMove.size()) {
+        regionsToMove.get(regionidx).setDestination(server.getKey());
+        numTaken++;
+        regionidx++;
+      }
+    }
+
+    // If we still have regions to dish out, assign underloaded to max
+    if(regionidx != regionsToMove.size()) {
+      for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
+        serversByLoad.entrySet()) {
+        int regionCount = server.getKey().getLoad().getNumberOfRegions();
+        if(regionCount >= max) {
+          break;
+        }
+        regionsToMove.get(regionidx).setDestination(server.getKey());
+        regionidx++;
+        if(regionidx == regionsToMove.size()) {
+          break;
+        }
+      }
+    }
+
+    long endTime = System.currentTimeMillis();
+
+    assert(regionidx == regionsToMove.size());
+    assert(neededRegions == 0);
+
+    // All done!
+    LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
+        "Moving " + regionsToMove.size() + " regions off of " +
+        serversOverloaded + " overloaded servers onto " +
+        serversUnderloaded + " less loaded servers");
+
+    return regionsToMove;
+  }
+
+  /**
+   * Stores additional per-server information about the regions added/removed
+   * during the run of the balancing algorithm.
+   *
+   * For servers that receive additional regions, we are not updating the number
+   * of regions in HServerInfo once we decide to reassign regions to a server,
+   * but we need this information later in the algorithm.  This is stored in
+   * <b>numRegionsAdded</b>.
+   *
+   * For servers that shed regions, we need to track which regions we have
+   * already shed.  <b>nextRegionForUnload</b> contains the index in the list
+   * of regions on the server that is the next to be shed.
+   */
+  private static class BalanceInfo {
+
+    private final int nextRegionForUnload;
+    private final int numRegionsAdded;
+
+    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
+      this.nextRegionForUnload = nextRegionForUnload;
+      this.numRegionsAdded = numRegionsAdded;
+    }
+
+    public int getNextRegionForUnload() {
+      return nextRegionForUnload;
+    }
+
+    public int getNumRegionsAdded() {
+      return numRegionsAdded;
+    }
+  }
+
+  /**
+   * Generates a bulk assignment plan to be used on cluster startup.
+   *
+   * Takes a list of all the regions and all the servers in the cluster and
+   * returns a map of each server to the regions that it should be assigned.
+   *
+   * Currently implemented as a round-robin assignment.  Same invariant as
+   * load balancing, all servers holding floor(avg) or ceiling(avg).
+   *
+   * TODO: Use block locations from HDFS to place regions with their blocks
+   *
+   * @param regions all regions
+   * @param servers all servers
+   * @return map of server to the regions it should take, or null if no
+   *         assignment is possible (ie. no regions or no servers)
+   */
+  public Map<HServerInfo,List<HRegionInfo>> bulkAssignment(
+      List<HRegionInfo> regions, List<HServerInfo> servers) {
+    if(regions.size() == 0 || servers.size() == 0) {
+      return null;
+    }
+    Map<HServerInfo,List<HRegionInfo>> assignments =
+      new TreeMap<HServerInfo,List<HRegionInfo>>();
+    int numRegions = regions.size();
+    int numServers = servers.size();
+    int max = (int)Math.ceil((float)numRegions/numServers);
+    int serverIdx = 0;
+    for(HServerInfo server : servers) {
+      List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
+      for(int i=serverIdx;i<regions.size();i+=numServers) {
+        serverRegions.add(regions.get(i));
+      }
+      assignments.put(server, serverRegions);
+      serverIdx++;
+    }
+    return assignments;
+  }
+
+  /**
+   * Find the block locations for all of the files for the specified region.
+   *
+   * Returns an ordered list of hosts that are hosting the blocks for this
+   * region.  The weight of each host is the sum of the block lengths of all
+   * files on that host, so the first host in the list is the server which
+   * holds the most bytes of the given region's HFiles.
+   *
+   * TODO: Make this work.  Need to figure out how to match hadoop's hostnames
+   *       given for block locations with our HServerAddress.
+   * TODO: Use the right directory for the region
+   * TODO: Use getFileBlockLocations on the files not the directory
+   *
+   * @param fs the filesystem
+   * @param region region
+   * @return ordered list of hosts holding blocks of the specified region
+   * @throws IOException if any filesystem errors
+   */
+  private List<String> getTopBlockLocations(FileSystem fs, HRegionInfo region)
+  throws IOException {
+    String encodedName = region.getEncodedName();
+    Path path = new Path("/hbase/table/" + encodedName);
+    FileStatus status = fs.getFileStatus(path);
+    BlockLocation [] blockLocations =
+      fs.getFileBlockLocations(status, 0, status.getLen());
+    Map<HostAndWeight,HostAndWeight> hostWeights =
+      new TreeMap<HostAndWeight,HostAndWeight>(new HostAndWeight.HostComparator());
+    for(BlockLocation bl : blockLocations) {
+      String [] hosts = bl.getHosts();
+      long len = bl.getLength();
+      for(String host : hosts) {
+        HostAndWeight haw = hostWeights.get(host);
+        if(haw == null) {
+          haw = new HostAndWeight(host, len);
+          hostWeights.put(haw, haw);
+        } else {
+          haw.addWeight(len);
+        }
+      }
+    }
+    NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
+        new HostAndWeight.WeightComparator());
+    orderedHosts.addAll(hostWeights.values());
+    List<String> topHosts = new ArrayList<String>(orderedHosts.size());
+    for(HostAndWeight haw : orderedHosts.descendingSet()) {
+      topHosts.add(haw.getHost());
+    }
+    return topHosts;
+  }
+
+  /**
+   * Stores the hostname and weight for that hostname.
+   *
+   * This is used when determining the physical locations of the blocks making
+   * up a region.
+   *
+   * To make a prioritized list of the hosts holding the most data of a region,
+   * this class is used to count the total weight for each host.  The weight is
+   * currently just the size of the file.
+   */
+  private static class HostAndWeight {
+
+    private final String host;
+    private long weight;
+
+    public HostAndWeight(String host, long weight) {
+      this.host = host;
+      this.weight = weight;
+    }
+
+    public void addWeight(long weight) {
+      this.weight += weight;
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public long getWeight() {
+      return weight;
+    }
+
+    private static class HostComparator implements Comparator<HostAndWeight> {
+      @Override
+      public int compare(HostAndWeight l, HostAndWeight r) {
+        return l.getHost().compareTo(r.getHost());
+      }
+    }
+
+    private static class WeightComparator implements Comparator<HostAndWeight> {
+      @Override
+      public int compare(HostAndWeight l, HostAndWeight r) {
+        if(l.getWeight() == r.getWeight()) {
+          return l.getHost().compareTo(r.getHost());
+        }
+        return l.getWeight() < r.getWeight() ? -1 : 1;
+      }
+    }
+  }
+
+  /**
+   * Generates an immediate assignment plan to be used by a new master for
+   * regions in transition that do not have an already known destination.
+   *
+   * Takes a list of regions that need immediate assignment and a list of
+   * all available servers.  Returns a map of regions to the server they
+   * should be assigned to.
+   *
+   * This method will return quickly and does not do any intelligent
+   * balancing.  The goal is to make a fast decision not the best decision
+   * possible.
+   *
+   * Currently this is random.
+   *
+   * @param regions regions that need immediate assignment
+   * @param servers all available servers
+   * @return map of regions to the server it should be assigned to
+   */
+  public Map<HRegionInfo,HServerInfo> immediateAssignment(
+      List<HRegionInfo> regions, List<HServerInfo> servers) {
+    Map<HRegionInfo,HServerInfo> assignments =
+      new TreeMap<HRegionInfo,HServerInfo>();
+    for(HRegionInfo region : regions) {
+      assignments.put(region, servers.get(rand.nextInt(servers.size())));
+    }
+    return assignments;
+  }
+
+  /**
+   * Stores the plan for the move of an individual region.
+   *
+   * Contains info for the region being moved, info for the server the region
+   * should be moved from, and info for the server the region should be moved
+   * to.
+   *
+   * The comparable implementation of this class compares only the region
+   * information and not the source/dest server info.
+   */
+  public static class RegionPlan implements Comparable<RegionPlan> {
+
+    private final HRegionInfo region;
+    private final HServerInfo source;
+    private HServerInfo dest;
+
+    /**
+     * Instantiate a plan for a region move, moving the specified region from
+     * the specified source server to the specified destination server.
+     *
+     * Destination server can be instantiated as null and later set
+     * with {@link #setDestination(HServerInfo)}.
+     *
+     * @param region region to be moved
+     * @param source regionserver region should be moved from
+     * @param dest regionserver region should be moved to
+     */
+    public RegionPlan(HRegionInfo region, HServerInfo source, HServerInfo dest) {
+      this.region = region;
+      this.source = source;
+      this.dest = dest;
+    }
+
+    /**
+     * Set the destination server for the plan for this region.
+     */
+    public void setDestination(HServerInfo dest) {
+      this.dest = dest;
+    }
+
+    /**
+     * Get the source server for the plan for this region.
+     * @return server info for source
+     */
+    public HServerInfo getSource() {
+      return source;
+    }
+
+    /**
+     * Get the destination server for the plan for this region.
+     * @return server info for destination
+     */
+    public HServerInfo getDestination() {
+      return dest;
+    }
+
+    /**
+     * Get the region information for the region this plan is for.
+     * @return region info
+     */
+    public HRegionInfo getRegionInfo() {
+      return region;
+    }
+
+    /**
+     * Compare the region info.
+     * @param o region plan you are comparing against
+     */
+    @Override
+    public int compareTo(RegionPlan o) {
+      return getRegionInfo().compareTo(o.getRegionInfo());
+    }
+  }
+}

Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java?rev=961968&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java Thu Jul  8 22:42:13 2010
@@ -0,0 +1,384 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestLoadBalancer {
+  private static final Log LOG = LogFactory.getLog(TestLoadBalancer.class);
+
+  // Configuration passed to the balancer; created once in beforeAllTests().
+  private static Configuration conf;
+
+  // Balancer under test; created once in beforeAllTests() and shared by all
+  // test methods.
+  private static LoadBalancer loadBalancer;
+
+  // Unseeded source of randomness used when building mock servers and regions.
+  private static Random rand;
+
+  /**
+   * One-time setup for the whole class: creates the random source, the
+   * configuration, and the balancer built from that configuration.
+   *
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    rand = new Random();
+    conf = HBaseConfiguration.create();
+    loadBalancer = new LoadBalancer(conf);
+  }
+
+  // Mock cluster layouts consumed by testBalanceCluster().
+  // int[testnum][servernumber] -> numregions
+  // Each inner array is one cluster: its length is the number of servers and
+  // each element is the number of regions initially hosted by that server.
+  int [][] clusterStateMocks = new int [][] {
+      // 1 node
+      new int [] { 0 },
+      new int [] { 1 },
+      new int [] { 10 },
+      // 2 node
+      new int [] { 0, 0 },
+      new int [] { 2, 0 },
+      new int [] { 2, 1 },
+      new int [] { 2, 2 },
+      new int [] { 2, 3 },
+      new int [] { 2, 4 },
+      new int [] { 1, 1 },
+      new int [] { 0, 1 },
+      new int [] { 10, 1 },
+      new int [] { 14, 1432 },
+      new int [] { 47, 53 },
+      // 3 node
+      new int [] { 0, 1, 2 },
+      new int [] { 1, 2, 3 },
+      new int [] { 0, 2, 2 },
+      new int [] { 0, 3, 0 },
+      new int [] { 0, 4, 0 },
+      new int [] { 20, 20, 0 },
+      // 4 node
+      new int [] { 0, 1, 2, 3 },
+      new int [] { 4, 0, 0, 0 },
+      new int [] { 5, 0, 0, 0 },
+      new int [] { 6, 6, 0, 0 },
+      new int [] { 6, 2, 0, 0 },
+      new int [] { 6, 1, 0, 0 },
+      new int [] { 6, 0, 0, 0 },
+      new int [] { 4, 4, 4, 7 },
+      new int [] { 4, 4, 4, 8 },
+      new int [] { 0, 0, 0, 7 },
+      // 5 node
+      new int [] { 1, 1, 1, 1, 4 },
+      // more nodes
+      new int [] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 },
+      new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 },
+      new int [] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 },
+      new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 },
+      new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 },
+      new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 },
+      new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 },
+      new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 },
+      new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 },
+      new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 },
+      new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 },
+      new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 },
+      new int [] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 },
+      new int [] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 }
+  };
+
+  // Scenarios consumed by testImmediateAssignment() and testBulkAssignment():
+  // element 0 is the number of regions to assign, element 1 is the number of
+  // (initially empty) servers available to receive them.
+  int [][] regionsAndServersMocks = new int [][] {
+      // { num regions, num servers }
+      new int [] { 0, 0 },
+      new int [] { 0, 1 },
+      new int [] { 1, 1 },
+      new int [] { 2, 1 },
+      new int [] { 10, 1 },
+      new int [] { 1, 2 },
+      new int [] { 2, 2 },
+      new int [] { 3, 2 },
+      new int [] { 1, 3 },
+      new int [] { 2, 3 },
+      new int [] { 3, 3 },
+      new int [] { 25, 3 },
+      new int [] { 2, 10 },
+      new int [] { 2, 100 },
+      new int [] { 12, 10 },
+      new int [] { 12, 100 },
+  };
+
+  /**
+   * Exercises the load balancing algorithm against every cluster layout in
+   * {@link #clusterStateMocks}.
+   *
+   * For each layout the plans returned by the balancer are applied to the
+   * mock servers, and the resulting cluster must satisfy the invariant that
+   * every server hosts either floor(average) or ceiling(average) regions.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testBalanceCluster() throws Exception {
+    for(int i=0;i<clusterStateMocks.length;i++) {
+      Map<HServerInfo,List<HRegionInfo>> cluster =
+        mockClusterServers(clusterStateMocks[i]);
+      LOG.info("Mock Cluster : " + printMock(cluster) + " " + printStats(cluster));
+      List<HServerInfo> afterBalance =
+        reconcile(cluster, loadBalancer.balanceCluster(cluster));
+      LOG.info("Mock Balance : " + printMock(afterBalance));
+      assertClusterAsBalanced(afterBalance);
+      // Hand the mocks back to the pools so later layouts can reuse them.
+      for(Map.Entry<HServerInfo, List<HRegionInfo>> hosted : cluster.entrySet()) {
+        returnRegions(hosted.getValue());
+        returnServer(hosted.getKey());
+      }
+    }
+  }
+
+  /**
+   * Asserts that every server hosts between floor(avg) and ceiling(avg)
+   * regions, where avg is total regions divided by number of servers.
+   *
+   * If the most and least loaded servers differ by fewer than two regions
+   * the cluster is accepted without further checks.
+   *
+   * @param servers servers whose load reflects the post-balance state
+   */
+  public void assertClusterAsBalanced(List<HServerInfo> servers) {
+    int total = 0;
+    int highest = 0;
+    int lowest = Integer.MAX_VALUE;
+    for(HServerInfo server : servers) {
+      int count = server.getLoad().getNumberOfRegions();
+      highest = Math.max(highest, count);
+      lowest = Math.min(lowest, count);
+      total += count;
+    }
+    // A spread of 0 or 1 cannot be improved on, nothing more to verify.
+    // This also covers an empty server list, so the division below is safe.
+    if(highest - lowest < 2) {
+      return;
+    }
+    int serverCount = servers.size();
+    int floor = total / serverCount;
+    int ceiling = floor;
+    if(total % serverCount != 0) {
+      ceiling = floor + 1;
+    }
+
+    for(HServerInfo server : servers) {
+      int count = server.getLoad().getNumberOfRegions();
+      assertTrue(count <= ceiling);
+      assertTrue(count >= floor);
+    }
+  }
+
+  /**
+   * Tests immediate assignment.
+   *
+   * Invariant is that all regions have an assignment.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testImmediateAssignment() throws Exception {
+    for(int [] mock : regionsAndServersMocks) {
+      LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
+      List<HRegionInfo> regions = randomRegions(mock[0]);
+      List<HServerInfo> servers = randomServers(mock[1], 0);
+      Map<HRegionInfo,HServerInfo> assignments =
+        loadBalancer.immediateAssignment(regions, servers);
+      assertImmediateAssignment(regions, servers, assignments);
+      returnRegions(regions);
+      returnServers(servers);
+    }
+  }
+
+  /**
+   * Asserts that each of the given regions is a key in the assignments.
+   *
+   * @param regions regions that were submitted for assignment
+   * @param servers servers that were available (not inspected here)
+   * @param assignments region to server mapping returned by the balancer
+   */
+  private void assertImmediateAssignment(List<HRegionInfo> regions,
+      List<HServerInfo> servers, Map<HRegionInfo,HServerInfo> assignments) {
+    for(int i=0;i<regions.size();i++) {
+      HRegionInfo expected = regions.get(i);
+      assertTrue(assignments.containsKey(expected));
+    }
+  }
+
+  /**
+   * Tests the bulk assignment used during cluster startup over every
+   * scenario in {@link #regionsAndServersMocks}.
+   *
+   * Round-robin.  Should yield a balanced cluster, so each server that
+   * appears in the result must hold either floor(avg) or ceiling(avg)
+   * regions.  A null or empty result is accepted without checks.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testBulkAssignment() throws Exception {
+    for(int i=0;i<regionsAndServersMocks.length;i++) {
+      int regionCount = regionsAndServersMocks[i][0];
+      int serverCount = regionsAndServersMocks[i][1];
+      LOG.debug("testBulkAssignment with " + regionCount + " regions and " + serverCount + " servers");
+      List<HRegionInfo> regions = randomRegions(regionCount);
+      List<HServerInfo> servers = randomServers(serverCount, 0);
+      Map<HServerInfo,List<HRegionInfo>> assignments =
+        loadBalancer.bulkAssignment(regions, servers);
+      float average = (float)regions.size()/servers.size();
+      int fewest = (int)Math.floor(average);
+      int most = (int)Math.ceil(average);
+      // Iterating an empty map is a no-op, so only null needs a guard.
+      if(assignments != null) {
+        for(List<HRegionInfo> assigned : assignments.values()) {
+          int held = assigned.size();
+          assertTrue(held == fewest || held == most);
+        }
+      }
+      returnRegions(regions);
+      returnServers(servers);
+    }
+  }
+
+  /**
+   * Summarizes a mock cluster for logging: server count, total regions as
+   * reported by each server's load, the average, and its ceiling and floor.
+   *
+   * @param servers mock cluster keyed by server
+   * @return one-line summary string
+   */
+  private String printStats(Map<HServerInfo, List<HRegionInfo>> servers) {
+    int regionTotal = 0;
+    for(HServerInfo server : servers.keySet()) {
+      regionTotal += server.getLoad().getNumberOfRegions();
+    }
+    int serverTotal = servers.size();
+    float average = (float)regionTotal / serverTotal;
+    StringBuilder sb = new StringBuilder();
+    sb.append("[srvr=").append(serverTotal);
+    sb.append(" rgns=").append(regionTotal);
+    sb.append(" avg=").append(average);
+    sb.append(" max=").append((int)Math.ceil(average));
+    sb.append(" min=").append((int)Math.floor(average));
+    sb.append("]");
+    return sb.toString();
+  }
+
+  /**
+   * Renders the servers of a mock cluster via {@link #printMock(List)}.
+   *
+   * @param servers mock cluster keyed by server
+   * @return region counts of the cluster's servers as a string
+   */
+  private String printMock(Map<HServerInfo, List<HRegionInfo>> servers) {
+    List<HServerInfo> keys = new ArrayList<HServerInfo>(servers.keySet());
+    return printMock(keys);
+  }
+
+  /**
+   * Renders the region count of each server as "{ a , b , c }", with the
+   * servers visited in their natural (sorted) order.
+   *
+   * @param balancedCluster servers to print
+   * @return region counts as a string
+   */
+  private String printMock(List<HServerInfo> balancedCluster) {
+    SortedSet<HServerInfo> ordered = new TreeSet<HServerInfo>(balancedCluster);
+    StringBuilder out = new StringBuilder(ordered.size() * 4 + 4);
+    out.append("{ ");
+    boolean first = true;
+    for(HServerInfo server : ordered) {
+      if(!first) {
+        out.append(" , ");
+      }
+      first = false;
+      out.append(server.getLoad().getNumberOfRegions());
+    }
+    out.append(" }");
+    return out.toString();
+  }
+
+  /**
+   * Applies the region plans to the mock servers by moving one region of
+   * load from each plan's source to its destination.
+   *
+   * This assumes the RegionPlan HSI instances are the same ones in the map, so
+   * actually no need to even pass in the map, but I think it's clearer.
+   *
+   * @param servers mock cluster keyed by server
+   * @param plans plans produced by the balancer, may be null
+   * @return the servers of the cluster, with their load updated in place
+   */
+  private List<HServerInfo> reconcile(
+      Map<HServerInfo, List<HRegionInfo>> servers, List<RegionPlan> plans) {
+    if(plans != null) {
+      for(RegionPlan plan : plans) {
+        HServerInfo from = plan.getSource();
+        HServerInfo to = plan.getDestination();
+        from.getLoad().setNumberOfRegions(
+            from.getLoad().getNumberOfRegions() - 1);
+        to.getLoad().setNumberOfRegions(
+            to.getLoad().getNumberOfRegions() + 1);
+      }
+    }
+    HServerInfo [] keys = servers.keySet().toArray(new HServerInfo[servers.size()]);
+    return Arrays.asList(keys);
+  }
+
+  /**
+   * Builds a mock cluster from an array of per-server region counts: one
+   * server per element, reporting and hosting that many regions.
+   *
+   * @param mockCluster number of regions for each server
+   * @return sorted map from each mock server to its mock regions
+   */
+  private Map<HServerInfo, List<HRegionInfo>> mockClusterServers(
+      int [] mockCluster) {
+    Map<HServerInfo,List<HRegionInfo>> cluster =
+      new TreeMap<HServerInfo,List<HRegionInfo>>();
+    for(int regionCount : mockCluster) {
+      HServerInfo server = randomServer(regionCount);
+      cluster.put(server, randomRegions(regionCount));
+    }
+    return cluster;
+  }
+
+  // Pool of previously created mock regions; randomRegions() drains it before
+  // creating new regions and returnRegions() refills it.
+  private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
+
+  /**
+   * Returns the requested number of mock regions, reusing pooled regions
+   * from regionQueue first and creating new ones only when the pool is empty.
+   *
+   * Each newly created region gets its own start and end key buffers filled
+   * with fresh random bytes, with the first four bytes overwritten by a
+   * value derived from the loop index.  Previously the prefix was derived
+   * from numRegions and the same two buffers were reused, so every region
+   * created by one call was given identical keys.
+   *
+   * @param numRegions number of regions to return
+   * @return list of numRegions mock regions
+   */
+  private List<HRegionInfo> randomRegions(int numRegions) {
+    List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
+    for(int i=0;i<numRegions;i++) {
+      if(!regionQueue.isEmpty()) {
+        regions.add(regionQueue.poll());
+        continue;
+      }
+      // Fresh buffers per region: the random tail keeps new regions distinct
+      // from pooled ones that were created with the same index earlier.
+      byte [] start = new byte[16];
+      byte [] end = new byte[16];
+      rand.nextBytes(start);
+      rand.nextBytes(end);
+      // Even prefix for the start key, the following odd value for the end
+      // key, so the start key sorts before the end key.
+      Bytes.putInt(start, 0, i << 1);
+      Bytes.putInt(end, 0, (i << 1) + 1);
+      HRegionInfo hri = new HRegionInfo(
+          new HTableDescriptor(Bytes.toBytes("table")), start, end);
+      regions.add(hri);
+    }
+    return regions;
+  }
+
+  /**
+   * Puts the given regions back into the pool for reuse by randomRegions().
+   *
+   * @param regions regions that are no longer needed
+   */
+  private void returnRegions(List<HRegionInfo> regions) {
+    for(HRegionInfo region : regions) {
+      regionQueue.add(region);
+    }
+  }
+
+  // Pool of previously created mock servers; randomServer() takes from it
+  // before creating a new server and returnServer(s)() refill it.
+  private Queue<HServerInfo> serverQueue = new LinkedList<HServerInfo>();
+
+  /**
+   * Returns a mock server whose load reports the given number of regions.
+   * A pooled server from serverQueue is reused when one is available;
+   * otherwise a new one is created with a random host, port and start code.
+   *
+   * @param numRegions number of regions the server's load should report
+   * @return mock server
+   */
+  private HServerInfo randomServer(int numRegions) {
+    HServerInfo server;
+    if(serverQueue.isEmpty()) {
+      // NOTE(review): RandomStringUtils.random(16) is not limited to
+      // hostname-safe characters -- presumably the host only serves as an
+      // opaque identity here; confirm HServerAddress tolerates it.
+      String host = RandomStringUtils.random(16);
+      int port = rand.nextInt(60000);
+      long startCode = rand.nextLong();
+      server =
+        new HServerInfo(new HServerAddress(host, port), startCode, port, host);
+    } else {
+      server = serverQueue.poll();
+    }
+    server.getLoad().setNumberOfRegions(numRegions);
+    return server;
+  }
+
+  /**
+   * Returns a list of mock servers, each obtained from randomServer(int).
+   *
+   * @param numServers number of servers to return
+   * @param numRegionsPerServer number of regions each server's load reports
+   * @return list of numServers mock servers
+   */
+  private List<HServerInfo> randomServers(int numServers, int numRegionsPerServer) {
+    List<HServerInfo> result = new ArrayList<HServerInfo>(numServers);
+    int remaining = numServers;
+    while(remaining > 0) {
+      result.add(randomServer(numRegionsPerServer));
+      remaining--;
+    }
+    return result;
+  }
+
+  /**
+   * Puts a single server back into the pool for reuse by randomServer().
+   *
+   * @param server server that is no longer needed
+   */
+  private void returnServer(HServerInfo server) {
+    this.serverQueue.offer(server);
+  }
+
+  /**
+   * Puts the given servers back into the pool for reuse by randomServer().
+   *
+   * @param servers servers that are no longer needed
+   */
+  private void returnServers(List<HServerInfo> servers) {
+    for(HServerInfo server : servers) {
+      serverQueue.add(server);
+    }
+  }
+}