You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:21:24 UTC
svn commit: r1181570 - in /hbase/branches/0.89: bin/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/util/
Author: nspiegelberg
Date: Tue Oct 11 02:21:23 2011
New Revision: 1181570
URL: http://svn.apache.org/viewvc?rev=1181570&view=rev
Log:
Assign regions based on block locality and print out the region locality
Summary:
Create a command-line tool to print out the region locality info.
Example: running this command on my dev cluster: 210
====================
./bin/hbase localityck
Host :230. has 13 / 13 blocks for the test1 :
2d59bb40ffba29ea04fe11b27d997646
Host :250. has 12 / 12 blocks for the test1 :
c60bd76550b9c6d5348740f4a41fc9bf
Host :230. has 12 / 12 blocks for the test1 :
6fb75bb06213d19171abf816791b2423
Host :230. has 4 / 4 blocks for the -ROOT- : 70236052
Host :230. has 12 / 12 blocks for the test1 :
01c048a618024b38a5f0192614a9660a
Host :230. has 12 / 12 blocks for the test1 :
54dd7d26a69c9474f98f54cbdc35cb98
Host :290. has 4 / 4 blocks for the .META. :
1028785192
Host :230. has 12 / 12 blocks for the test1 :
36dbc39af1465227cb6118fb17da4a20
Host :230. has 12 / 12 blocks for the test1 :
4d3958e8ad30bcbe39c037ceb94f3c10
Host :230. has 12 / 12 blocks for the test1 :
ced427fa96da9eb3cd9c351ca074a208
Host :230. has 3 / 3 blocks for the test2 :
a638e6d34311e2995154679dea73bb8c
Host :230. has 12 / 12 blocks for the test1 :
5035d560a22b2a65389c4b6acde9cbf8
Host :230. has 12 / 12 blocks for the test1 :
bd55c6af492a684adc573380267bf188
Host :250. has 12 / 12 blocks for the test1 :
59534b14cff211e712151be628445aec
Host :250. has 12 / 12 blocks for the test1 :
9ec644b196af2d551b3decfc7409ef9b
Host :230. has 12 / 12 blocks for the test1 :
0e9416af3be05fb3844d4cac0c4b5198
Host :250. has 12 / 12 blocks for the test1 :
59865cdd283215cc2250423d2584d034
Host :250. has 12 / 12 blocks for the test1 :
e511fb3efa475866d7b27ac17aea81e4
Test Plan:
It has been tested on a dev cluster. I will continue working on more test cases
to stabilize the code.
Reviewed By: kannan
Reviewers: kannan, kranganathan, gqchen, nspiegelberg, bogdan
Commenters: dhruba, mbautin
CC: , hbase@lists, kannan, dhruba, liyintang, mbautin, achao
Differential Revision: 257279
Added:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
Modified:
hbase/branches/0.89/bin/hbase
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
Modified: hbase/branches/0.89/bin/hbase
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/bin/hbase?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/bin/hbase (original)
+++ hbase/branches/0.89/bin/hbase Tue Oct 11 02:21:23 2011
@@ -76,6 +76,7 @@ if [ $# = 0 ]; then
echo " avro run an HBase Avro server"
echo " migrate upgrade an hbase.rootdir"
echo " hbck run the hbase 'fsck' tool"
+ echo " localityck run the hbase locality check tool"
echo " verify [-help] verify that the cluster is working properly"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
@@ -256,6 +257,8 @@ elif [ "$COMMAND" = "migrate" ] ; then
CLASS='org.apache.hadoop.hbase.util.Migrate'
elif [ "$COMMAND" = "hbck" ] ; then
CLASS='org.apache.hadoop.hbase.client.HBaseFsck'
+elif [ "$COMMAND" = "localityck" ] ; then
+ CLASS='org.apache.hadoop.hbase.client.HBaseLocalityCheck'
elif [ "$COMMAND" = "verify" ] ; then
CLASS='org.apache.hadoop.hbase.loadtest.LoadTester'
elif [ "$COMMAND" = "zookeeper" ] ; then
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java Tue Oct 11 02:21:23 2011
@@ -120,13 +120,11 @@ public class HBaseFsck {
new LinkedBlockingQueue<Runnable>());
}
- /**
- * Contacts the master and prints out cluster-wide information
- * @throws IOException if a remote or network exception occurs
- * @return 0 on success, non-zero on failure
- */
- int doWork() throws IOException, InterruptedException {
+  /**
+   * Exposes the region bookkeeping map collected by this checker.
+   * The live internal map is returned, not a copy, so callers should treat
+   * it as read-only. NOTE(review): keys appear to be encoded region names
+   * (HBaseLocalityCheck looks regions up that way) -- confirm.
+   */
+  public TreeMap<String, HbckInfo> getRegionInfo() {
+    return regionInfo;
+  }
+ public int initAndScanRootMeta() throws IOException {
// print hbase server version
errors.print("Version: " + status.getHBaseVersion());
LOG.debug("timelag = " + StringUtils.formatTime(this.timelag));
@@ -150,6 +148,19 @@ public class HBaseFsck {
errors.reportError("Encountered fatal error. Exitting...");
return -1;
}
+ return 0;
+ }
+
+ /**
+ * Contacts the master and prints out cluster-wide information
+ * @throws IOException if a remote or network exception occurs
+ * @return 0 on success, non-zero on failure
+ */
+ int doWork() throws IOException, InterruptedException {
+
+ if (initAndScanRootMeta() == -1) {
+ return -1;
+ }
// get a list of all tables that have not changed recently.
AtomicInteger numSkipped = new AtomicInteger(0);
@@ -267,6 +278,23 @@ public class HBaseFsck {
return true;
}
+  /**
+   * Fetches region metadata from every live region server listed in the
+   * cluster status, then waits for all resulting async work to finish so
+   * that callers see complete results.
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException if the scan is interrupted
+   */
+  void scanRegionServers() throws IOException, InterruptedException {
+    Collection<HServerInfo> liveServers = status.getServerInfo();
+    errors.print("Number of live region servers:" + liveServers.size());
+    if (details) {
+      for (HServerInfo server : liveServers) {
+        errors.detail("\t RegionServer:" + server.getServerName());
+      }
+    }
+    // Fan out to each server, then drain the async tasks before analysis.
+    scanRegionServers(liveServers);
+    finishAsyncWork();
+  }
/**
* Contacts each regionserver and fetches metadata about regions.
@@ -274,7 +302,7 @@ public class HBaseFsck {
* @throws IOException if a remote or network exception occurs
*/
void scanRegionServers(Collection<HServerInfo> regionServerList)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
// loop to contact each region server in parallel
for (HServerInfo rsinfo:regionServerList) {
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java?rev=1181570&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java Tue Oct 11 02:21:23 2011
@@ -0,0 +1,128 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseFsck.HbckInfo;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Command-line tool (run as {@code bin/hbase localityck}) that reports, per
+ * region and per table, whether each deployed region is served by the host
+ * that holds the most file-system blocks for that region.
+ */
+public class HBaseLocalityCheck {
+  private static final Log LOG =
+      LogFactory.getLog(HBaseLocalityCheck.class.getName());
+
+  private final FileSystem fs;
+  private final Path rootdir;
+  private final Configuration conf;
+
+  /**
+   * @param conf cluster configuration; used to resolve the HBase root
+   *     directory and its file system, and to create the HBaseFsck scanner
+   * @throws IOException if the root directory or file system cannot be
+   *     resolved
+   */
+  public HBaseLocalityCheck(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.rootdir = FSUtils.getRootDir(conf);
+    this.fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Logs, for every region found under the root directory, the host it is
+   * deployed on versus its preferred (most-local) host. It then logs a
+   * per-table summary of how many deployed regions run on their preferred
+   * host.
+   *
+   * <p>Regions that no region server reports as deployed are logged, but
+   * they are not counted in the per-table summary.
+   *
+   * @throws MasterNotRunningException if the master cannot be contacted
+   * @throws IOException if scanning -ROOT-/.META. fails, or on file-system
+   *     or remote errors
+   * @throws InterruptedException if interrupted while scanning region servers
+   */
+  public void showTableLocality()
+      throws MasterNotRunningException, IOException, InterruptedException {
+    HBaseFsck fsck = new HBaseFsck(conf);
+    // initAndScanRootMeta() returns non-zero after reporting a fatal error;
+    // the region map is not usable then, so do not report bogus locality.
+    if (fsck.initAndScanRootMeta() != 0) {
+      throw new IOException("Failed to scan -ROOT- and .META.; "
+          + "aborting locality check");
+    }
+    fsck.scanRegionServers();
+    TreeMap<String, HbckInfo> regionInfo = fsck.getRegionInfo();
+
+    LOG.info("Locality information by region");
+
+    // Keys are "<table>:<encoded region name>", values the preferred host.
+    Map<String, String> preferredRegionToRegionServerMapping =
+        FSUtils.getRegionLocalityMappingFromFS(fs, rootdir);
+
+    Map<String, AtomicInteger> tableToRegionCountMap =
+        new HashMap<String, AtomicInteger>();
+    Map<String, AtomicInteger> tableToRegionsWithLocalityMap =
+        new HashMap<String, AtomicInteger>();
+
+    for (Map.Entry<String, String> entry :
+        preferredRegionToRegionServerMapping.entrySet()) {
+      String name = entry.getKey();
+      int splitterIndex = name.lastIndexOf(":");
+      String regionName = name.substring(splitterIndex + 1);
+      String tableName = name.substring(0, splitterIndex);
+      String bestHostName = entry.getValue();
+
+      HbckInfo region = regionInfo.get(regionName);
+      if (region == null || region.deployedOn == null
+          || region.deployedOn.isEmpty()) {
+        LOG.info("<table:region> : <" + name + "> no info obtained for this" +
+            " region from any of the region servers.");
+        continue;
+      }
+
+      AtomicInteger regionCount = tableToRegionCountMap.get(tableName);
+      if (regionCount == null) {
+        tableToRegionCountMap.put(tableName, new AtomicInteger(1));
+        tableToRegionsWithLocalityMap.put(tableName, new AtomicInteger(0));
+      } else {
+        regionCount.incrementAndGet();
+      }
+
+      // Only the first reported deployment is compared to the preferred host.
+      List<HServerAddress> serverList = region.deployedOn;
+      String realHostName = serverList.get(0).getHostname();
+      boolean localityMatch = realHostName.equalsIgnoreCase(bestHostName);
+      if (localityMatch) {
+        tableToRegionsWithLocalityMap.get(tableName).incrementAndGet();
+      }
+
+      LOG.info("<table:region> : <" + name + "> is running on host: "
+          + realHostName + " \n and the prefered host is " + bestHostName +
+          " [" + (localityMatch ? "Matched]" : "NOT matched]"));
+    }
+
+    LOG.info("======== Locality Summary ===============");
+    for (Map.Entry<String, AtomicInteger> entry :
+        tableToRegionCountMap.entrySet()) {
+      String tableName = entry.getKey();
+      int totalRegions = entry.getValue().get();
+      int totalRegionsWithLocality =
+          tableToRegionsWithLocalityMap.get(tableName).get();
+
+      float rate = (totalRegionsWithLocality / (float) totalRegions) * 100;
+      LOG.info("For Table: "+tableName+" ; #Total Regions: " + totalRegions +
+          " ;" + " # Local Regions " + totalRegionsWithLocality + " rate = "
+          + rate + " %");
+    }
+  }
+
+  /**
+   * Entry point for {@code bin/hbase localityck}. Runs the check against the
+   * default HBase configuration. Exits with status 0 on success, or with 1 if
+   * an I/O error occurs (including failure to scan -ROOT-/.META.).
+   */
+  public static void main(String[] args) throws IOException,
+      InterruptedException {
+    long startTime = System.currentTimeMillis();
+    int exitCode = 0;
+    try {
+      Configuration conf = HBaseConfiguration.create();
+      HBaseLocalityCheck localck = new HBaseLocalityCheck(conf);
+      localck.showTableLocality();
+      LOG.info("Locality Summary takes " +
+          (System.currentTimeMillis() - startTime) + " ms to run" );
+    } catch (IOException e) {
+      LOG.error("Locality check failed", e);
+      exitCode = 1;
+    }
+    // Exit explicitly instead of returning from main. NOTE(review): presumably
+    // so lingering non-daemon client threads cannot keep the JVM alive.
+    Runtime.getRuntime().exit(exitCode);
+  }
+}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 02:21:23 2011
@@ -168,6 +168,11 @@ public class HMaster extends Thread impl
// True if this is the master that started the cluster.
boolean isClusterStartup;
+ private long masterStartupTime;
+ private Map<String, String> preferredRegionToRegionServerMapping;
+ private long applyPreferredAssignmentPeriod = 0l;
+ private long holdRegionForBestLocalityPeriod = 0l;
+
/**
* Constructor
* @param conf configuration
@@ -266,7 +271,6 @@ public class HMaster extends Thread impl
// start the "open region" executor service
HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString());
-
// start the region manager
regionManager = new RegionManager(this);
@@ -275,8 +279,50 @@ public class HMaster extends Thread impl
// We're almost open for business
this.closed.set(false);
LOG.info("HMaster w/ hbck initialized on " + this.address.toString());
+
+ // assign the regions based on the region locality in this period of time
+ this.applyPreferredAssignmentPeriod =
+ conf.getLong("hbase.master.applyPreferredAssignment.period", 5 * 60 * 1000);
+
+ // if a region's best region server hasn't checked in for this much time
+ // since master startup, then the master is free to assign this region
+ // out to any region server
+ this.holdRegionForBestLocalityPeriod =
+ conf.getLong("hbase.master.holdRegionForBestLocality.period",
+ 1 * 60 * 1000);
+
+ // disable scanning dfs by setting applyPreferredAssignmentPeriod to 0
+ if (isClusterStartup && applyPreferredAssignmentPeriod > 0) {
+ try {
+ LOG.debug("get preferredRegionToHostMapping; expecting pause here");
+ this.preferredRegionToRegionServerMapping =
+ FSUtils.getRegionLocalityMappingFromFS(fs, rootdir);
+ } catch (Exception e) {
+ LOG.equals("Got unexpected exception when getting " +
+ "preferredRegionToHostMapping : " + e.toString());
+ // do not pause the master's construction
+ preferredRegionToRegionServerMapping = null;
+ }
+ }
+ // get the start time stamp after scanning the dfs
+ masterStartupTime = System.currentTimeMillis();
}
+  /**
+   * @return window in ms, measured from {@link #getMasterStartupTime()},
+   *     during which regions are assigned by block locality; read from
+   *     {@code hbase.master.applyPreferredAssignment.period}
+   */
+  public long getApplyPreferredAssignmentPeriod() {
+    return applyPreferredAssignmentPeriod;
+  }
+
+  /**
+   * @return window in ms after master startup during which a region may be
+   *     held back for its preferred region server; read from
+   *     {@code hbase.master.holdRegionForBestLocality.period}
+   */
+  public long getHoldRegionForBestLocalityPeriod() {
+    return holdRegionForBestLocalityPeriod;
+  }
+
+  /**
+   * @return wall-clock time in ms recorded at the end of construction, after
+   *     the optional file-system locality scan
+   */
+  public long getMasterStartupTime() {
+    return masterStartupTime;
+  }
+
+  /**
+   * @return mapping of "table:encodedRegionName" to preferred host, or
+   *     {@code null} if the locality scan was skipped or failed; the
+   *     internal map itself is returned
+   */
+  public Map<String, String> getPreferredRegionToRegionServerMapping() {
+    return this.preferredRegionToRegionServerMapping;
+  }
/**
* Returns true if this master process was responsible for starting the
* cluster.
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Oct 11 02:21:23 2011
@@ -149,6 +149,13 @@ public class RegionManager {
private final int zooKeeperNumRetries;
private final int zooKeeperPause;
+  /**
+   * Host names of region servers that checked in while locality-based
+   * assignment was in effect (during the applyPreferredAssignment window
+   * after master startup). When assigning by locality, a region whose
+   * preferred host is in this set is held for that host instead of being
+   * given to another server. Entries are only ever added, never removed.
+   */
+  private final Set<String> quickStartRegionServerSet = new HashSet<String>();
+
RegionManager(HMaster master) throws IOException {
Configuration conf = master.getConfiguration();
@@ -210,8 +217,8 @@ public class RegionManager {
}
/*
- * Assigns regions to region servers attempting to balance the load across
- * all region servers. Note that no synchronization is necessary as the caller
+ * Assigns regions to region servers attempting to balance the load across all
+ * region servers. Note that no synchronization is necessary as the caller
* (ServerManager.processMsgs) already owns the monitor for the RegionManager.
*
* @param info
@@ -220,25 +227,49 @@ public class RegionManager {
*/
void assignRegions(HServerInfo info, HRegionInfo[] mostLoadedRegions,
ArrayList<HMsg> returnMsgs) {
+ // regions that may be assigned to this region server
+ Set<RegionState> regionsToAssign = null;
+
HServerLoad thisServersLoad = info.getLoad();
boolean isSingleServer = this.master.numServers() == 1;
+ // NOTE(review): the host name is used as reported; nothing here appends
+ // '.', while FSUtils strips a trailing '.' from preferred hosts -- confirm
+ // both forms match so quickStartRegionServerSet lookups succeed.
+ String hostName = info.getHostname();
+
+ long masterRunningTime = System.currentTimeMillis()
+ - this.master.getMasterStartupTime();
+ // Assign by locality only if this master started the cluster, only within
+ // the configured window after master startup, and only if a locality
+ // mapping was computed.
+ boolean assignmentByLocality = ((masterRunningTime < this.master
+ .getApplyPreferredAssignmentPeriod()) &&
+ this.master.isClusterStartup() &&
+ this.master.getPreferredRegionToRegionServerMapping() != null) ?
+ true : false;
+
+ // While true (and assigning by locality), regionsAwaitingAssignment holds a
+ // region back from any server that is not its preferred host.
+ boolean holdRegionForBestRegionServer =
+ (masterRunningTime < this.master.getHoldRegionForBestLocalityPeriod());
+
+ // Remember servers that check in while assigning by locality; regions
+ // whose preferred host is in this set are held for that host.
+ if (assignmentByLocality) {
+ quickStartRegionServerSet.add(hostName);
+ }
+ // get the region set to be assigned to this region server
+ regionsToAssign = regionsAwaitingAssignment(info.getServerAddress(),
+ isSingleServer, assignmentByLocality, holdRegionForBestRegionServer,
+ quickStartRegionServerSet);
- // figure out what regions need to be assigned and aren't currently being
- // worked on elsewhere.
- Set<RegionState> regionsToAssign =
- regionsAwaitingAssignment(info.getServerAddress(), isSingleServer);
if (regionsToAssign.size() == 0) {
// There are no regions waiting to be assigned.
- this.loadBalancer.loadBalancing(info, mostLoadedRegions, returnMsgs);
+ if (!assignmentByLocality) {
+ // load balance as before
+ this.loadBalancer.loadBalancing(info, mostLoadedRegions, returnMsgs);
+ }
} else {
- // if there's only one server, just give it all the regions
- if (isSingleServer) {
+ // if there's only one server or assign the region by locality,
+ // just give the regions to this server
+ if (isSingleServer || assignmentByLocality) {
assignRegionsToOneServer(regionsToAssign, info, returnMsgs);
} else {
// otherwise, give this server a few regions taking into account the
- // load of all the other servers.
- assignRegionsToMultipleServers(thisServersLoad, regionsToAssign,
- info, returnMsgs);
+ // load of all the other servers
+ assignRegionsToMultipleServers(thisServersLoad, regionsToAssign, info,
+ returnMsgs);
}
}
}
@@ -250,15 +281,15 @@ public class RegionManager {
* regionsInTransition because this method is only called by assignRegions
* whose caller owns the monitor for RegionManager
*
- * TODO: This code is unintelligible. REWRITE. Add TESTS! St.Ack 09/30/2009
+ * TODO: This code is unintelligible. REWRITE. Add TESTS! St.Ack 09/30/2009
* @param thisServersLoad
* @param regionsToAssign
* @param info
* @param returnMsgs
*/
private void assignRegionsToMultipleServers(final HServerLoad thisServersLoad,
- final Set<RegionState> regionsToAssign, final HServerInfo info,
- final ArrayList<HMsg> returnMsgs) {
+ final Set<RegionState> regionsToAssign, final HServerInfo info,
+ final ArrayList<HMsg> returnMsgs) {
boolean isMetaAssign = false;
for (RegionState s : regionsToAssign) {
if (s.getRegionInfo().isMetaRegion())
@@ -269,9 +300,9 @@ public class RegionManager {
regionsToGiveOtherServers(nRegionsToAssign, thisServersLoad);
nRegionsToAssign -= otherServersRegionsCount;
if (nRegionsToAssign > 0 || isMetaAssign) {
- LOG.debug("Assigning for " + info + ": total nregions to assign=" +
- nRegionsToAssign + ", regions to give other servers than this=" +
- otherServersRegionsCount + ", isMetaAssign=" + isMetaAssign);
+ LOG.debug("Assigning for " + info + ": total nregions to assign="
+ + nRegionsToAssign + ", regions to give other servers than this="
+ + otherServersRegionsCount + ", isMetaAssign=" + isMetaAssign);
// See how many we can assign before this server becomes more heavily
// loaded than the next most heavily loaded server.
@@ -320,7 +351,7 @@ public class RegionManager {
if (count > this.maxAssignInOneGo) {
count = this.maxAssignInOneGo;
}
- for (RegionState s: regionsToAssign) {
+ for (RegionState s : regionsToAssign) {
doRegionAssignment(s, info, returnMsgs);
if (--count <= 0) {
break;
@@ -330,7 +361,6 @@ public class RegionManager {
/*
* Assign all to the only server. An unlikely case but still possible.
- *
* Note that no synchronization is needed on regionsInTransition while
* iterating on it because the only caller is assignRegions whose caller owns
* the monitor for RegionManager
@@ -341,7 +371,7 @@ public class RegionManager {
*/
private void assignRegionsToOneServer(final Set<RegionState> regionsToAssign,
final HServerInfo info, final ArrayList<HMsg> returnMsgs) {
+ // Used both when this is the only server and when assigning by locality:
+ // every region in regionsToAssign is handed to this one server.
- for (RegionState s: regionsToAssign) {
+ for (RegionState s : regionsToAssign) {
doRegionAssignment(s, info, returnMsgs);
}
}
@@ -360,13 +390,16 @@ public class RegionManager {
synchronized (this.regionsInTransition) {
byte[] data = null;
try {
- data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
+ data = Writables.getBytes(new RegionTransitionEventData(
+ HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
} catch (IOException e) {
- LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
+ LOG.error("Error creating event data for "
+ + HBaseEventType.M2ZK_REGION_OFFLINE, e);
}
- zkWrapper.createOrUpdateUnassignedRegion(
- rs.getRegionInfo().getEncodedName(), data);
- LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
+ zkWrapper.createOrUpdateUnassignedRegion(rs.getRegionInfo()
+ .getEncodedName(), data);
+ LOG.debug("Created UNASSIGNED zNode " + regionName + " in state "
+ + HBaseEventType.M2ZK_REGION_OFFLINE);
this.regionsInTransition.put(regionName, rs);
}
@@ -380,16 +413,16 @@ public class RegionManager {
* more lightly loaded servers
*/
private int regionsToGiveOtherServers(final int numUnassignedRegions,
- final HServerLoad thisServersLoad) {
+ final HServerLoad thisServersLoad) {
SortedMap<HServerLoad, Set<String>> lightServers =
new TreeMap<HServerLoad, Set<String>>();
this.master.getLightServers(thisServersLoad, lightServers);
// Examine the list of servers that are more lightly loaded than this one.
// Pretend that we will assign regions to these more lightly loaded servers
- // until they reach load equal with ours. Then, see how many regions are left
- // unassigned. That is how many regions we should assign to this server.
+ // until they reach load equal with ours. Then, see how many regions are
+ // left unassigned. That is how many regions we should assign to this server
int nRegions = 0;
- for (Map.Entry<HServerLoad, Set<String>> e: lightServers.entrySet()) {
+ for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
HServerLoad lightLoad = new HServerLoad(e.getKey());
do {
lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
@@ -412,15 +445,22 @@ public class RegionManager {
* the monitor for RegionManager
*/
private Set<RegionState> regionsAwaitingAssignment(HServerAddress addr,
- boolean isSingleServer) {
+ boolean isSingleServer, boolean assignmentByLocality,
+ boolean holdRegionForBestRegionserver,
+ Set<String> quickStartRegionServerSet) {
+
// set of regions we want to assign to this server
Set<RegionState> regionsToAssign = new HashSet<RegionState>();
boolean isMetaServer = isMetaServer(addr);
+ boolean isRootServer = isRootServer(addr);
+ boolean isMetaOrRoot = isMetaServer || isRootServer;
+ String hostName = addr.getHostname();
RegionState rootState = null;
// Handle if root is unassigned... only assign root if root is offline.
synchronized (this.regionsInTransition) {
- rootState = regionsInTransition.get(HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString());
+ rootState = regionsInTransition.get(HRegionInfo.ROOT_REGIONINFO
+ .getRegionNameAsString());
}
if (rootState != null && rootState.isUnassigned()) {
// make sure root isnt assigned here first.
@@ -436,29 +476,47 @@ public class RegionManager {
// Look over the set of regions that aren't currently assigned to
// determine which we should assign to this server.
- boolean reassigningMetas = numberOfMetaRegions.get() != onlineMetaRegions.size();
- boolean isMetaOrRoot = isMetaServer || isRootServer(addr);
+ boolean reassigningMetas = numberOfMetaRegions.get() != onlineMetaRegions
+ .size();
if (reassigningMetas && isMetaOrRoot && !isSingleServer) {
return regionsToAssign; // dont assign anything to this server.
}
+
synchronized (this.regionsInTransition) {
- for (RegionState s: regionsInTransition.values()) {
+ for (RegionState s : regionsInTransition.values()) {
+ String regionName = s.getRegionInfo().getEncodedName();
+ String tableName = s.getRegionInfo().getTableDesc().getNameAsString();
+ String name = tableName + ":" + regionName;
HRegionInfo i = s.getRegionInfo();
if (i == null) {
continue;
}
- if (reassigningMetas &&
- !i.isMetaRegion()) {
+ if (reassigningMetas && !i.isMetaRegion()) {
// Can't assign user regions until all meta regions have been assigned
// and are on-line
continue;
}
- if (!i.isMetaRegion() &&
- !master.getServerManager().canAssignUserRegions()) {
- LOG.debug("user region " + i.getRegionNameAsString() +
- " is in transition but not enough servers yet");
+ if (!i.isMetaRegion()
+ && !master.getServerManager().canAssignUserRegions()) {
+ LOG.debug("user region " + i.getRegionNameAsString()
+ + " is in transition but not enough servers yet");
continue;
}
+
+ if (assignmentByLocality && !i.isRootRegion() && !i.isMetaRegion()) {
+ String preferredHost =
+ this.master.getPreferredRegionToRegionServerMapping().get(name);
+
+ if (preferredHost != null && hostName.startsWith(preferredHost)) {
+ LOG.debug("Doing Preferred Region Assignment for : " + name +
+ " to the " + hostName);
+ } else if (holdRegionForBestRegionserver ||
+ quickStartRegionServerSet.contains(preferredHost)) {
+ LOG.debug("Hold the region : " + name +
+ " for its best locality region server " + hostName);
+ continue;
+ }
+ }
if (s.isUnassigned()) {
regionsToAssign.add(s);
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Oct 11 02:21:23 2011
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -44,10 +45,16 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
+import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Utility methods for interacting with the underlying file system.
@@ -678,4 +685,103 @@ public class FSUtils {
LOG.info("Finished lease recover attempt for " + p);
}
+  /**
+   * Scans the HBase root directory and, for every region directory found,
+   * picks the host that stores the most blocks of that region's store files.
+   *
+   * <p>Only directories two levels below {@code rootPath}
+   * ({@code <rootPath>/<table>/<region>}) with hexadecimal names are treated
+   * as regions. Regions without any block locations (for example, regions
+   * with no store files yet) are skipped and do not appear in the result.
+   *
+   * @param fs file system holding the HBase root directory
+   * @param rootPath HBase root directory
+   * @return map from {@code "<table>:<region dir name>"} to the host (any
+   *     trailing '.' removed) holding the most blocks; never {@code null}
+   * @throws IOException if listing directories or fetching block locations
+   *     fails
+   */
+  public static Map<String, String> getRegionLocalityMappingFromFS(
+      final FileSystem fs, final Path rootPath)
+      throws IOException {
+    // region name to its best locality region server mapping
+    Map<String, String> regionToBestLocalityRSMapping =
+        new HashMap<String, String>();
+    // per-host block counts for the region being examined; reset per region
+    HashMap<String, AtomicInteger> blockCountMap =
+        new HashMap<String, AtomicInteger>();
+
+    long startTime = System.currentTimeMillis();
+    Path queryPath = new Path(rootPath.toString() + "/*/*/");
+    FileStatus[] statusList = fs.globStatus(queryPath);
+    if (statusList == null) {
+      // globStatus may return null, e.g. when the root path does not exist
+      LOG.debug("Query Path: " + queryPath + " ; no matching files");
+      return regionToBestLocalityRSMapping;
+    }
+    LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
+        statusList.length);
+
+    for (FileStatus regionStatus : statusList) {
+      if (!regionStatus.isDir()) {
+        continue;
+      }
+
+      // The glob also matches non-region directories; region directories
+      // are named by their hexadecimal encoded name.
+      Path regionPath = regionStatus.getPath();
+      String regionName = regionPath.getName();
+      if (!regionName.matches("[0-9a-fA-F]+")) {
+        continue;
+      }
+      String tableName = regionPath.getParent().getName();
+
+      int totalBlkCount = 0;
+      blockCountMap.clear();
+
+      // for each cf, get all the blocks information
+      FileStatus[] cfList = fs.listStatus(regionPath);
+      if (cfList == null) {
+        // listStatus may return null if the directory no longer exists
+        continue;
+      }
+      for (FileStatus cfStatus : cfList) {
+        if (!cfStatus.isDir()) {
+          // skip because this is not a CF directory
+          continue;
+        }
+        FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
+        if (storeFileLists == null) {
+          continue;
+        }
+        for (FileStatus storeFile : storeFileLists) {
+          BlockLocation[] blkLocations =
+              fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
+          if (blkLocations == null) {
+            continue;
+          }
+          totalBlkCount += blkLocations.length;
+          for (BlockLocation blk : blkLocations) {
+            for (String host : blk.getHosts()) {
+              AtomicInteger count = blockCountMap.get(host);
+              if (count == null) {
+                count = new AtomicInteger(0);
+                blockCountMap.put(host, count);
+              }
+              count.incrementAndGet();
+            }
+          }
+        }
+      }
+
+      int largestBlkCount = 0;
+      String hostToRun = null;
+      for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
+        int blkCount = entry.getValue().get();
+        if (blkCount > largestBlkCount) {
+          largestBlkCount = blkCount;
+          hostToRun = entry.getKey();
+        }
+      }
+
+      String name = tableName + ":" + regionName;
+      if (hostToRun == null) {
+        // No block locations at all, so there is no preferred host. Skipping
+        // avoids a NullPointerException that would abort the whole scan.
+        LOG.debug("[ Best Locality Mapping ] Name: " + name
+            + " has no block locations; skipping");
+        continue;
+      }
+      if (hostToRun.endsWith(".")) {
+        hostToRun = hostToRun.substring(0, hostToRun.length() - 1);
+      }
+      regionToBestLocalityRSMapping.put(name, hostToRun);
+      LOG.debug("[ Best Locality Mapping ] Name: " + name+
+          " Region Server: " + hostToRun) ;
+
+      // totalBlkCount > 0 here, because hostToRun was found in some block
+      float rate = largestBlkCount / (float) totalBlkCount * 100;
+      String msg = "Host : " + hostToRun + " has "+largestBlkCount+" / "+
+          totalBlkCount+" ("+rate +"%) blocks for the < "+ name + " >";
+      LOG.info(msg);
+    }
+    long overhead = System.currentTimeMillis() - startTime;
+    String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
+
+    LOG.info(overheadMsg);
+    return regionToBestLocalityRSMapping;
+  }
+
}