You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/04/18 20:12:05 UTC
svn commit: r1094679 - in /hbase/trunk: CHANGES.txt pom.xml
src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
Author: stack
Date: Mon Apr 18 18:12:04 2011
New Revision: 1094679
URL: http://svn.apache.org/viewvc?rev=1094679&view=rev
Log:
HBASE-3609 Improve the selection of regions to balance; part 2
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/pom.xml
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1094679&r1=1094678&r2=1094679&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Apr 18 18:12:04 2011
@@ -172,6 +172,7 @@ Release 0.91.0 - Unreleased
HBASE-3699 Make RegionServerServices and MasterServices extend Server
(Erik Onnen)
HBASE-3757 Upgrade to ZK 3.3.3
+ HBASE-3609 Improve the selection of regions to balance; part 2 (Ted Yu)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=1094679&r1=1094678&r2=1094679&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Mon Apr 18 18:12:04 2011
@@ -525,7 +525,7 @@
<commons-lang.version>2.5</commons-lang.version>
<commons-logging.version>1.1.1</commons-logging.version>
<commons-math.version>2.1</commons-math.version>
- <guava.version>r06</guava.version>
+ <guava.version>r09</guava.version>
<!--The below was made by patching branch-0.20-append
at revision 1034499 with this hdfs-895 patch:
https://issues.apache.org/jira/secure/attachment/12459473/hdfs-895-branch-20-append.txt
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java?rev=1094679&r1=1094678&r2=1094679&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java Mon Apr 18 18:12:04 2011
@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
@@ -41,6 +43,8 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
+import com.google.common.collect.MinMaxPriorityQueue;
+
/**
* Makes decisions about the placement and movement of Regions across
* RegionServers.
@@ -69,7 +73,26 @@ public class LoadBalancer {
else if (slop > 1) slop = 1;
}
- static class RegionPlanComparator implements Comparator<RegionPlan> {
+ /*
+ * The following comparator assumes that RegionId from HRegionInfo can
+ * represent the age of the region - larger RegionId means the region
+ * is younger.
+ * This comparator is used in balanceCluster() to account for the out-of-band
+ * regions which were assigned to the server after some other region server
+ * crashed.
+ */
+ static class RegionInfoComparator implements Comparator<HRegionInfo> {
+ @Override
+ public int compare(HRegionInfo l, HRegionInfo r) {
+ long diff = r.getRegionId() - l.getRegionId();
+ if (diff < 0) return -1;
+ if (diff > 0) return 1;
+ return 0;
+ }
+ }
+ static RegionInfoComparator riComparator = new RegionInfoComparator();
+
+ static class RegionPlanComparator implements Comparator<RegionPlan> {
@Override
public int compare(RegionPlan l, RegionPlan r) {
long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId();
@@ -89,6 +112,22 @@ public class LoadBalancer {
* all servers will be balanced to the average. Otherwise, all servers will
* have either floor(average) or ceiling(average) regions.
*
+ * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
+ * we can fetch from both ends of the queue.
+ * At the beginning, we check whether there was empty region server
+ * just discovered by Master. If so, we alternately choose new / old
+ * regions from head / tail of regionsToMove, respectively. This alternation
+ * avoids clustering young regions on the newly discovered region server.
+ * Otherwise, we choose new regions from head of regionsToMove.
+ *
+ * Another improvement from HBASE-3609 is that we assign regions from
+ * regionsToMove to underloaded servers in round-robin fashion.
+ * Previously one underloaded server would be filled before we move onto
+ * the next underloaded server, leading to clustering of young regions.
+ *
+ * Finally, we randomly shuffle underloaded servers so that they receive
+ * offloaded regions relatively evenly across calls to balanceCluster().
+ *
* The algorithm is currently implemented as such:
*
* <ol>
@@ -110,7 +149,7 @@ public class LoadBalancer {
* regions shed to fill each underloaded server to <b>MIN</b>. If so we
* end up with a number of regions required to do so, <b>neededRegions</b>.
*
- * It is also possible that we were able fill each underloaded but ended
+ * It is also possible that we were able to fill each underloaded but ended
* up with regions that were unassigned from overloaded servers but that
* still do not have assignment.
*
@@ -125,7 +164,6 @@ public class LoadBalancer {
*
* <li>We now definitely have more regions that need assignment, either from
* the previous step or from the original shedding from overloaded servers.
- *
* Iterate the least loaded servers filling each to <b>MIN</b>.
*
* <li>If we still have more regions that need assignment, again iterate the
@@ -152,6 +190,7 @@ public class LoadBalancer {
*/
public List<RegionPlan> balanceCluster(
Map<HServerInfo,List<HRegionInfo>> clusterState) {
+ boolean emptyRegionServerPresent = false;
long startTime = System.currentTimeMillis();
// Make a map sorted by load and count regions
@@ -167,7 +206,9 @@ public class LoadBalancer {
// Iterate so we can count regions as we build the map
for(Map.Entry<HServerInfo, List<HRegionInfo>> server:
clusterState.entrySet()) {
- server.getKey().getLoad().setNumberOfRegions(server.getValue().size());
+ int sz = server.getValue().size();
+ if (sz == 0) emptyRegionServerPresent = true;
+ server.getKey().getLoad().setNumberOfRegions(sz);
numRegions += server.getKey().getLoad().getNumberOfRegions();
serversByLoad.put(server.getKey(), server.getValue());
}
@@ -191,11 +232,13 @@ public class LoadBalancer {
// Balance the cluster
// TODO: Look at data block locality or a more complex load to do this
- List<RegionPlan> regionsToMove = new ArrayList<RegionPlan>();
- int regionidx = 0; // track the index in above list for setting destination
+ MinMaxPriorityQueue<RegionPlan> regionsToMove = MinMaxPriorityQueue.orderedBy(rpComparator).create();
+ List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
// Walk down most loaded, pruning each to the max
int serversOverloaded = 0;
+ // flag used to fetch regions from head and tail of list, alternately
+ boolean fetchFromTail = false;
Map<HServerInfo,BalanceInfo> serverBalanceInfo =
new TreeMap<HServerInfo,BalanceInfo>();
for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
@@ -207,54 +250,98 @@ public class LoadBalancer {
break;
}
serversOverloaded++;
- List<HRegionInfo> regions = randomize(server.getValue());
+ List<HRegionInfo> regions = server.getValue();
int numToOffload = Math.min(regionCount - max, regions.size());
+ // account for the out-of-band regions which were assigned to this server
+ // after some other region server crashed
+ Collections.sort(regions, riComparator);
int numTaken = 0;
- for (int i = regions.size() - 1; i >= 0; i--) {
- HRegionInfo hri = regions.get(i);
+ for (int i = 0; i <= numToOffload; ) {
+ HRegionInfo hri = regions.get(i); // fetch from head
+ if (fetchFromTail) {
+ hri = regions.get(regions.size() - 1 - i);
+ }
+ i++;
// Don't rebalance meta regions.
if (hri.isMetaRegion()) continue;
regionsToMove.add(new RegionPlan(hri, serverInfo, null));
numTaken++;
if (numTaken >= numToOffload) break;
+ // fetch in alternate order if there is new region server
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
}
serverBalanceInfo.put(serverInfo,
new BalanceInfo(numToOffload, (-1)*numTaken));
}
-
+ int totalNumMoved = regionsToMove.size();
+
// Walk down least loaded, filling each to the min
- int serversUnderloaded = 0; // number of servers that get new regions
int neededRegions = 0; // number of regions needed to bring all up to min
+ fetchFromTail = false;
+ RegionPlan rp = null;
+ Map<HServerInfo, Integer> underloadedServers = new HashMap<HServerInfo, Integer>();
for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
serversByLoad.entrySet()) {
int regionCount = server.getKey().getLoad().getNumberOfRegions();
if(regionCount >= min) {
break;
}
- serversUnderloaded++;
- int numToTake = min - regionCount;
- int numTaken = 0;
- while(numTaken < numToTake && regionidx < regionsToMove.size()) {
- regionsToMove.get(regionidx).setDestination(server.getKey());
- numTaken++;
- regionidx++;
+ underloadedServers.put(server.getKey(), min - regionCount);
+ }
+ // number of servers that get new regions
+ int serversUnderloaded = underloadedServers.size();
+ int incr = 1;
+ List<HServerInfo> serverInfos = Arrays.asList(underloadedServers.keySet().
+ toArray(new HServerInfo[serversUnderloaded]));
+ Collections.shuffle(serverInfos, RANDOM);
+ while (regionsToMove.size() > 0) {
+ int cnt = 0;
+ int i = incr > 0 ? 0 : underloadedServers.size()-1;
+ for (; i >= 0 && i < underloadedServers.size(); i += incr) {
+ if (0 == regionsToMove.size()) break;
+ HServerInfo si = serverInfos.get(i);
+ int numToTake = underloadedServers.get(si);
+ if (numToTake == 0) continue;
+
+ if (!fetchFromTail) rp = regionsToMove.remove();
+ else rp = regionsToMove.removeLast();
+ rp.setDestination(si);
+ regionsToReturn.add(rp);
+
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+
+ underloadedServers.put(si, numToTake-1);
+ cnt++;
+ BalanceInfo bi = serverBalanceInfo.get(si);
+ if (bi == null) {
+ bi = new BalanceInfo(0, 0);
+ serverBalanceInfo.put(si, bi);
+ }
+ bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
}
- serverBalanceInfo.put(server.getKey(), new BalanceInfo(0, numTaken));
+ if (cnt == 0) break;
+ // iterates underloadedServers in the other direction
+ LOG.info("First pass inner loop assigned " + cnt + " regions");
+ incr = -incr;
+ }
+ for (Integer i : underloadedServers.values()) {
// If we still want to take some, increment needed
- if(numTaken < numToTake) {
- neededRegions += (numToTake - numTaken);
- }
+ neededRegions += i;
}
// If none needed to fill all to min and none left to drain all to max,
// we are done
- if(neededRegions == 0 && regionidx == regionsToMove.size()) {
+ if(neededRegions == 0 && 0 == regionsToMove.size()) {
long endTime = System.currentTimeMillis();
LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
- "Moving " + regionsToMove.size() + " regions off of " +
+ "Moving " + totalNumMoved + " regions off of " +
serversOverloaded + " overloaded servers onto " +
serversUnderloaded + " less loaded servers");
- return regionsToMove;
+ return regionsToReturn;
}
// Need to do a second pass.
@@ -272,6 +359,7 @@ public class LoadBalancer {
HRegionInfo region = server.getValue().get(idx);
if (region.isMetaRegion()) continue; // Don't move meta regions.
regionsToMove.add(new RegionPlan(region, server.getKey(), null));
+ totalNumMoved++;
if(--neededRegions == 0) {
// No more regions needed, done shedding
break;
@@ -296,24 +384,35 @@ public class LoadBalancer {
}
int numToTake = min - regionCount;
int numTaken = 0;
- while(numTaken < numToTake && regionidx < regionsToMove.size()) {
- regionsToMove.get(regionidx).setDestination(server.getKey());
+ while(numTaken < numToTake && 0 < regionsToMove.size()) {
+ if (!fetchFromTail) rp = regionsToMove.remove();
+ else rp = regionsToMove.removeLast();
+ rp.setDestination(server.getKey());
+ regionsToReturn.add(rp);
+
numTaken++;
- regionidx++;
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
}
}
// If we still have regions to dish out, assign underloaded to max
- if(regionidx != regionsToMove.size()) {
+ if(0 < regionsToMove.size()) {
for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
serversByLoad.entrySet()) {
int regionCount = server.getKey().getLoad().getNumberOfRegions();
if(regionCount >= max) {
break;
}
- regionsToMove.get(regionidx).setDestination(server.getKey());
- regionidx++;
- if(regionidx == regionsToMove.size()) {
+ if (!fetchFromTail) rp = regionsToMove.remove();
+ else rp = regionsToMove.removeLast();
+ rp.setDestination(server.getKey());
+ regionsToReturn.add(rp);
+ if (emptyRegionServerPresent) {
+ fetchFromTail = !fetchFromTail;
+ }
+ if(0 == regionsToMove.size()) {
break;
}
}
@@ -321,9 +420,9 @@ public class LoadBalancer {
long endTime = System.currentTimeMillis();
- if (regionidx != regionsToMove.size() || neededRegions != 0) {
+ if (0 != regionsToMove.size() || neededRegions != 0) {
// Emit data so can diagnose how balancer went astray.
- LOG.warn("regionidx=" + regionidx + ", regionsToMove=" + regionsToMove.size() +
+ LOG.warn("regionsToMove=" + totalNumMoved +
", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
", serversUnderloaded=" + serversUnderloaded);
StringBuilder sb = new StringBuilder();
@@ -337,12 +436,12 @@ public class LoadBalancer {
}
// All done!
- LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
- "Moving " + regionsToMove.size() + " regions off of " +
+ LOG.info("Done. Calculated a load balance in " + (endTime-startTime) + "ms. " +
+ "Moving " + totalNumMoved + " regions off of " +
serversOverloaded + " overloaded servers onto " +
serversUnderloaded + " less loaded servers");
- return regionsToMove;
+ return regionsToReturn;
}
/**
@@ -370,7 +469,7 @@ public class LoadBalancer {
private static class BalanceInfo {
private final int nextRegionForUnload;
- private final int numRegionsAdded;
+ private int numRegionsAdded;
public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
this.nextRegionForUnload = nextRegionForUnload;
@@ -384,6 +483,10 @@ public class LoadBalancer {
public int getNumRegionsAdded() {
return numRegionsAdded;
}
+
+ public void setNumRegionsAdded(int numAdded) {
+ this.numRegionsAdded = numAdded;
+ }
}
/**
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java?rev=1094679&r1=1094678&r2=1094679&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java Mon Apr 18 18:12:04 2011
@@ -19,7 +19,6 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -35,8 +34,6 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
-import junit.framework.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -141,38 +138,6 @@ public class TestLoadBalancer {
new int [] { 12, 100 },
};
- @Test
- public void testRandomizer() {
- for(int [] mockCluster : clusterStateMocks) {
- if (mockCluster.length < 5) continue;
- Map<HServerInfo, List<HRegionInfo>> servers =
- mockClusterServers(mockCluster);
- for (Map.Entry<HServerInfo, List<HRegionInfo>> e: servers.entrySet()) {
- List<HRegionInfo> original = e.getValue();
- if (original.size() < 5) continue;
- // Try ten times in case random chances upon original order more than
- // one or two times in a row.
- boolean same = true;
- for (int i = 0; i < 10 && same; i++) {
- List<HRegionInfo> copy = new ArrayList<HRegionInfo>(original);
- System.out.println("Randomizing before " + copy.size());
- for (HRegionInfo hri: copy) {
- System.out.println(hri.getEncodedName());
- }
- List<HRegionInfo> randomized = LoadBalancer.randomize(copy);
- System.out.println("Randomizing after " + randomized.size());
- for (HRegionInfo hri: randomized) {
- System.out.println(hri.getEncodedName());
- }
- if (original.equals(randomized)) continue;
- same = false;
- break;
- }
- assertFalse(same);
- }
- }
- }
-
/**
* Test the load balancing algorithm.
*
@@ -434,6 +399,7 @@ public class TestLoadBalancer {
}
private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
+ static int regionId = 0;
private List<HRegionInfo> randomRegions(int numRegions) {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
@@ -449,7 +415,8 @@ public class TestLoadBalancer {
Bytes.putInt(start, 0, numRegions << 1);
Bytes.putInt(end, 0, (numRegions << 1) + 1);
HRegionInfo hri = new HRegionInfo(
- new HTableDescriptor(Bytes.toBytes("table" + i)), start, end);
+ new HTableDescriptor(Bytes.toBytes("table" + i)), start, end,
+ false, regionId++);
regions.add(hri);
}
return regions;