You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/04/18 20:12:05 UTC

svn commit: r1094679 - in /hbase/trunk: CHANGES.txt pom.xml src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java

Author: stack
Date: Mon Apr 18 18:12:04 2011
New Revision: 1094679

URL: http://svn.apache.org/viewvc?rev=1094679&view=rev
Log:
HBASE-3609 Improve the selection of regions to balance; part 2 

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/pom.xml
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1094679&r1=1094678&r2=1094679&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Apr 18 18:12:04 2011
@@ -172,6 +172,7 @@ Release 0.91.0 - Unreleased
    HBASE-3699  Make RegionServerServices and MasterServices extend Server
                (Erik Onnen)
    HBASE-3757  Upgrade to ZK 3.3.3
+   HBASE-3609  Improve the selection of regions to balance; part 2 (Ted Yu)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=1094679&r1=1094678&r2=1094679&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Mon Apr 18 18:12:04 2011
@@ -525,7 +525,7 @@
     <commons-lang.version>2.5</commons-lang.version>
     <commons-logging.version>1.1.1</commons-logging.version>
     <commons-math.version>2.1</commons-math.version>
-    <guava.version>r06</guava.version>
+    <guava.version>r09</guava.version>
     <!--The below was made by patching branch-0.20-append
     at revision 1034499 with this hdfs-895 patch:
     https://issues.apache.org/jira/secure/attachment/12459473/hdfs-895-branch-20-append.txt

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java?rev=1094679&r1=1094678&r2=1094679&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java Mon Apr 18 18:12:04 2011
@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -41,6 +43,8 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 
+import com.google.common.collect.MinMaxPriorityQueue;
+
 /**
  * Makes decisions about the placement and movement of Regions across
  * RegionServers.
@@ -69,7 +73,26 @@ public class LoadBalancer {
     else if (slop > 1) slop = 1;
   }
   
-  static class RegionPlanComparator implements Comparator<RegionPlan> {
+  /*
+   * Orders HRegionInfo by descending RegionId. It assumes that RegionId can
+   * represent the age of the region - a larger RegionId means a younger
+   * region - so under that assumption the youngest regions sort first.
+   * This comparator is used in balanceCluster() to account for the out-of-band
+   * regions which were assigned to the server after some other region server
+   * crashed.
+   */
+   static class RegionInfoComparator implements Comparator<HRegionInfo> {
+       @Override
+       public int compare(HRegionInfo l, HRegionInfo r) {
+          long diff = r.getRegionId() - l.getRegionId(); // r minus l: reversed operands give descending order
+          if (diff < 0) return -1; // l has the larger RegionId, so l sorts first
+          if (diff > 0) return 1; // r has the larger RegionId, so r sorts first
+          return 0;
+       } 
+   }
+   static RegionInfoComparator riComparator = new RegionInfoComparator();
+   
+   static class RegionPlanComparator implements Comparator<RegionPlan> {
     @Override
     public int compare(RegionPlan l, RegionPlan r) {
       long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId();
@@ -89,6 +112,22 @@ public class LoadBalancer {
    * all servers will be balanced to the average.  Otherwise, all servers will
    * have either floor(average) or ceiling(average) regions.
    *
+   * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
+   *   we can fetch from both ends of the queue. 
+   * At the beginning, we check whether there is an empty region server
+   *   just discovered by Master. If so, we alternately choose new / old
+   *   regions from head / tail of regionsToMove, respectively. This alternation
+   *   avoids clustering young regions on the newly discovered region server.
+   *   Otherwise, we choose new regions from head of regionsToMove.
+   *   
+   * Another improvement from HBASE-3609 is that we assign regions from
+   *   regionsToMove to underloaded servers in round-robin fashion.
+   *   Previously one underloaded server would be filled before we moved on to
+   *   the next underloaded server, leading to clustering of young regions.
+   *   
+   * Finally, we randomly shuffle underloaded servers so that they receive
+   *   offloaded regions relatively evenly across calls to balanceCluster().
+   *         
    * The algorithm is currently implemented as such:
    *
    * <ol>
@@ -110,7 +149,7 @@ public class LoadBalancer {
    *     regions shed to fill each underloaded server to <b>MIN</b>.  If so we
    *     end up with a number of regions required to do so, <b>neededRegions</b>.
    *
-   *     It is also possible that we were able fill each underloaded but ended
+   *     It is also possible that we were able to fill each underloaded but ended
    *     up with regions that were unassigned from overloaded servers but that
    *     still do not have assignment.
    *
@@ -125,7 +164,6 @@ public class LoadBalancer {
    *
    * <li>We now definitely have more regions that need assignment, either from
    *     the previous step or from the original shedding from overloaded servers.
-   *
    *     Iterate the least loaded servers filling each to <b>MIN</b>.
    *
    * <li>If we still have more regions that need assignment, again iterate the
@@ -152,6 +190,7 @@ public class LoadBalancer {
    */
   public List<RegionPlan> balanceCluster(
       Map<HServerInfo,List<HRegionInfo>> clusterState) {
+    boolean emptyRegionServerPresent = false;
     long startTime = System.currentTimeMillis();
 
     // Make a map sorted by load and count regions
@@ -167,7 +206,9 @@ public class LoadBalancer {
     // Iterate so we can count regions as we build the map
     for(Map.Entry<HServerInfo, List<HRegionInfo>> server:
         clusterState.entrySet()) {
-      server.getKey().getLoad().setNumberOfRegions(server.getValue().size());
+      int sz = server.getValue().size();
+      if (sz == 0) emptyRegionServerPresent = true;
+      server.getKey().getLoad().setNumberOfRegions(sz);
       numRegions += server.getKey().getLoad().getNumberOfRegions();
       serversByLoad.put(server.getKey(), server.getValue());
     }
@@ -191,11 +232,13 @@ public class LoadBalancer {
 
     // Balance the cluster
     // TODO: Look at data block locality or a more complex load to do this
-    List<RegionPlan> regionsToMove = new ArrayList<RegionPlan>();
-    int regionidx = 0; // track the index in above list for setting destination
+    MinMaxPriorityQueue<RegionPlan> regionsToMove = MinMaxPriorityQueue.orderedBy(rpComparator).create();
+    List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
 
     // Walk down most loaded, pruning each to the max
     int serversOverloaded = 0;
+    // flag used to fetch regions from head and tail of list, alternately 
+    boolean fetchFromTail = false;
     Map<HServerInfo,BalanceInfo> serverBalanceInfo =
       new TreeMap<HServerInfo,BalanceInfo>();
     for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
@@ -207,54 +250,98 @@ public class LoadBalancer {
         break;
       }
       serversOverloaded++;
-      List<HRegionInfo> regions = randomize(server.getValue());
+      List<HRegionInfo> regions = server.getValue();
       int numToOffload = Math.min(regionCount - max, regions.size());
+      // account for the out-of-band regions which were assigned to this server
+      // after some other region server crashed 
+      Collections.sort(regions, riComparator);
       int numTaken = 0;
-      for (int i = regions.size() - 1; i >= 0; i--) {
-        HRegionInfo hri = regions.get(i);
+      for (int i = 0; i <= numToOffload; ) {
+        HRegionInfo hri = regions.get(i);	// fetch from head
+        if (fetchFromTail) {
+        	hri = regions.get(regions.size() - 1 - i);
+        }
+        i++;
         // Don't rebalance meta regions.
         if (hri.isMetaRegion()) continue;
         regionsToMove.add(new RegionPlan(hri, serverInfo, null));
         numTaken++;
         if (numTaken >= numToOffload) break;
+        // fetch in alternate order if there is new region server
+        if (emptyRegionServerPresent) {
+          fetchFromTail = !fetchFromTail;
+        }
       }
       serverBalanceInfo.put(serverInfo,
           new BalanceInfo(numToOffload, (-1)*numTaken));
     }
-
+    int totalNumMoved = regionsToMove.size();
+    
     // Walk down least loaded, filling each to the min
-    int serversUnderloaded = 0; // number of servers that get new regions
     int neededRegions = 0; // number of regions needed to bring all up to min
+    fetchFromTail = false;
+    RegionPlan rp = null;
+    Map<HServerInfo, Integer> underloadedServers = new HashMap<HServerInfo, Integer>();
     for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
       serversByLoad.entrySet()) {
       int regionCount = server.getKey().getLoad().getNumberOfRegions();
       if(regionCount >= min) {
         break;
       }
-      serversUnderloaded++;
-      int numToTake = min - regionCount;
-      int numTaken = 0;
-      while(numTaken < numToTake && regionidx < regionsToMove.size()) {
-        regionsToMove.get(regionidx).setDestination(server.getKey());
-        numTaken++;
-        regionidx++;
+      underloadedServers.put(server.getKey(), min - regionCount);
+    }
+    // number of servers that get new regions
+    int serversUnderloaded = underloadedServers.size();
+    int incr = 1;
+    List<HServerInfo> serverInfos = Arrays.asList(underloadedServers.keySet().
+        toArray(new HServerInfo[serversUnderloaded]));
+    Collections.shuffle(serverInfos, RANDOM);
+    while (regionsToMove.size() > 0) {
+      int cnt = 0;
+      int i = incr > 0 ? 0 : underloadedServers.size()-1;
+      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
+        if (0 == regionsToMove.size()) break;
+        HServerInfo si = serverInfos.get(i);
+        int numToTake = underloadedServers.get(si);
+        if (numToTake == 0) continue;
+        
+        if (!fetchFromTail) rp = regionsToMove.remove();
+        else rp = regionsToMove.removeLast();
+        rp.setDestination(si);
+        regionsToReturn.add(rp);
+        
+        if (emptyRegionServerPresent) {
+          fetchFromTail = !fetchFromTail;
+        }
+        
+        underloadedServers.put(si, numToTake-1);
+        cnt++;
+        BalanceInfo bi = serverBalanceInfo.get(si);
+        if (bi == null) {
+          bi = new BalanceInfo(0, 0);
+          serverBalanceInfo.put(si, bi);
+        }
+        bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
       }
-      serverBalanceInfo.put(server.getKey(), new BalanceInfo(0, numTaken));
+      if (cnt == 0) break;
+      // iterates underloadedServers in the other direction
+      LOG.info("First pass inner loop assigned " + cnt + " regions");
+      incr = -incr;
+    }
+    for (Integer i : underloadedServers.values()) {
       // If we still want to take some, increment needed
-      if(numTaken < numToTake) {
-        neededRegions += (numToTake - numTaken);
-      }
+        neededRegions += i;
     }
 
     // If none needed to fill all to min and none left to drain all to max,
     // we are done
-    if(neededRegions == 0 && regionidx == regionsToMove.size()) {
+    if(neededRegions == 0 && 0 == regionsToMove.size()) {
       long endTime = System.currentTimeMillis();
       LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
-          "Moving " + regionsToMove.size() + " regions off of " +
+          "Moving " + totalNumMoved + " regions off of " +
           serversOverloaded + " overloaded servers onto " +
           serversUnderloaded + " less loaded servers");
-      return regionsToMove;
+      return regionsToReturn;
     }
 
     // Need to do a second pass.
@@ -272,6 +359,7 @@ public class LoadBalancer {
         HRegionInfo region = server.getValue().get(idx);
         if (region.isMetaRegion()) continue; // Don't move meta regions.
         regionsToMove.add(new RegionPlan(region, server.getKey(), null));
+        totalNumMoved++;
         if(--neededRegions == 0) {
           // No more regions needed, done shedding
           break;
@@ -296,24 +384,35 @@ public class LoadBalancer {
       }
       int numToTake = min - regionCount;
       int numTaken = 0;
-      while(numTaken < numToTake && regionidx < regionsToMove.size()) {
-        regionsToMove.get(regionidx).setDestination(server.getKey());
+      while(numTaken < numToTake && 0 < regionsToMove.size()) {
+        if (!fetchFromTail) rp = regionsToMove.remove();
+        else rp = regionsToMove.removeLast();
+        rp.setDestination(server.getKey());
+        regionsToReturn.add(rp);
+        
         numTaken++;
-        regionidx++;
+        if (emptyRegionServerPresent) {
+          fetchFromTail = !fetchFromTail;
+        }
       }
     }
 
     // If we still have regions to dish out, assign underloaded to max
-    if(regionidx != regionsToMove.size()) {
+    if(0 < regionsToMove.size()) {
       for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
         serversByLoad.entrySet()) {
         int regionCount = server.getKey().getLoad().getNumberOfRegions();
         if(regionCount >= max) {
           break;
         }
-        regionsToMove.get(regionidx).setDestination(server.getKey());
-        regionidx++;
-        if(regionidx == regionsToMove.size()) {
+        if (!fetchFromTail) rp = regionsToMove.remove();
+        else rp = regionsToMove.removeLast();
+        rp.setDestination(server.getKey());
+        regionsToReturn.add(rp);
+        if (emptyRegionServerPresent) {
+          fetchFromTail = !fetchFromTail;
+        }
+        if(0 == regionsToMove.size()) {
           break;
         }
       }
@@ -321,9 +420,9 @@ public class LoadBalancer {
 
     long endTime = System.currentTimeMillis();
 
-    if (regionidx != regionsToMove.size() || neededRegions != 0) {
+    if (0 != regionsToMove.size() || neededRegions != 0) {
       // Emit data so can diagnose how balancer went astray.
-      LOG.warn("regionidx=" + regionidx + ", regionsToMove=" + regionsToMove.size() +
+      LOG.warn("regionsToMove=" + totalNumMoved +
       ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
       ", serversUnderloaded=" + serversUnderloaded);
       StringBuilder sb = new StringBuilder();
@@ -337,12 +436,12 @@ public class LoadBalancer {
     }
 
     // All done!
-    LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
-        "Moving " + regionsToMove.size() + " regions off of " +
+    LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
+        "Moving " + totalNumMoved + " regions off of " +
         serversOverloaded + " overloaded servers onto " +
         serversUnderloaded + " less loaded servers");
 
-    return regionsToMove;
+    return regionsToReturn;
   }
 
   /**
@@ -370,7 +469,7 @@ public class LoadBalancer {
   private static class BalanceInfo {
 
     private final int nextRegionForUnload;
-    private final int numRegionsAdded;
+    private int numRegionsAdded;
 
     public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
       this.nextRegionForUnload = nextRegionForUnload;
@@ -384,6 +483,10 @@ public class LoadBalancer {
     public int getNumRegionsAdded() {
       return numRegionsAdded;
     }
+
+    public void setNumRegionsAdded(int numAdded) {
+      this.numRegionsAdded = numAdded;
+    }
   }
 
   /**

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java?rev=1094679&r1=1094678&r2=1094679&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java Mon Apr 18 18:12:04 2011
@@ -19,7 +19,6 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -35,8 +34,6 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -141,38 +138,6 @@ public class TestLoadBalancer {
       new int [] { 12, 100 },
   };
 
-  @Test
-  public void testRandomizer() {
-    for(int [] mockCluster : clusterStateMocks) {
-      if (mockCluster.length < 5) continue;
-      Map<HServerInfo, List<HRegionInfo>> servers =
-        mockClusterServers(mockCluster);
-      for (Map.Entry<HServerInfo, List<HRegionInfo>> e: servers.entrySet()) {
-        List<HRegionInfo> original = e.getValue();
-        if (original.size() < 5) continue;
-        // Try ten times in case random chances upon original order more than
-        // one or two times in a row.
-        boolean same = true;
-        for (int i = 0; i < 10 && same; i++) {
-          List<HRegionInfo> copy = new ArrayList<HRegionInfo>(original);
-          System.out.println("Randomizing before " + copy.size());
-          for (HRegionInfo hri: copy) {
-            System.out.println(hri.getEncodedName());
-          }
-          List<HRegionInfo> randomized = LoadBalancer.randomize(copy);
-          System.out.println("Randomizing after " + randomized.size());
-          for (HRegionInfo hri: randomized) {
-            System.out.println(hri.getEncodedName());
-          }
-          if (original.equals(randomized)) continue;
-          same = false;
-          break;
-        }
-        assertFalse(same);
-      }
-    }
-  }
-
   /**
    * Test the load balancing algorithm.
    *
@@ -434,6 +399,7 @@ public class TestLoadBalancer {
   }
 
   private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
+  static int regionId = 0;
 
   private List<HRegionInfo> randomRegions(int numRegions) {
     List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
@@ -449,7 +415,8 @@ public class TestLoadBalancer {
       Bytes.putInt(start, 0, numRegions << 1);
       Bytes.putInt(end, 0, (numRegions << 1) + 1);
       HRegionInfo hri = new HRegionInfo(
-          new HTableDescriptor(Bytes.toBytes("table" + i)), start, end);
+          new HTableDescriptor(Bytes.toBytes("table" + i)), start, end,
+          false, regionId++);
       regions.add(hri);
     }
     return regions;