You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/04/28 01:12:44 UTC
svn commit: r1097275 [5/8] - in /hbase/trunk: ./ src/docbkx/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/avro/
src/main/java/org/apache/hadoop/hbase/catalog/
src/main/java/org/apache/hadoop/hbase/client/ src/main/java/o...
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java Wed Apr 27 23:12:42 2011
@@ -27,6 +27,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeMap;
@@ -40,8 +41,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.ServerName;
import com.google.common.collect.MinMaxPriorityQueue;
@@ -104,6 +104,27 @@ public class LoadBalancer {
static RegionPlanComparator rpComparator = new RegionPlanComparator();
/**
+ * Data structure that holds servername and 'load'.
+ */
+ static class ServerAndLoad implements Comparable<ServerAndLoad> {
+ private final ServerName sn; // server this load figure belongs to
+ private final int load; // balanceCluster stores the server's region count here
+ ServerAndLoad(final ServerName sn, final int load) {
+ this.sn = sn;
+ this.load = load;
+ }
+
+ ServerName getServerName() {return this.sn;}
+ int getLoad() {return this.load;}
+
+ @Override // orders by load, then by ServerName so equally loaded servers stay distinct TreeMap keys
+ public int compareTo(ServerAndLoad other) {
+ int diff = Integer.compare(this.load, other.load); // compare, not subtract: no int overflow
+ return diff != 0? diff: this.sn.compareTo(other.getServerName());
+ }
+ }
+
+ /**
* Generate a global load balancing plan according to the specified map of
* server information to the most loaded regions of each server.
*
@@ -189,28 +210,25 @@ public class LoadBalancer {
* or null if cluster is already balanced
*/
public List<RegionPlan> balanceCluster(
- Map<HServerInfo,List<HRegionInfo>> clusterState) {
+ Map<ServerName, List<HRegionInfo>> clusterState) {
boolean emptyRegionServerPresent = false;
long startTime = System.currentTimeMillis();
- // Make a map sorted by load and count regions
- TreeMap<HServerInfo,List<HRegionInfo>> serversByLoad =
- new TreeMap<HServerInfo,List<HRegionInfo>>(
- new HServerInfo.LoadComparator());
int numServers = clusterState.size();
if (numServers == 0) {
LOG.debug("numServers=0 so skipping load balancing");
return null;
}
+ NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad =
+ new TreeMap<ServerAndLoad, List<HRegionInfo>>();
int numRegions = 0;
// Iterate so we can count regions as we build the map
- for(Map.Entry<HServerInfo, List<HRegionInfo>> server:
- clusterState.entrySet()) {
- int sz = server.getValue().size();
+ for (Map.Entry<ServerName, List<HRegionInfo>> server: clusterState.entrySet()) {
+ List<HRegionInfo> regions = server.getValue();
+ int sz = regions.size();
if (sz == 0) emptyRegionServerPresent = true;
- server.getKey().getLoad().setNumberOfRegions(sz);
- numRegions += server.getKey().getLoad().getNumberOfRegions();
- serversByLoad.put(server.getKey(), server.getValue());
+ numRegions += sz;
+ serversByLoad.put(new ServerAndLoad(server.getKey(), sz), regions);
}
// Check if we even need to do any load balancing
@@ -218,13 +236,14 @@ public class LoadBalancer {
// HBASE-3681 check sloppiness first
int floor = (int) Math.floor(average * (1 - slop));
int ceiling = (int) Math.ceil(average * (1 + slop));
- if(serversByLoad.lastKey().getLoad().getNumberOfRegions() <= ceiling &&
- serversByLoad.firstKey().getLoad().getNumberOfRegions() >= floor) {
+ if (serversByLoad.lastKey().getLoad() <= ceiling &&
+ serversByLoad.firstKey().getLoad() >= floor) {
// Skipped because no server outside (min,max) range
- LOG.info("Skipping load balancing. servers=" + numServers + " " +
- "regions=" + numRegions + " average=" + average + " " +
- "mostloaded=" + serversByLoad.lastKey().getLoad().getNumberOfRegions() +
- " leastloaded=" + serversByLoad.firstKey().getLoad().getNumberOfRegions());
+ LOG.info("Skipping load balancing because balanced cluster; " +
+ "servers=" + numServers + " " +
+ "regions=" + numRegions + " average=" + average + " " +
+ "mostloaded=" + serversByLoad.lastKey().getLoad() +
+ " leastloaded=" + serversByLoad.firstKey().getLoad()); // firstKey() is the least loaded server
return null;
}
int min = numRegions / numServers;
@@ -232,21 +251,22 @@ public class LoadBalancer {
// Balance the cluster
// TODO: Look at data block locality or a more complex load to do this
- MinMaxPriorityQueue<RegionPlan> regionsToMove = MinMaxPriorityQueue.orderedBy(rpComparator).create();
+ MinMaxPriorityQueue<RegionPlan> regionsToMove =
+ MinMaxPriorityQueue.orderedBy(rpComparator).create();
List<RegionPlan> regionsToReturn = new ArrayList<RegionPlan>();
// Walk down most loaded, pruning each to the max
int serversOverloaded = 0;
- // flag used to fetch regions from head and tail of list, alternately
+ // flag used to fetch regions from head and tail of list, alternately
boolean fetchFromTail = false;
- Map<HServerInfo,BalanceInfo> serverBalanceInfo =
- new TreeMap<HServerInfo,BalanceInfo>();
- for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
- serversByLoad.descendingMap().entrySet()) {
- HServerInfo serverInfo = server.getKey();
- int regionCount = serverInfo.getLoad().getNumberOfRegions();
- if(regionCount <= max) {
- serverBalanceInfo.put(serverInfo, new BalanceInfo(0, 0));
+ Map<ServerName, BalanceInfo> serverBalanceInfo =
+ new TreeMap<ServerName, BalanceInfo>();
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
+ serversByLoad.descendingMap().entrySet()) {
+ ServerAndLoad sal = server.getKey();
+ int regionCount = sal.getLoad();
+ if (regionCount <= max) {
+ serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
break;
}
serversOverloaded++;
@@ -257,14 +277,14 @@ public class LoadBalancer {
Collections.sort(regions, riComparator);
int numTaken = 0;
for (int i = 0; i <= numToOffload; ) {
- HRegionInfo hri = regions.get(i); // fetch from head
+ HRegionInfo hri = regions.get(i); // fetch from head
if (fetchFromTail) {
- hri = regions.get(regions.size() - 1 - i);
+ hri = regions.get(regions.size() - 1 - i);
}
i++;
// Don't rebalance meta regions.
if (hri.isMetaRegion()) continue;
- regionsToMove.add(new RegionPlan(hri, serverInfo, null));
+ regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
numTaken++;
if (numTaken >= numToOffload) break;
// fetch in alternate order if there is new region server
@@ -272,48 +292,44 @@ public class LoadBalancer {
fetchFromTail = !fetchFromTail;
}
}
- serverBalanceInfo.put(serverInfo,
- new BalanceInfo(numToOffload, (-1)*numTaken));
+ serverBalanceInfo.put(sal.getServerName(),
+ new BalanceInfo(numToOffload, (-1)*numTaken));
}
int totalNumMoved = regionsToMove.size();
-
+
// Walk down least loaded, filling each to the min
int neededRegions = 0; // number of regions needed to bring all up to min
fetchFromTail = false;
- RegionPlan rp = null;
- Map<HServerInfo, Integer> underloadedServers = new HashMap<HServerInfo, Integer>();
- for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
- serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad().getNumberOfRegions();
- if(regionCount >= min) {
+
+ Map<ServerName, Integer> underloadedServers = new HashMap<ServerName, Integer>();
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server:
+ serversByLoad.entrySet()) {
+ int regionCount = server.getKey().getLoad();
+ if (regionCount >= min) {
break;
}
- underloadedServers.put(server.getKey(), min - regionCount);
+ underloadedServers.put(server.getKey().getServerName(), min - regionCount);
}
// number of servers that get new regions
int serversUnderloaded = underloadedServers.size();
int incr = 1;
- List<HServerInfo> serverInfos = Arrays.asList(underloadedServers.keySet().
- toArray(new HServerInfo[serversUnderloaded]));
- Collections.shuffle(serverInfos, RANDOM);
+ List<ServerName> sns =
+ Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
+ Collections.shuffle(sns, RANDOM);
while (regionsToMove.size() > 0) {
int cnt = 0;
int i = incr > 0 ? 0 : underloadedServers.size()-1;
for (; i >= 0 && i < underloadedServers.size(); i += incr) {
- if (0 == regionsToMove.size()) break;
- HServerInfo si = serverInfos.get(i);
+ if (regionsToMove.isEmpty()) break;
+ ServerName si = sns.get(i);
int numToTake = underloadedServers.get(si);
if (numToTake == 0) continue;
-
- if (!fetchFromTail) rp = regionsToMove.remove();
- else rp = regionsToMove.removeLast();
- rp.setDestination(si);
- regionsToReturn.add(rp);
-
+
+ addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
if (emptyRegionServerPresent) {
fetchFromTail = !fetchFromTail;
}
-
+
underloadedServers.put(si, numToTake-1);
cnt++;
BalanceInfo bi = serverBalanceInfo.get(si);
@@ -325,17 +341,16 @@ public class LoadBalancer {
}
if (cnt == 0) break;
// iterates underloadedServers in the other direction
- LOG.info("First pass inner loop assigned " + cnt + " regions");
incr = -incr;
}
for (Integer i : underloadedServers.values()) {
// If we still want to take some, increment needed
- neededRegions += i;
+ neededRegions += i;
}
// If none needed to fill all to min and none left to drain all to max,
// we are done
- if(neededRegions == 0 && 0 == regionsToMove.size()) {
+ if (neededRegions == 0 && regionsToMove.isEmpty()) {
long endTime = System.currentTimeMillis();
LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
"Moving " + totalNumMoved + " regions off of " +
@@ -350,17 +365,18 @@ public class LoadBalancer {
// If we need more to fill min, grab one from each most loaded until enough
if (neededRegions != 0) {
// Walk down most loaded, grabbing one from each until we get enough
- for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
serversByLoad.descendingMap().entrySet()) {
- BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey());
+ BalanceInfo balanceInfo =
+ serverBalanceInfo.get(server.getKey().getServerName());
int idx =
balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
if (idx >= server.getValue().size()) break;
HRegionInfo region = server.getValue().get(idx);
if (region.isMetaRegion()) continue; // Don't move meta regions.
- regionsToMove.add(new RegionPlan(region, server.getKey(), null));
+ regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
totalNumMoved++;
- if(--neededRegions == 0) {
+ if (--neededRegions == 0) {
// No more regions needed, done shedding
break;
}
@@ -371,11 +387,11 @@ public class LoadBalancer {
// Assign each underloaded up to the min, then if leftovers, assign to max
// Walk down least loaded, assigning to each to fill up to min
- for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
- serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad().getNumberOfRegions();
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
+ serversByLoad.entrySet()) {
+ int regionCount = server.getKey().getLoad();
if (regionCount >= min) break;
- BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey());
+ BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
if(balanceInfo != null) {
regionCount += balanceInfo.getNumRegionsAdded();
}
@@ -385,11 +401,8 @@ public class LoadBalancer {
int numToTake = min - regionCount;
int numTaken = 0;
while(numTaken < numToTake && 0 < regionsToMove.size()) {
- if (!fetchFromTail) rp = regionsToMove.remove();
- else rp = regionsToMove.removeLast();
- rp.setDestination(server.getKey());
- regionsToReturn.add(rp);
-
+ addRegionPlan(regionsToMove, fetchFromTail,
+ server.getKey().getServerName(), regionsToReturn);
numTaken++;
if (emptyRegionServerPresent) {
fetchFromTail = !fetchFromTail;
@@ -398,21 +411,19 @@ public class LoadBalancer {
}
// If we still have regions to dish out, assign underloaded to max
- if(0 < regionsToMove.size()) {
- for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
+ if (0 < regionsToMove.size()) {
+ for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
serversByLoad.entrySet()) {
- int regionCount = server.getKey().getLoad().getNumberOfRegions();
+ int regionCount = server.getKey().getLoad();
if(regionCount >= max) {
break;
}
- if (!fetchFromTail) rp = regionsToMove.remove();
- else rp = regionsToMove.removeLast();
- rp.setDestination(server.getKey());
- regionsToReturn.add(rp);
+ addRegionPlan(regionsToMove, fetchFromTail,
+ server.getKey().getServerName(), regionsToReturn);
if (emptyRegionServerPresent) {
fetchFromTail = !fetchFromTail;
}
- if(0 == regionsToMove.size()) {
+ if (regionsToMove.isEmpty()) {
break;
}
}
@@ -420,15 +431,15 @@ public class LoadBalancer {
long endTime = System.currentTimeMillis();
- if (0 != regionsToMove.size() || neededRegions != 0) {
+ if (!regionsToMove.isEmpty() || neededRegions != 0) {
// Emit data so can diagnose how balancer went astray.
LOG.warn("regionsToMove=" + totalNumMoved +
- ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
- ", serversUnderloaded=" + serversUnderloaded);
+ ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
+ ", serversUnderloaded=" + serversUnderloaded);
StringBuilder sb = new StringBuilder();
- for (Map.Entry<HServerInfo, List<HRegionInfo>> e: clusterState.entrySet()) {
+ for (Map.Entry<ServerName, List<HRegionInfo>> e: clusterState.entrySet()) {
if (sb.length() > 0) sb.append(", ");
- sb.append(e.getKey().getServerName());
+ sb.append(e.getKey().toString());
sb.append(" ");
sb.append(e.getValue().size());
}
@@ -445,6 +456,18 @@ public class LoadBalancer {
}
/**
+ * Add a region from the head or tail to the List of regions to return.
+ */
+ void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
+ final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
+ // Take the queue head (least plan) unless fetchFromTail asks for the greatest one.
+ RegionPlan plan = fetchFromTail? regionsToMove.removeLast(): regionsToMove.remove();
+ // balanceCluster queues plans with a null destination; fill it in before returning the plan.
+ plan.setDestination(sn);
+ regionsToReturn.add(plan);
+ }
+
+ /**
* @param regions
* @return Randomization of passed <code>regions</code>
*/
@@ -457,11 +480,6 @@ public class LoadBalancer {
* Stores additional per-server information about the regions added/removed
* during the run of the balancing algorithm.
*
- * For servers that receive additional regions, we are not updating the number
- * of regions in HServerInfo once we decide to reassign regions to a server,
- * but we need this information later in the algorithm. This is stored in
- * <b>numRegionsAdded</b>.
- *
* For servers that shed regions, we need to track which regions we have
* already shed. <b>nextRegionForUnload</b> contains the index in the list
* of regions on the server that is the next to be shed.
@@ -506,14 +524,14 @@ public class LoadBalancer {
* @return map of server to the regions it should take, or null if no
* assignment is possible (ie. no regions or no servers)
*/
- public static Map<HServerInfo, List<HRegionInfo>> roundRobinAssignment(
- HRegionInfo [] regions, List<HServerInfo> servers) {
- if(regions.length == 0 || servers.size() == 0) {
+ public static Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
+ List<HRegionInfo> regions, List<ServerName> servers) {
+ if (regions.isEmpty() || servers.isEmpty()) {
return null;
}
- Map<HServerInfo,List<HRegionInfo>> assignments =
- new TreeMap<HServerInfo,List<HRegionInfo>>();
- int numRegions = regions.length;
+ Map<ServerName, List<HRegionInfo>> assignments =
+ new TreeMap<ServerName,List<HRegionInfo>>();
+ int numRegions = regions.size();
int numServers = servers.size();
int max = (int)Math.ceil((float)numRegions/numServers);
int serverIdx = 0;
@@ -522,10 +540,10 @@ public class LoadBalancer {
}
int regionIdx = 0;
for (int j = 0; j < numServers; j++) {
- HServerInfo server = servers.get((j+serverIdx) % numServers);
+ ServerName server = servers.get((j + serverIdx) % numServers);
List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
for (int i=regionIdx; i<numRegions; i += numServers) {
- serverRegions.add(regions[i % numRegions]);
+ serverRegions.add(regions.get(i % numRegions));
}
assignments.put(server, serverRegions);
regionIdx++;
@@ -549,25 +567,20 @@ public class LoadBalancer {
* @param servers available servers
* @return map of servers and regions to be assigned to them
*/
- public static Map<HServerInfo, List<HRegionInfo>> retainAssignment(
- Map<HRegionInfo, HServerAddress> regions, List<HServerInfo> servers) {
- Map<HServerInfo, List<HRegionInfo>> assignments =
- new TreeMap<HServerInfo, List<HRegionInfo>>();
- // Build a map of server addresses to server info so we can match things up
- Map<HServerAddress, HServerInfo> serverMap =
- new TreeMap<HServerAddress, HServerInfo>();
- for (HServerInfo server : servers) {
- serverMap.put(server.getServerAddress(), server);
+ public static Map<ServerName, List<HRegionInfo>> retainAssignment(
+ Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
+ Map<ServerName, List<HRegionInfo>> assignments =
+ new TreeMap<ServerName, List<HRegionInfo>>();
+ for (ServerName server : servers) {
assignments.put(server, new ArrayList<HRegionInfo>());
}
- for (Map.Entry<HRegionInfo, HServerAddress> region : regions.entrySet()) {
- HServerAddress hsa = region.getValue();
- HServerInfo server = hsa == null? null: serverMap.get(hsa);
- if (server != null) {
- assignments.get(server).add(region.getKey());
+ for (Map.Entry<HRegionInfo, ServerName> region : regions.entrySet()) {
+ ServerName sn = region.getValue();
+ if (sn != null && assignments.containsKey(sn)) { // same TreeMap lookup as the get() below
+ assignments.get(sn).add(region.getKey());
} else {
- assignments.get(servers.get(RANDOM.nextInt(assignments.size()))).add(
- region.getKey());
+ int size = servers.size(); // bound by the list being indexed; duplicates in servers shrink assignments
+ assignments.get(servers.get(RANDOM.nextInt(size))).add(region.getKey()); // NOTE(review): fails if servers is empty
}
}
return assignments;
@@ -692,17 +705,17 @@ public class LoadBalancer {
* @param servers
* @return map of regions to the server it should be assigned to
*/
- public static Map<HRegionInfo,HServerInfo> immediateAssignment(
- List<HRegionInfo> regions, List<HServerInfo> servers) {
- Map<HRegionInfo,HServerInfo> assignments =
- new TreeMap<HRegionInfo,HServerInfo>();
+ public static Map<HRegionInfo, ServerName> immediateAssignment(
+ List<HRegionInfo> regions, List<ServerName> servers) { // NOTE(review): servers must be non-empty when regions is; RANDOM.nextInt(0) presumably throws
+ Map<HRegionInfo,ServerName> assignments = // each region is mapped to a randomly picked server
+ new TreeMap<HRegionInfo,ServerName>();
for(HRegionInfo region : regions) {
assignments.put(region, servers.get(RANDOM.nextInt(servers.size())));
}
return assignments;
}
- public static HServerInfo randomAssignment(List<HServerInfo> servers) {
+ public static ServerName randomAssignment(List<ServerName> servers) {
if (servers == null || servers.isEmpty()) {
LOG.warn("Wanted to do random assignment but no servers to assign to");
return null;
@@ -722,21 +735,21 @@ public class LoadBalancer {
*/
public static class RegionPlan implements Comparable<RegionPlan> {
private final HRegionInfo hri;
- private final HServerInfo source;
- private HServerInfo dest;
+ private final ServerName source;
+ private ServerName dest;
/**
* Instantiate a plan for a region move, moving the specified region from
* the specified source server to the specified destination server.
*
* Destination server can be instantiated as null and later set
- * with {@link #setDestination(HServerInfo)}.
+ * with {@link #setDestination(ServerName)}.
*
* @param hri region to be moved
* @param source regionserver region should be moved from
* @param dest regionserver region should be moved to
*/
- public RegionPlan(final HRegionInfo hri, HServerInfo source, HServerInfo dest) {
+ public RegionPlan(final HRegionInfo hri, ServerName source, ServerName dest) {
this.hri = hri;
this.source = source;
this.dest = dest;
@@ -745,7 +758,7 @@ public class LoadBalancer {
/**
* Set the destination server for the plan for this region.
*/
- public void setDestination(HServerInfo dest) {
+ public void setDestination(ServerName dest) {
this.dest = dest;
}
@@ -753,7 +766,7 @@ public class LoadBalancer {
* Get the source server for the plan for this region.
* @return server info for source
*/
- public HServerInfo getSource() {
+ public ServerName getSource() {
return source;
}
@@ -761,7 +774,7 @@ public class LoadBalancer {
* Get the destination server for the plan for this region.
* @return server info for destination
*/
- public HServerInfo getDestination() {
+ public ServerName getDestination() {
return dest;
}
@@ -789,8 +802,8 @@ public class LoadBalancer {
@Override
public String toString() {
return "hri=" + this.hri.getRegionNameAsString() + ", src=" +
- (this.source == null? "": this.source.getServerName()) +
- ", dest=" + (this.dest == null? "": this.dest.getServerName());
+ (this.source == null? "": this.source.toString()) +
+ ", dest=" + (this.dest == null? "": this.dest.toString());
}
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java Wed Apr 27 23:12:42 2011
@@ -287,7 +287,7 @@ public class MasterCoprocessorHost
}
}
- void preMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
+ void preMove(final HRegionInfo region, final ServerName srcServer, final ServerName destServer)
throws UnknownRegionException {
ObserverContext<MasterCoprocessorEnvironment> ctx = null;
for (MasterEnvironment env: coprocessors) {
@@ -302,7 +302,7 @@ public class MasterCoprocessorHost
}
}
- void postMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
+ void postMove(final HRegionInfo region, final ServerName srcServer, final ServerName destServer)
throws UnknownRegionException {
ObserverContext<MasterCoprocessorEnvironment> ctx = null;
for (MasterEnvironment env: coprocessors) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Wed Apr 27 23:12:42 2011
@@ -20,7 +20,7 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
-import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -34,9 +34,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -95,7 +95,7 @@ public class MasterFileSystem {
conf.getBoolean("hbase.master.distributed.log.splitting", true);
if (this.distributedLogSplitting) {
this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
- master.getConfiguration(), master, master.getServerName());
+ master.getConfiguration(), master, master.getServerName().toString());
this.splitLogManager.finishInitialization();
} else {
this.splitLogManager = null;
@@ -175,9 +175,9 @@ public class MasterFileSystem {
* Inspect the log directory to recover any log file without
* an active region server.
- * @param onlineServers Map of online servers keyed by
- * {@link HServerInfo#getServerName()}
+ * @param onlineServers Set of online servers, identified by
+ * {@link ServerName}
*/
- void splitLogAfterStartup(final Map<String, HServerInfo> onlineServers) {
+ void splitLogAfterStartup(final Set<ServerName> onlineServers) {
Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
try {
if (!this.fs.exists(logsDirPath)) {
@@ -197,8 +197,8 @@ public class MasterFileSystem {
return;
}
for (FileStatus status : logFolders) {
- String serverName = status.getPath().getName();
- if (onlineServers.get(serverName) == null) {
+ ServerName serverName = new ServerName(status.getPath().getName());
+ if (!onlineServers.contains(serverName)) {
LOG.info("Log folder " + status.getPath() + " doesn't belong " +
"to a known region server, splitting");
splitLog(serverName);
@@ -209,9 +209,9 @@ public class MasterFileSystem {
}
}
- public void splitLog(final String serverName) {
+ public void splitLog(final ServerName serverName) {
long splitTime = 0, splitLogSize = 0;
- Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
+ Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName.toString()));
if (distributedLogSplitting) {
splitTime = EnvironmentEdgeManager.currentTimeMillis();
try {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Wed Apr 27 23:12:42 2011
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -32,13 +33,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
-import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.HConnection;
@@ -47,12 +47,9 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
-import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
-import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
/**
- * The ServerManager class manages info about region servers - HServerInfo,
- * load numbers, dying servers, etc.
+ * The ServerManager class manages info about region servers.
* <p>
* Maintains lists of online and dead servers. Processes the startups,
* shutdowns, and deaths of region servers.
@@ -70,23 +67,20 @@ public class ServerManager {
// Set if we are to shutdown the cluster.
private volatile boolean clusterShutdown = false;
- /** The map of known server names to server info */
- private final Map<String, HServerInfo> onlineServers =
- new ConcurrentHashMap<String, HServerInfo>();
+ /** Map of registered servers to their current load */
+ private final Map<ServerName, HServerLoad> onlineServers =
+ new ConcurrentHashMap<ServerName, HServerLoad>();
// TODO: This is strange to have two maps but HSI above is used on both sides
/**
* Map from full server-instance name to the RPC connection for this server.
*/
- private final Map<String, HRegionInterface> serverConnections =
- new HashMap<String, HRegionInterface>();
+ private final Map<ServerName, HRegionInterface> serverConnections =
+ new HashMap<ServerName, HRegionInterface>();
private final Server master;
private final MasterServices services;
- // Reporting to track master metrics.
- private final MasterMetrics metrics;
-
private final DeadServer deadservers;
private final long maxSkew;
@@ -95,26 +89,25 @@ public class ServerManager {
* Constructor.
* @param master
* @param services
- * @param metrics
*/
- public ServerManager(final Server master, final MasterServices services,
- MasterMetrics metrics) {
+ public ServerManager(final Server master, final MasterServices services) { // MasterMetrics is no longer passed in or held
this.master = master;
this.services = services;
- this.metrics = metrics;
Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
- this.deadservers =
- new DeadServer(c.getInt("hbase.master.maxdeadservers", 100));
+ this.deadservers = new DeadServer(); // NOTE(review): "hbase.master.maxdeadservers" is no longer read; confirm dropping the cap is intended
}
/**
* Let the server manager know a new regionserver has come online
- * @param serverInfo
+ * @param ia The remote address
+ * @param port The remote port
+ * @param serverStartcode Startcode reported by the regionserver; forms its ServerName together with host and port
* @param serverCurrentTime The current time of the region server in ms
* @throws IOException
*/
- void regionServerStartup(final HServerInfo serverInfo, long serverCurrentTime)
+ void regionServerStartup(final InetAddress ia, final int port,
+ final long serverStartcode, long serverCurrentTime)
throws IOException {
// Test for case where we get a region startup message from a regionserver
// that has been quickly restarted but whose znode expiration handler has
@@ -123,58 +116,66 @@ public class ServerManager {
// is, reject the server and trigger its expiration. The next time it comes
// in, it should have been removed from serverAddressToServerInfo and queued
// for processing by ProcessServerShutdown.
- HServerInfo info = new HServerInfo(serverInfo);
- checkIsDead(info.getServerName(), "STARTUP");
- checkAlreadySameHostPort(info);
- checkClockSkew(info, serverCurrentTime);
- recordNewServer(info, false, null);
+ ServerName sn = new ServerName(ia.getHostName(), port, serverStartcode); // NOTE(review): getHostName() may trigger a reverse DNS lookup
+ checkClockSkew(sn, serverCurrentTime);
+ checkIsDead(sn, "STARTUP");
+ checkAlreadySameHostPort(sn);
+ recordNewServer(sn, HServerLoad.EMPTY_HSERVERLOAD); // starts with an empty load; regionServerReport supplies real loads
+ }
+
+ /**
+ * Handles a load report from a regionserver. Servers on the dead list are
+ * rejected. Servers not yet known (presumably a master joining a running
+ * cluster) are registered via recordNewServer. Otherwise the stored load is
+ * replaced with the reported one.
+ * @param sn Name of the reporting server
+ * @param hsl Load reported by the server
+ * @throws YouAreDeadException if sn is on the dead servers list
+ * @throws PleaseHoldException if a different server with the same hostname
+ * and port is already online
+ */
+ void regionServerReport(ServerName sn, HServerLoad hsl)
+ throws YouAreDeadException, PleaseHoldException {
+ checkIsDead(sn, "REPORT");
+ if (!this.onlineServers.containsKey(sn)) {
+ // Already have this host+port combo and its just different start code?
+ checkAlreadySameHostPort(sn);
+ // Just let the server in. Presume master joining a running cluster.
+ // recordNewServer is what happens at the end of reportServerStartup.
+ // The only thing we are skipping is passing back to the regionserver
+ // the ServerName to use. Here we presume a master has already done
+ // that so we'll press on with whatever it gave us for ServerName.
+ recordNewServer(sn, hsl);
+ } else {
+ this.onlineServers.put(sn, hsl);
+ }
}
/**
* Test to see if we have a server of same host and port already.
+ * If the registered server has an older start code it is expired as stale.
+ * In every case where a match is found, the incoming server is rejected.
- * @param serverInfo
+ * @param serverName Name of the server asking to register
* @throws PleaseHoldException
+ * if a server with the same hostname and port is already online
*/
- void checkAlreadySameHostPort(final HServerInfo serverInfo)
+ void checkAlreadySameHostPort(final ServerName serverName)
throws PleaseHoldException {
- String hostAndPort = serverInfo.getServerAddress().toString();
- HServerInfo existingServer =
- haveServerWithSameHostAndPortAlready(serverInfo.getHostnamePort());
+ ServerName existingServer =
+ ServerName.findServerWithSameHostnamePort(getOnlineServersList(), serverName);
if (existingServer != null) {
- String message = "Server start rejected; we already have " + hostAndPort +
- " registered; existingServer=" + existingServer + ", newServer=" + serverInfo;
+ String message = "Server serverName=" + serverName +
+ " rejected; we already have " + existingServer.toString() +
+ " registered with same hostname and port";
LOG.info(message);
- if (existingServer.getStartCode() < serverInfo.getStartCode()) {
+ if (existingServer.getStartcode() < serverName.getStartcode()) {
+ // Same host and port but a newer start code: the registered entry is
+ // treated as stale and expired.
LOG.info("Triggering server recovery; existingServer " +
- existingServer.getServerName() + " looks stale");
+ existingServer + " looks stale");
expireServer(existingServer);
}
throw new PleaseHoldException(message);
}
}
- private HServerInfo haveServerWithSameHostAndPortAlready(final String hostnamePort) {
- synchronized (this.onlineServers) {
- for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
- if (e.getValue().getHostnamePort().equals(hostnamePort)) {
- return e.getValue();
- }
- }
- }
- return null;
- }
-
/**
* Checks if the clock skew between the server and the master. If the clock
* skew is too much it will throw an Exception.
+ * @param serverName Incoming servers's name
+ * @param serverCurrentTime
* @throws ClockOutOfSyncException
*/
- private void checkClockSkew(final HServerInfo serverInfo,
+ private void checkClockSkew(final ServerName serverName,
final long serverCurrentTime)
throws ClockOutOfSyncException {
long skew = System.currentTimeMillis() - serverCurrentTime;
if (skew > maxSkew) {
- String message = "Server " + serverInfo.getServerName() + " has been " +
+ String message = "Server " + serverName + " has been " +
"rejected; Reported time is too far out of sync with master. " +
"Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
LOG.warn(message);
@@ -186,11 +187,11 @@ public class ServerManager {
* If this server is on the dead list, reject it with a YouAreDeadException.
* If it was dead but came back with a new start code, remove the old entry
* from the dead list.
- * @param serverName Server name formatted as host_port_startcode.
+ * @param serverName
* @param what START or REPORT
* @throws YouAreDeadException
*/
- private void checkIsDead(final String serverName, final String what)
+ private void checkIsDead(final ServerName serverName, final String what)
throws YouAreDeadException {
if (this.deadservers.isDeadServer(serverName)) {
// host name, port and start code all match with existing one of the
@@ -210,157 +211,34 @@ public class ServerManager {
}
/**
- * Adds the HSI to the RS list
- * @param info The region server informations
- * @param useInfoLoad True if the load from the info should be used; e.g.
- * under a master failover
- * @param hri Region interface. Can be null.
- */
- void recordNewServer(HServerInfo info, boolean useInfoLoad,
- HRegionInterface hri) {
- HServerLoad load = useInfoLoad? info.getLoad(): new HServerLoad();
- String serverName = info.getServerName();
- LOG.info("Registering server=" + serverName + ", regionCount=" +
- load.getLoad() + ", userLoad=" + useInfoLoad);
- info.setLoad(load);
- // TODO: Why did we update the RS location ourself? Shouldn't RS do this?
- // masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
- // -- If I understand the question, the RS does not update the location
- // because could be disagreement over locations because of DNS issues; only
- // master does DNS now -- St.Ack 20100929.
- this.onlineServers.put(serverName, info);
- if (hri == null) {
- serverConnections.remove(serverName);
- } else {
- serverConnections.put(serverName, hri);
- }
- }
-
- /**
- * Called to process the messages sent from the region server to the master
- * along with the heart beat.
- *
- * @param serverInfo
- * @param msgs
- * @param mostLoadedRegions Array of regions the region server is submitting
- * as candidates to be rebalanced, should it be overloaded
- * @return messages from master to region server indicating what region
- * server should do.
- *
- * @throws IOException
- */
- HMsg [] regionServerReport(final HServerInfo serverInfo,
- final HMsg [] msgs, final HRegionInfo[] mostLoadedRegions)
- throws IOException {
- // Be careful. This method does returns in the middle.
- HServerInfo info = new HServerInfo(serverInfo);
-
- // Check if dead. If it is, it'll get a 'You Are Dead!' exception.
- checkIsDead(info.getServerName(), "REPORT");
-
- // If we don't know this server, tell it shutdown.
- HServerInfo storedInfo = this.onlineServers.get(info.getServerName());
- if (storedInfo == null) {
- // Maybe we already have this host+port combo and its just different
- // start code?
- checkAlreadySameHostPort(info);
- // Just let the server in. Presume master joining a running cluster.
- // recordNewServer is what happens at the end of reportServerStartup.
- // The only thing we are skipping is passing back to the regionserver
- // the HServerInfo to use. Here we presume a master has already done
- // that so we'll press on with whatever it gave us for HSI.
- recordNewServer(info, true, null);
- // If msgs, put off their processing but this is not enough because
- // its possible that the next time the server reports in, we'll still
- // not be up and serving. For example, if a split, we'll need the
- // regions and servers setup in the master before the below
- // handleSplitReport will work. TODO: FIx!!
- if (msgs.length > 0)
- throw new PleaseHoldException("FIX! Putting off " +
- "message processing because not yet rwady but possible we won't be " +
- "ready next on next report");
- }
-
- for (HMsg msg: msgs) {
- LOG.info("Received " + msg + " from " + serverInfo.getServerName());
- switch (msg.getType()) {
- default:
- LOG.error("Unhandled msg type " + msg);
- }
- }
-
- HMsg [] reply = null;
- if (this.clusterShutdown) {
- if (isOnlyMetaRegionServersOnline()) {
- LOG.info("Only catalog regions remaining; running unassign");
- // The only remaining regions are catalog regions.
- // Shutdown needs to be staggered; the meta regions need to close last
- // in case they need to be updated during the close melee. If only
- // catalog reigons remaining, tell them they can go down now too. On
- // close of region, the regionservers should then shut themselves down.
- this.services.getAssignmentManager().unassignCatalogRegions();
- }
- }
- return processRegionServerAllsWell(info, mostLoadedRegions, reply);
- }
-
- /**
- * @return True if all online servers are carrying one or more catalog
- * regions, there are no servers online carrying user regions only
- */
- private boolean isOnlyMetaRegionServersOnline() {
- List<HServerInfo> onlineServers = getOnlineServersList();
- for (HServerInfo hsi: onlineServers) {
- if (!this.services.getAssignmentManager().isMetaRegionServer(hsi)) {
- return false;
- }
- }
- return true;
+ * Adds the server to the onlineServers map, replacing any previous entry,
+ * and drops any RPC connection cached for it.
+ * @param serverName The remote server's name.
+ * @param hsl The server's load: the reported load, or
+ * HServerLoad.EMPTY_HSERVERLOAD when called from regionServerStartup.
+ */
+ void recordNewServer(final ServerName serverName, final HServerLoad hsl) {
+ LOG.info("Registering server=" + serverName);
+ this.onlineServers.put(serverName, hsl);
+ // Forget any proxy cached under this name so getServerConnection
+ // creates a fresh one on next use.
+ this.serverConnections.remove(serverName);
}
/**
- * RegionServer is checking in, no exceptional circumstances
- * @param serverInfo
- * @param mostLoadedRegions
- * @param msgs
- * @return
- * @throws IOException
- */
- private HMsg[] processRegionServerAllsWell(HServerInfo serverInfo,
- final HRegionInfo[] mostLoadedRegions, HMsg[] msgs)
- throws IOException {
- // Refresh the info object and the load information
- this.onlineServers.put(serverInfo.getServerName(), serverInfo);
- HServerLoad load = serverInfo.getLoad();
- if (load != null && this.metrics != null) {
- this.metrics.incrementRequests(load.getNumberOfRequests());
- }
- // No more piggyback messages on heartbeats for other stuff
- return msgs;
- }
-
- /**
- * Make server load accessible to AssignmentManager
* @param serverName
- * @return
- * @throws HServerLoad if serverName is known
+ * @return HServerLoad if serverName is known else null
*/
- HServerLoad getLoad(String serverName) {
- HServerInfo hsi = this.onlineServers.get(serverName);
- if (hsi == null) return null;
- return hsi.getLoad();
+ public HServerLoad getLoad(final ServerName serverName) {
+ // Look up by the ServerName itself: onlineServers is keyed by ServerName,
+ // so a String key (e.g. serverName.toString()) would never match.
+ return this.onlineServers.get(serverName);
}
/**
- * @param serverName
- * @return True if we removed server from the list.
+ * @param address Hostname and port of the server; the start code is ignored
+ * @return HServerLoad if a server with the same hostname and port is
+ * online, else null
+ * @deprecated Use {@link #getLoad(ServerName)}
*/
- private boolean removeServerInfo(final String serverName) {
- HServerInfo info = this.onlineServers.remove(serverName);
- if (info != null) {
- return true;
- }
- return false;
+ @Deprecated
+ public HServerLoad getLoad(final HServerAddress address) {
+ // Start code is unknown here (-1), so match on hostname and port only.
+ ServerName sn = new ServerName(address.toString(), -1);
+ ServerName actual =
+ ServerName.findServerWithSameHostnamePort(this.getOnlineServersList(), sn);
+ return actual == null? null: getLoad(actual);
}
/**
@@ -373,9 +251,9 @@ public class ServerManager {
int totalLoad = 0;
int numServers = 0;
double averageLoad = 0.0;
- for (HServerInfo hsi : onlineServers.values()) {
+ for (HServerLoad hsl: this.onlineServers.values()) {
numServers++;
- totalLoad += hsi.getLoad().getNumberOfRegions();
+ totalLoad += hsl.getNumberOfRegions();
}
averageLoad = (double)totalLoad / (double)numServers;
return averageLoad;
@@ -388,24 +266,16 @@ public class ServerManager {
}
/**
- * @param name server name
- * @return HServerInfo for the given server address
- */
- public HServerInfo getServerInfo(String name) {
- return this.onlineServers.get(name);
- }
-
- /**
- * @return Read-only map of servers to serverinfo
+ * @return Read-only view of the online servers map, from ServerName to the
+ * server's last recorded HServerLoad
*/
- public Map<String, HServerInfo> getOnlineServers() {
+ public Map<ServerName, HServerLoad> getOnlineServers() {
// Presumption is that iterating the returned Map is OK.
+ // NOTE(review): Collections.unmodifiableMap returns a live view, so holding
+ // the lock here does not protect a caller's later iteration.
synchronized (this.onlineServers) {
return Collections.unmodifiableMap(this.onlineServers);
}
}
+ /**
+ * @return The result of DeadServer#clone() on the dead servers list;
+ * presumably an independent copy -- NOTE(review): confirm against DeadServer.
+ */
- public Set<String> getDeadServers() {
+ public Set<ServerName> getDeadServers() {
return this.deadservers.clone();
}
@@ -417,40 +287,11 @@ public class ServerManager {
return this.deadservers.areDeadServersInProgress();
}
- /**
- * @param hsa
- * @return The HServerInfo whose HServerAddress is <code>hsa</code> or null
- * if nothing found.
- */
- public HServerInfo getHServerInfo(final HServerAddress hsa) {
- synchronized(this.onlineServers) {
- // TODO: This is primitive. Do a better search.
- for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
- if (e.getValue().getServerAddress().equals(hsa)) {
- return e.getValue();
- }
- }
- }
- return null;
- }
-
- private void notifyOnlineServers() {
- synchronized (this.onlineServers) {
- this.onlineServers.notifyAll();
- }
- }
-
- /*
- * Wait on regionservers to report in
- * with {@link #regionServerReport(HServerInfo, HMsg[])} so they get notice
- * the master is going down. Waits until all region servers come back with
- * a MSG_REGIONSERVER_STOP.
- */
void letRegionServersShutdown() {
synchronized (onlineServers) {
- while (onlineServers.size() > 0) {
+ while (!onlineServers.isEmpty()) {
StringBuilder sb = new StringBuilder();
- for (String key: this.onlineServers.keySet()) {
+ for (ServerName key: this.onlineServers.keySet()) {
if (sb.length() > 0) {
sb.append(", ");
}
@@ -470,19 +311,15 @@ public class ServerManager {
* Expire the passed server. Add it to list of deadservers and queue a
* shutdown processing.
*/
- public synchronized void expireServer(final HServerInfo hsi) {
- // First check a server to expire. ServerName is of the form:
- // <hostname> , <port> , <startcode>
- String serverName = hsi.getServerName();
- HServerInfo info = this.onlineServers.get(serverName);
- if (info == null) {
- LOG.warn("Received expiration of " + hsi.getServerName() +
+ public synchronized void expireServer(final ServerName serverName) {
+ if (!this.onlineServers.containsKey(serverName)) {
+ LOG.warn("Received expiration of " + serverName +
" but server is not currently online");
return;
}
if (this.deadservers.contains(serverName)) {
// TODO: Can this happen? It shouldn't be online in this case?
- LOG.warn("Received expiration of " + hsi.getServerName() +
+ LOG.warn("Received expiration of " + serverName +
" but server shutdown is already in progress");
return;
}
@@ -495,7 +332,7 @@ public class ServerManager {
// If cluster is going down, yes, servers are going to be expiring; don't
// process as a dead server
if (this.clusterShutdown) {
- LOG.info("Cluster shutdown set; " + hsi.getServerName() +
+ LOG.info("Cluster shutdown set; " + serverName +
" expired; onlineServers=" + this.onlineServers.size());
if (this.onlineServers.isEmpty()) {
master.stop("Cluster shutdown set; onlineServer=0");
@@ -506,9 +343,8 @@ public class ServerManager {
// Was this server carrying root?
boolean carryingRoot;
try {
- HServerAddress address = ct.getRootLocation();
- carryingRoot = address != null &&
- hsi.getServerAddress().equals(address);
+ ServerName address = ct.getRootLocation();
+ // Root location may be null (unassigned); keep the null guard.
+ carryingRoot = address != null && address.equals(serverName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("Interrupted");
@@ -519,15 +355,14 @@ public class ServerManager {
// run into fact that meta is dead). I can ask assignment manager. It
// has an inmemory list of who has what. This list will be cleared as we
// process the dead server but should be find asking it now.
- HServerAddress address = ct.getMetaLocation();
- boolean carryingMeta =
- address != null && hsi.getServerAddress().equals(address);
+ ServerName address = ct.getMetaLocation();
+ // Meta location may be null too; guard before calling equals.
+ boolean carryingMeta = address != null && address.equals(serverName);
if (carryingRoot || carryingMeta) {
this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
- this.services, this.deadservers, info, carryingRoot, carryingMeta));
+ this.services, this.deadservers, serverName, carryingRoot, carryingMeta));
} else {
this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
- this.services, this.deadservers, info));
+ this.services, this.deadservers, serverName));
}
LOG.debug("Added=" + serverName +
" to dead servers, submitted shutdown handler to be executed, root=" +
@@ -544,12 +379,12 @@ public class ServerManager {
* @param server server to open a region
* @param region region to open
*/
- public void sendRegionOpen(HServerInfo server, HRegionInfo region)
+ public void sendRegionOpen(final ServerName server, HRegionInfo region)
throws IOException {
HRegionInterface hri = getServerConnection(server);
if (hri == null) {
- LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName()
- + " failed because no RPC connection found to this server");
+ LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
+ " failed because no RPC connection found to this server");
return;
}
hri.openRegion(region);
@@ -563,12 +398,12 @@ public class ServerManager {
* @param server server to open a region
* @param regions regions to open
*/
- public void sendRegionOpen(HServerInfo server, List<HRegionInfo> regions)
+ public void sendRegionOpen(ServerName server, List<HRegionInfo> regions)
throws IOException {
HRegionInterface hri = getServerConnection(server);
if (hri == null) {
- LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName()
- + " failed because no RPC connection found to this server");
+ LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
+ " failed because no RPC connection found to this server");
return;
}
hri.openRegions(regions);
@@ -584,13 +419,13 @@ public class ServerManager {
* @return true if server acknowledged close, false if not
* @throws IOException
*/
- public boolean sendRegionClose(HServerInfo server, HRegionInfo region)
+ public boolean sendRegionClose(ServerName server, HRegionInfo region)
throws IOException {
if (server == null) throw new NullPointerException("Passed server is null");
HRegionInterface hri = getServerConnection(server);
if (hri == null) {
throw new IOException("Attempting to send CLOSE RPC to server " +
- server.getServerName() + " for region " +
+ server.toString() + " for region " +
region.getRegionNameAsString() +
" failed because no RPC connection found to this server");
}
@@ -598,31 +433,30 @@ public class ServerManager {
}
/**
- * @param info
+ * @param sn Server to get an RPC connection for
- * @return
+ * @return RPC proxy for <code>sn</code>; cached in serverConnections after
+ * first use
* @throws IOException
* @throws RetriesExhaustedException wrapping a ConnectException if failed
* putting up proxy.
*/
- private HRegionInterface getServerConnection(HServerInfo info)
+ private HRegionInterface getServerConnection(final ServerName sn)
throws IOException {
HConnection connection =
HConnectionManager.getConnection(this.master.getConfiguration());
- HRegionInterface hri = serverConnections.get(info.getServerName());
+ // serverConnections is keyed by ServerName; looking up by sn.toString()
+ // would always miss and re-create the proxy on every call.
+ HRegionInterface hri = this.serverConnections.get(sn);
if (hri == null) {
- LOG.debug("New connection to " + info.getServerName());
- hri = connection.getHRegionConnection(info.getServerAddress(), false);
- this.serverConnections.put(info.getServerName(), hri);
+ LOG.debug("New connection to " + sn.toString());
+ hri = connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+ this.serverConnections.put(sn, hri);
}
return hri;
}
/**
* Waits for the regionservers to report in.
- * @return Count of regions out on cluster
* @throws InterruptedException
*/
- public int waitForRegionServers()
+ public void waitForRegionServers()
throws InterruptedException {
long interval = this.master.getConfiguration().
getLong("hbase.master.wait.on.regionservers.interval", 3000);
@@ -640,31 +474,18 @@ public class ServerManager {
}
oldcount = count;
}
- // Count how many regions deployed out on cluster. If fresh start, it'll
- // be none but if not a fresh start, we'll have registered servers when
- // they came in on the {@link #regionServerReport(HServerInfo)} as opposed to
- // {@link #regionServerStartup(HServerInfo)} and it'll be carrying an
- // actual server load.
- int regionCount = 0;
- for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
- HServerLoad load = e.getValue().getLoad();
- if (load != null) regionCount += load.getLoad();
- }
- LOG.info("Exiting wait on regionserver(s) to checkin; count=" + count +
- ", stopped=" + this.master.isStopped() +
- ", count of regions out on cluster=" + regionCount);
- return regionCount;
}
/**
- * @return A copy of the internal list of online servers.
+ * @return A new list holding the names of the currently online servers
+ * (a copy of the onlineServers key set).
*/
- public List<HServerInfo> getOnlineServersList() {
+ public List<ServerName> getOnlineServersList() {
// TODO: optimize the load balancer call so we don't need to make a new list
- return new ArrayList<HServerInfo>(onlineServers.values());
+ // TODO: This is a popular call and every invocation copies the key set.
+ // NOTE(review): unlike getOnlineServers, this copy is taken without
+ // synchronizing on onlineServers -- confirm that is safe.
+ return new ArrayList<ServerName>(this.onlineServers.keySet());
}
+ /**
+ * @param serverName Server to look for; matched by ServerName equality
+ * (presumably hostname, port and start code)
+ * @return True if serverName is a key of the online servers map
+ */
- public boolean isServerOnline(String serverName) {
+ public boolean isServerOnline(ServerName serverName) {
return onlineServers.containsKey(serverName);
}
@@ -681,6 +502,5 @@ public class ServerManager {
* Stop the ServerManager. Currently does nothing.
*/
public void stop() {
-
+ // Intentionally empty: the ServerManager currently has nothing to stop.
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed Apr 27 23:12:42 2011
@@ -156,10 +156,13 @@ public class SplitLogManager extends Zoo
}
+ /**
+ * Starts the timeoutMonitor via Threads.setDaemonThreadRunning. When a
+ * watcher is available, it also registers this object as a listener and
+ * runs lookForOrphans().
+ */
public void finishInitialization() {
- Threads.setDaemonThreadRunning(timeoutMonitor, serverName
- + ".splitLogManagerTimeoutMonitor");
- this.watcher.registerListener(this);
- lookForOrphans();
+ Threads.setDaemonThreadRunning(timeoutMonitor, serverName +
+ ".splitLogManagerTimeoutMonitor");
+ // Watcher can be null during tests with Mock'd servers.
+ if (this.watcher != null) {
+ this.watcher.registerListener(this);
+ lookForOrphans();
+ }
}
/**
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java Wed Apr 27 23:12:42 2011
@@ -19,8 +19,8 @@
*/
package org.apache.hadoop.hbase.master.handler;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
@@ -34,9 +34,9 @@ public class MetaServerShutdownHandler e
+ /**
+ * @param server
+ * @param services
+ * @param deadServers
+ * @param serverName Server whose shutdown is being processed
+ * @param carryingRoot True if that server was hosting the -ROOT- region
+ * @param carryingMeta True if that server was hosting the .META. region
+ */
public MetaServerShutdownHandler(final Server server,
final MasterServices services,
- final DeadServer deadServers, final HServerInfo hsi,
+ final DeadServer deadServers, final ServerName serverName,
final boolean carryingRoot, final boolean carryingMeta) {
- super(server, services, deadServers, hsi, EventType.M_META_SERVER_SHUTDOWN);
+ super(server, services, deadServers, serverName, EventType.M_META_SERVER_SHUTDOWN);
this.carryingRoot = carryingRoot;
this.carryingMeta = carryingMeta;
}
@@ -50,4 +50,4 @@ public class MetaServerShutdownHandler e
boolean isCarryingMeta() {
return this.carryingMeta;
}
-}
+}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java Wed Apr 27 23:12:42 2011
@@ -22,8 +22,8 @@ package org.apache.hadoop.hbase.master.h
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -36,7 +36,7 @@ public class OpenedRegionHandler extends
private static final Log LOG = LogFactory.getLog(OpenedRegionHandler.class);
private final AssignmentManager assignmentManager;
private final HRegionInfo regionInfo;
- private final HServerInfo serverInfo;
+ private final ServerName sn;
private final OpenedPriority priority;
private enum OpenedPriority {
@@ -55,11 +55,11 @@ public class OpenedRegionHandler extends
public OpenedRegionHandler(Server server,
AssignmentManager assignmentManager, HRegionInfo regionInfo,
- HServerInfo serverInfo) {
+ ServerName sn) {
super(server, EventType.RS_ZK_REGION_OPENED);
this.assignmentManager = assignmentManager;
this.regionInfo = regionInfo;
- this.serverInfo = serverInfo;
+ this.sn = sn;
if(regionInfo.isRootRegion()) {
priority = OpenedPriority.ROOT;
} else if(regionInfo.isMetaRegion()) {
@@ -94,7 +94,7 @@ public class OpenedRegionHandler extends
// Code to defend against case where we get SPLIT before region open
// processing completes; temporary till we make SPLITs go via zk -- 0.92.
if (this.assignmentManager.isRegionInTransition(regionInfo) != null) {
- this.assignmentManager.regionOnline(regionInfo, serverInfo);
+ this.assignmentManager.regionOnline(regionInfo, this.sn);
} else {
LOG.warn("Skipping the onlining of " + regionInfo.getRegionNameAsString() +
" because regions is NOT in RIT -- presuming this is because it SPLIT");
@@ -106,7 +106,7 @@ public class OpenedRegionHandler extends
assignmentManager.unassign(regionInfo);
} else {
LOG.debug("Opened region " + regionInfo.getRegionNameAsString() +
- " on " + serverInfo.getServerName());
+ " on " + this.sn.toString());
}
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Wed Apr 27 23:12:42 2011
@@ -28,8 +28,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
@@ -47,29 +47,29 @@ import org.apache.zookeeper.KeeperExcept
/**
* Process server shutdown.
* Server-to-handle must be already in the deadservers lists. See
- * {@link ServerManager#expireServer(HServerInfo)}.
+ * {@link ServerManager#expireServer(ServerName)}
*/
public class ServerShutdownHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
- private final HServerInfo hsi;
+ private final ServerName serverName;
private final Server server;
private final MasterServices services;
private final DeadServer deadServers;
+ /**
+ * @param server
+ * @param services
+ * @param deadServers Dead servers list; expected to already contain serverName
+ * @param serverName Server whose shutdown is being processed
+ */
public ServerShutdownHandler(final Server server, final MasterServices services,
- final DeadServer deadServers, final HServerInfo hsi) {
- this(server, services, deadServers, hsi, EventType.M_SERVER_SHUTDOWN);
+ final DeadServer deadServers, final ServerName serverName) {
+ this(server, services, deadServers, serverName, EventType.M_SERVER_SHUTDOWN);
}
+ /**
+ * Same as the public constructor but with an explicit event type; used by
+ * subclasses such as MetaServerShutdownHandler.
+ */
ServerShutdownHandler(final Server server, final MasterServices services,
- final DeadServer deadServers, final HServerInfo hsi, EventType type) {
+ final DeadServer deadServers, final ServerName serverName, EventType type) {
super(server, type);
- this.hsi = hsi;
+ this.serverName = serverName;
this.server = server;
this.services = services;
this.deadServers = deadServers;
- if (!this.deadServers.contains(hsi.getServerName())) {
- LOG.warn(hsi.getServerName() + " is NOT in deadservers; it should be!");
+ // Sanity check only: a server missing from deadservers is logged, and
+ // construction continues.
+ if (!this.deadServers.contains(this.serverName)) {
+ LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
}
@@ -89,7 +89,7 @@ public class ServerShutdownHandler exten
@Override
public void process() throws IOException {
- final String serverName = this.hsi.getServerName();
+ final ServerName serverName = this.serverName;
LOG.info("Splitting logs for " + serverName);
this.services.getMasterFileSystem().splitLog(serverName);
@@ -99,7 +99,7 @@ public class ServerShutdownHandler exten
// OFFLINE? -- and then others after like CLOSING that depend on log
// splitting.
List<RegionState> regionsInTransition =
- this.services.getAssignmentManager().processServerShutdown(this.hsi);
+ this.services.getAssignmentManager().processServerShutdown(this.serverName);
// Assign root and meta if we were carrying them.
if (isCarryingRoot()) { // -ROOT-
@@ -134,7 +134,7 @@ public class ServerShutdownHandler exten
try {
this.server.getCatalogTracker().waitForMeta();
hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
- this.hsi);
+ this.serverName);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java Wed Apr 27 23:12:42 2011
@@ -24,8 +24,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -38,7 +38,7 @@ public class SplitRegionHandler extends
private static final Log LOG = LogFactory.getLog(SplitRegionHandler.class);
private final AssignmentManager assignmentManager;
private final HRegionInfo parent;
- private final HServerInfo serverInfo;
+ private final ServerName sn;
private final List<HRegionInfo> daughters;
/**
* For testing only! Set to true to skip handling of split.
@@ -47,11 +47,11 @@ public class SplitRegionHandler extends
+ /**
+ * @param server
+ * @param assignmentManager
+ * @param regionInfo The parent region that was split
+ * @param sn Server passed to handleSplitReport; presumably the one that
+ * hosted the parent -- confirm against callers
+ * @param daughters Daughter regions; only elements 0 and 1 are used
+ */
public SplitRegionHandler(Server server,
AssignmentManager assignmentManager, HRegionInfo regionInfo,
- HServerInfo serverInfo, final List<HRegionInfo> daughters) {
+ ServerName sn, final List<HRegionInfo> daughters) {
super(server, EventType.RS_ZK_REGION_SPLIT);
this.assignmentManager = assignmentManager;
this.parent = regionInfo;
- this.serverInfo = serverInfo;
+ this.sn = sn;
this.daughters = daughters;
}
@@ -70,7 +70,7 @@ public class SplitRegionHandler extends
LOG.warn("Skipping split message, TEST_SKIP is set");
return;
}
- this.assignmentManager.handleSplitReport(this.serverInfo, this.parent,
+ this.assignmentManager.handleSplitReport(this.sn, this.parent,
this.daughters.get(0), this.daughters.get(1));
// Remove region from ZK
try {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Apr 27 23:12:42 2011
@@ -58,12 +58,12 @@ import org.apache.hadoop.hbase.DroppedSn
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@@ -235,7 +235,7 @@ public class HRegion implements HeapSize
final long memstoreFlushSize;
private volatile long lastFlushTime;
final RegionServerServices rsServices;
- private List<Pair<Long,Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
+ private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
private final long blockingMemStoreSize;
final long threadWakeFrequency;
// Used to guard closes