You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:21:24 UTC

svn commit: r1181570 - in /hbase/branches/0.89: bin/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/util/

Author: nspiegelberg
Date: Tue Oct 11 02:21:23 2011
New Revision: 1181570

URL: http://svn.apache.org/viewvc?rev=1181570&view=rev
Log:
Assign regions based on block locality and print out the region locality

Summary:
Create a command-line tool that prints out the region locality info.
Example: running this command on my dev cluster (210):
====================
 ./bin/hbase localityck

Host :230. has 13 / 13 blocks for the test1 :
2d59bb40ffba29ea04fe11b27d997646
Host :250. has 12 / 12 blocks for the test1 :
c60bd76550b9c6d5348740f4a41fc9bf
Host :230. has 12 / 12 blocks for the test1 :
6fb75bb06213d19171abf816791b2423
Host :230. has 4 / 4 blocks for the -ROOT- : 70236052
Host :230. has 12 / 12 blocks for the test1 :
01c048a618024b38a5f0192614a9660a
Host :230. has 12 / 12 blocks for the test1 :
54dd7d26a69c9474f98f54cbdc35cb98
Host :290. has 4 / 4 blocks for the .META. :
1028785192
Host :230. has 12 / 12 blocks for the test1 :
36dbc39af1465227cb6118fb17da4a20
Host :230. has 12 / 12 blocks for the test1 :
4d3958e8ad30bcbe39c037ceb94f3c10
Host :230. has 12 / 12 blocks for the test1 :
ced427fa96da9eb3cd9c351ca074a208
Host :230. has 3 / 3 blocks for the test2 :
a638e6d34311e2995154679dea73bb8c
Host :230. has 12 / 12 blocks for the test1 :
5035d560a22b2a65389c4b6acde9cbf8
Host :230. has 12 / 12 blocks for the test1 :
bd55c6af492a684adc573380267bf188
Host :250. has 12 / 12 blocks for the test1 :
59534b14cff211e712151be628445aec
Host :250. has 12 / 12 blocks for the test1 :
9ec644b196af2d551b3decfc7409ef9b
Host :230. has 12 / 12 blocks for the test1 :
0e9416af3be05fb3844d4cac0c4b5198
Host :250. has 12 / 12 blocks for the test1 :
59865cdd283215cc2250423d2584d034
Host :250. has 12 / 12 blocks for the test1 :
e511fb3efa475866d7b27ac17aea81e4

Test Plan:
It has been tested on the dev cluster. I will continue working on more test
cases to stabilize the code.

Reviewed By: kannan
Reviewers: kannan, kranganathan, gqchen, nspiegelberg, bogdan
Commenters: dhruba, mbautin
CC: , hbase@lists, kannan, dhruba, liyintang, mbautin, achao
Differential Revision: 257279

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
Modified:
    hbase/branches/0.89/bin/hbase
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java

Modified: hbase/branches/0.89/bin/hbase
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/bin/hbase?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/bin/hbase (original)
+++ hbase/branches/0.89/bin/hbase Tue Oct 11 02:21:23 2011
@@ -76,6 +76,7 @@ if [ $# = 0 ]; then
   echo "  avro             run an HBase Avro server"
   echo "  migrate          upgrade an hbase.rootdir"
   echo "  hbck             run the hbase 'fsck' tool"
+  echo "  localityck       run the hbase locality check tool"
   echo "  verify [-help]   verify that the cluster is working properly"
   echo " or"
   echo "  CLASSNAME        run the class named CLASSNAME"
@@ -256,6 +257,8 @@ elif [ "$COMMAND" = "migrate" ] ; then
   CLASS='org.apache.hadoop.hbase.util.Migrate'
 elif [ "$COMMAND" = "hbck" ] ; then
   CLASS='org.apache.hadoop.hbase.client.HBaseFsck'
+elif [ "$COMMAND" = "localityck" ] ; then
+  CLASS='org.apache.hadoop.hbase.client.HBaseLocalityCheck'
 elif [ "$COMMAND" = "verify" ] ; then
   CLASS='org.apache.hadoop.hbase.loadtest.LoadTester'
 elif [ "$COMMAND" = "zookeeper" ] ; then

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java Tue Oct 11 02:21:23 2011
@@ -120,13 +120,21 @@ public class HBaseFsck {
           new LinkedBlockingQueue<Runnable>());
   }
 
-  /**
-   * Contacts the master and prints out cluster-wide information
-   * @throws IOException if a remote or network exception occurs
-   * @return 0 on success, non-zero on failure
-   */
-  int doWork() throws IOException, InterruptedException {
+  /**
+   * Returns the region info map held by this checker. The internal map itself
+   * is returned (not a copy), so callers should treat it as read-only.
+   */
+  public TreeMap<String, HbckInfo> getRegionInfo() {
+    return this.regionInfo;
+  }
 
+  /**
+   * Prints the cluster version and then, as the method name indicates, scans
+   * the -ROOT- and .META. catalog tables. doWork() calls this first.
+   * @return 0 on success, -1 if a fatal error was encountered
+   * @throws IOException if a remote or network exception occurs
+   */
+  public int initAndScanRootMeta() throws IOException {
     // print hbase server version
     errors.print("Version: " + status.getHBaseVersion());
     LOG.debug("timelag = " + StringUtils.formatTime(this.timelag));
@@ -150,6 +158,19 @@ public class HBaseFsck {
       errors.reportError("Encountered fatal error. Exitting...");
       return -1;
     }
+    return 0;
+  }
+
+  /**
+   * Contacts the master and prints out cluster-wide information
+   * @throws IOException if a remote or network exception occurs
+   * @return 0 on success, non-zero on failure
+   */
+  int doWork() throws IOException, InterruptedException {
+
+    if (initAndScanRootMeta() != 0) {
+      return -1;
+    }
 
     // get a list of all tables that have not changed recently.
     AtomicInteger numSkipped = new AtomicInteger(0);
@@ -267,6 +288,26 @@ public class HBaseFsck {
     return true;
   }
 
+  /**
+   * Prints the number of live region servers (and their names in details
+   * mode), contacts each of them to fetch region metadata, and waits for all
+   * of the resulting async work to finish.
+   * @throws IOException if a remote or network exception occurs
+   * @throws InterruptedException if interrupted while waiting for the work
+   */
+  void scanRegionServers() throws IOException, InterruptedException {
+    Collection<HServerInfo> regionServers = status.getServerInfo();
+    errors.print("Number of live region servers:" +
+        regionServers.size());
+    if (details) {
+      for (HServerInfo rsinfo: regionServers) {
+        errors.detail("\t RegionServer:" + rsinfo.getServerName());
+      }
+    }
+    scanRegionServers(regionServers);
+    // finish all async tasks before analyzing what we have
+    finishAsyncWork();
+  }
 
   /**
    * Contacts each regionserver and fetches metadata about regions.
@@ -274,7 +315,7 @@ public class HBaseFsck {
    * @throws IOException if a remote or network exception occurs
    */
   void scanRegionServers(Collection<HServerInfo> regionServerList)
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
 
     // loop to contact each region server in parallel
     for (HServerInfo rsinfo:regionServerList) {

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java?rev=1181570&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java Tue Oct 11 02:21:23 2011
@@ -0,0 +1,163 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseFsck.HbckInfo;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Command-line tool behind {@code bin/hbase localityck}. For every region
+ * found under the HBase root directory it compares the host the region is
+ * currently deployed on with the host that stores the most of the region's
+ * HDFS blocks, and then prints a per-table locality summary.
+ */
+public class HBaseLocalityCheck {
+  private static final Log LOG =
+    LogFactory.getLog(HBaseLocalityCheck.class.getName());
+
+  private final FileSystem fs;
+  private final Path rootdir;
+  private final Configuration conf;
+
+  /**
+   * @param conf configuration used to resolve the HBase root directory and
+   *          file system and to contact the cluster
+   * @throws IOException if the root directory or the file system cannot be
+   *           resolved
+   */
+  public HBaseLocalityCheck(Configuration conf) throws IOException {
+    this.conf = conf;
+    this.rootdir = FSUtils.getRootDir(conf);
+    this.fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Shows the locality information for each table: how many of its regions
+   * are deployed and how many of those run on their best-locality region
+   * server, i.e. the host storing the most of the region's blocks.
+   *
+   * @throws MasterNotRunningException if the master cannot be contacted
+   * @throws IOException if scanning the catalog, the region servers or the
+   *           file system fails
+   * @throws InterruptedException if interrupted while scanning region servers
+   */
+  public void showTableLocality()
+      throws MasterNotRunningException, IOException, InterruptedException {
+    // Collect the current deployment of every region via hbck.
+    HBaseFsck fsck = new HBaseFsck(conf);
+    if (fsck.initAndScanRootMeta() != 0) {
+      // hbck has already reported the fatal error; results would be bogus
+      throw new IOException("Failed to scan -ROOT- and .META.;"
+          + " cannot check region locality");
+    }
+    fsck.scanRegionServers();
+    TreeMap<String, HbckInfo> regionInfo = fsck.getRegionInfo();
+
+    LOG.info("Locality information by region");
+
+    // Best-locality host for each region, from a scan of the file system.
+    // Keys have the form "<table name>:<region directory name>".
+    Map<String, String> preferredRegionToRegionServerMapping =
+      FSUtils.getRegionLocalityMappingFromFS(fs, rootdir);
+
+    Map<String, AtomicInteger> tableToRegionCountMap =
+      new HashMap<String, AtomicInteger>();
+    Map<String, AtomicInteger> tableToRegionsWithLocalityMap =
+      new HashMap<String, AtomicInteger>();
+
+    for (Map.Entry<String, String> entry :
+        preferredRegionToRegionServerMapping.entrySet()) {
+      // split "<table>:<region>" on the last ':'
+      String name = entry.getKey();
+      int splitterIndex = name.lastIndexOf(":");
+      if (splitterIndex < 0) {
+        continue;
+      }
+      String regionName = name.substring(splitterIndex + 1);
+      String tableName = name.substring(0, splitterIndex);
+      String bestHostName = entry.getValue();
+
+      HbckInfo region = regionInfo.get(regionName);
+      if (region == null || region.deployedOn == null ||
+          region.deployedOn.isEmpty()) {
+        LOG.info("<table:region> : <" + name + "> no info obtained for this" +
+            " region from any of the region servers.");
+        continue;
+      }
+
+      AtomicInteger regionCount = tableToRegionCountMap.get(tableName);
+      if (regionCount == null) {
+        tableToRegionCountMap.put(tableName, new AtomicInteger(1));
+        tableToRegionsWithLocalityMap.put(tableName, new AtomicInteger(0));
+      } else {
+        regionCount.incrementAndGet();
+      }
+
+      // compare the first server the region is deployed on with the best host
+      List<HServerAddress> serverList = region.deployedOn;
+      String realHostName = serverList.get(0).getHostname();
+      boolean localityMatch = realHostName.equalsIgnoreCase(bestHostName);
+      if (localityMatch) {
+        tableToRegionsWithLocalityMap.get(tableName).incrementAndGet();
+      }
+
+      LOG.info("<table:region> : <" + name + "> is running on host: "
+          + realHostName + " \n and the prefered host is " + bestHostName +
+          " [" + (localityMatch ? "Matched]" : "NOT matched]"));
+    }
+
+    LOG.info("======== Locality Summary ===============");
+    for (Map.Entry<String, AtomicInteger> e :
+        tableToRegionCountMap.entrySet()) {
+      String tableName = e.getKey();
+      int totalRegions = e.getValue().get();
+      int totalRegionsWithLocality =
+        tableToRegionsWithLocalityMap.get(tableName).get();
+
+      float rate = (totalRegionsWithLocality / (float) totalRegions) * 100;
+      LOG.info("For Table: "+tableName+" ; #Total Regions: " + totalRegions +
+          " ;" + " # Local Regions " + totalRegionsWithLocality + " rate = "
+          + rate + " %");
+    }
+  }
+
+  /**
+   * Runs the locality check against the cluster described by the default
+   * HBase configuration and logs how long it took.
+   *
+   * @param args ignored
+   * @throws IOException if the tool cannot be set up
+   * @throws InterruptedException if interrupted while scanning
+   */
+  public static void main(String[] args) throws IOException,
+      InterruptedException {
+    long startTime = System.currentTimeMillis();
+    Configuration conf = HBaseConfiguration.create();
+    HBaseLocalityCheck localck = new HBaseLocalityCheck(conf);
+    int exitCode = 0;
+    try {
+      localck.showTableLocality();
+    } catch (IOException e) {
+      LOG.error("Locality check failed", e);
+      exitCode = 1;
+    }
+    LOG.info("Locality Summary takes " +
+        (System.currentTimeMillis() - startTime) + " ms to run" );
+    // Exit explicitly (as before) so that lingering client threads, if any,
+    // cannot keep the JVM alive.
+    Runtime.getRuntime().exit(exitCode);
+  }
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 02:21:23 2011
@@ -168,6 +168,11 @@ public class HMaster extends Thread impl
   // True if this is the master that started the cluster.
   boolean isClusterStartup;
 
+  private long masterStartupTime;
+  private Map<String, String> preferredRegionToRegionServerMapping;
+  private long applyPreferredAssignmentPeriod = 0L;
+  private long holdRegionForBestLocalityPeriod = 0L;
+
   /**
    * Constructor
    * @param conf configuration
@@ -266,7 +271,6 @@ public class HMaster extends Thread impl
     // start the "open region" executor service
     HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString());
 
-
     // start the region manager
     regionManager = new RegionManager(this);
 
@@ -275,8 +279,67 @@ public class HMaster extends Thread impl
     // We're almost open for business
     this.closed.set(false);
     LOG.info("HMaster w/ hbck initialized on " + this.address.toString());
+
+    // assign the regions based on the region locality in this period of time
+    this.applyPreferredAssignmentPeriod =
+      conf.getLong("hbase.master.applyPreferredAssignment.period", 5 * 60 * 1000);
+
+    // if a region's best region server hasn't checked in for this much time
+    // since master startup, then the master is free to assign this region
+    // out to any region server
+    this.holdRegionForBestLocalityPeriod =
+      conf.getLong("hbase.master.holdRegionForBestLocality.period",
+          1 * 60 * 1000);
+
+    // disable scanning dfs by setting applyPreferredAssignmentPeriod to 0
+    if (isClusterStartup && applyPreferredAssignmentPeriod > 0) {
+      try {
+        LOG.debug("get preferredRegionToHostMapping; expecting pause here");
+        this.preferredRegionToRegionServerMapping =
+          FSUtils.getRegionLocalityMappingFromFS(fs, rootdir);
+      } catch (Exception e) {
+        LOG.error("Got unexpected exception when getting " +
+            "preferredRegionToHostMapping", e);
+        // do not pause the master's construction
+        preferredRegionToRegionServerMapping = null;
+      }
+    }
+    // get the start time stamp after scanning the dfs
+    masterStartupTime = System.currentTimeMillis();
   }
 
+  /**
+   * @return how long (ms) after master startup regions are assigned by block
+   *         locality; 0 disables the DFS scan and thus locality assignment
+   */
+  public long getApplyPreferredAssignmentPeriod() {
+    return this.applyPreferredAssignmentPeriod;
+  }
+
+  /**
+   * @return how long (ms) after master startup a region is held back for its
+   *         best-locality region server before any server may take it
+   */
+  public long getHoldRegionForBestLocalityPeriod() {
+    return this.holdRegionForBestLocalityPeriod;
+  }
+
+  /**
+   * @return wall-clock time (ms) recorded once the optional DFS locality scan
+   *         has finished
+   */
+  public long getMasterStartupTime() {
+    return this.masterStartupTime;
+  }
+
+  /**
+   * @return map from "table:region" to the host storing most of the region's
+   *         blocks, or null if the DFS scan was skipped or failed
+   */
+  public Map<String, String> getPreferredRegionToRegionServerMapping() {
+    return preferredRegionToRegionServerMapping;
+  }
+
   /**
    * Returns true if this master process was responsible for starting the
    * cluster.

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Oct 11 02:21:23 2011
@@ -149,6 +149,13 @@ public class RegionManager {
   private final int zooKeeperNumRetries;
   private final int zooKeeperPause;
 
+  /**
+   * Host names of region servers that reported in while locality-based
+   * assignment was active after master startup. Regions whose best-locality
+   * host is in this set are held back for that host.
+   */
+  private Set<String> quickStartRegionServerSet = new HashSet<String>();
+
   RegionManager(HMaster master) throws IOException {
     Configuration conf = master.getConfiguration();
 
@@ -210,8 +217,8 @@ public class RegionManager {
   }
 
   /*
-   * Assigns regions to region servers attempting to balance the load across
-   * all region servers. Note that no synchronization is necessary as the caller
+   * Assigns regions to region servers attempting to balance the load across all
+   * region servers. Note that no synchronization is necessary as the caller
    * (ServerManager.processMsgs) already owns the monitor for the RegionManager.
    *
    * @param info
@@ -220,25 +227,49 @@ public class RegionManager {
    */
   void assignRegions(HServerInfo info, HRegionInfo[] mostLoadedRegions,
       ArrayList<HMsg> returnMsgs) {
+    // the regions that may be assigned to this region server
+    Set<RegionState> regionsToAssign = null;
+
     HServerLoad thisServersLoad = info.getLoad();
     boolean isSingleServer = this.master.numServers() == 1;
+    // host name of the reporting server, used for locality bookkeeping below
+    String hostName = info.getHostname();
+
+    long masterRunningTime = System.currentTimeMillis()
+        - this.master.getMasterStartupTime();
+    // assign by locality only shortly after this master started the cluster
+    boolean assignmentByLocality =
+        masterRunningTime < this.master.getApplyPreferredAssignmentPeriod()
+        && this.master.isClusterStartup()
+        && this.master.getPreferredRegionToRegionServerMapping() != null;
+
+    boolean holdRegionForBestRegionServer =
+      (masterRunningTime < this.master.getHoldRegionForBestLocalityPeriod());
+
+    if (assignmentByLocality) {
+      quickStartRegionServerSet.add(hostName);
+    }
+    // get the region set to be assigned to this region server
+    regionsToAssign = regionsAwaitingAssignment(info.getServerAddress(),
+        isSingleServer, assignmentByLocality, holdRegionForBestRegionServer,
+        quickStartRegionServerSet);
 
-    // figure out what regions need to be assigned and aren't currently being
-    // worked on elsewhere.
-    Set<RegionState> regionsToAssign =
-      regionsAwaitingAssignment(info.getServerAddress(), isSingleServer);
     if (regionsToAssign.size() == 0) {
       // There are no regions waiting to be assigned.
-      this.loadBalancer.loadBalancing(info, mostLoadedRegions, returnMsgs);
+      if (!assignmentByLocality) {
+        // load balance as before
+        this.loadBalancer.loadBalancing(info, mostLoadedRegions, returnMsgs);
+      }
     } else {
-      // if there's only one server, just give it all the regions
-      if (isSingleServer) {
+      // if there's only one server or assign the region by locality,
+      // just give the regions to this server
+      if (isSingleServer || assignmentByLocality) {
         assignRegionsToOneServer(regionsToAssign, info, returnMsgs);
       } else {
         // otherwise, give this server a few regions taking into account the
-        // load of all the other servers.
-        assignRegionsToMultipleServers(thisServersLoad, regionsToAssign,
-            info, returnMsgs);
+        // load of all the other servers
+        assignRegionsToMultipleServers(thisServersLoad, regionsToAssign, info,
+            returnMsgs);
       }
     }
   }
@@ -250,15 +281,15 @@ public class RegionManager {
    * regionsInTransition because this method is only called by assignRegions
    * whose caller owns the monitor for RegionManager
    *
-   * TODO: This code is unintelligible.  REWRITE. Add TESTS! St.Ack 09/30/2009
+   * TODO: This code is unintelligible. REWRITE. Add TESTS! St.Ack 09/30/2009
    * @param thisServersLoad
    * @param regionsToAssign
    * @param info
    * @param returnMsgs
    */
   private void assignRegionsToMultipleServers(final HServerLoad thisServersLoad,
-    final Set<RegionState> regionsToAssign, final HServerInfo info,
-    final ArrayList<HMsg> returnMsgs) {
+      final Set<RegionState> regionsToAssign, final HServerInfo info,
+      final ArrayList<HMsg> returnMsgs) {
     boolean isMetaAssign = false;
     for (RegionState s : regionsToAssign) {
       if (s.getRegionInfo().isMetaRegion())
@@ -269,9 +300,9 @@ public class RegionManager {
       regionsToGiveOtherServers(nRegionsToAssign, thisServersLoad);
     nRegionsToAssign -= otherServersRegionsCount;
     if (nRegionsToAssign > 0 || isMetaAssign) {
-      LOG.debug("Assigning for " + info + ": total nregions to assign=" +
-        nRegionsToAssign + ", regions to give other servers than this=" +
-        otherServersRegionsCount + ", isMetaAssign=" + isMetaAssign);
+      LOG.debug("Assigning for " + info + ": total nregions to assign="
+          + nRegionsToAssign + ", regions to give other servers than this="
+          + otherServersRegionsCount + ", isMetaAssign=" + isMetaAssign);
 
       // See how many we can assign before this server becomes more heavily
       // loaded than the next most heavily loaded server.
@@ -320,7 +351,7 @@ public class RegionManager {
     if (count > this.maxAssignInOneGo) {
       count = this.maxAssignInOneGo;
     }
-    for (RegionState s: regionsToAssign) {
+    for (RegionState s : regionsToAssign) {
       doRegionAssignment(s, info, returnMsgs);
       if (--count <= 0) {
         break;
@@ -330,7 +361,6 @@ public class RegionManager {
 
   /*
    * Assign all to the only server. An unlikely case but still possible.
-   *
    * Note that no synchronization is needed on regionsInTransition while
    * iterating on it because the only caller is assignRegions whose caller owns
    * the monitor for RegionManager
@@ -341,7 +371,7 @@ public class RegionManager {
    */
   private void assignRegionsToOneServer(final Set<RegionState> regionsToAssign,
       final HServerInfo info, final ArrayList<HMsg> returnMsgs) {
-    for (RegionState s: regionsToAssign) {
+    for (RegionState s : regionsToAssign) {
       doRegionAssignment(s, info, returnMsgs);
     }
   }
@@ -360,13 +390,16 @@ public class RegionManager {
     synchronized (this.regionsInTransition) {
       byte[] data = null;
       try {
-        data = Writables.getBytes(new RegionTransitionEventData(HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
+        data = Writables.getBytes(new RegionTransitionEventData(
+            HBaseEventType.M2ZK_REGION_OFFLINE, HMaster.MASTER));
       } catch (IOException e) {
-        LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
+        LOG.error("Error creating event data for "
+            + HBaseEventType.M2ZK_REGION_OFFLINE, e);
       }
-      zkWrapper.createOrUpdateUnassignedRegion(
-          rs.getRegionInfo().getEncodedName(), data);
-      LOG.debug("Created UNASSIGNED zNode " + regionName + " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
+      zkWrapper.createOrUpdateUnassignedRegion(rs.getRegionInfo()
+          .getEncodedName(), data);
+      LOG.debug("Created UNASSIGNED zNode " + regionName + " in state "
+          + HBaseEventType.M2ZK_REGION_OFFLINE);
       this.regionsInTransition.put(regionName, rs);
     }
 
@@ -380,16 +413,16 @@ public class RegionManager {
    * more lightly loaded servers
    */
   private int regionsToGiveOtherServers(final int numUnassignedRegions,
-    final HServerLoad thisServersLoad) {
+      final HServerLoad thisServersLoad) {
     SortedMap<HServerLoad, Set<String>> lightServers =
       new TreeMap<HServerLoad, Set<String>>();
     this.master.getLightServers(thisServersLoad, lightServers);
     // Examine the list of servers that are more lightly loaded than this one.
     // Pretend that we will assign regions to these more lightly loaded servers
-    // until they reach load equal with ours. Then, see how many regions are left
-    // unassigned. That is how many regions we should assign to this server.
+    // until they reach load equal with ours. Then, see how many regions are
+    // left unassigned. That is how many regions we should assign to this server
     int nRegions = 0;
-    for (Map.Entry<HServerLoad, Set<String>> e: lightServers.entrySet()) {
+    for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
       HServerLoad lightLoad = new HServerLoad(e.getKey());
       do {
         lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
@@ -412,15 +445,22 @@ public class RegionManager {
    * the monitor for RegionManager
    */
   private Set<RegionState> regionsAwaitingAssignment(HServerAddress addr,
-                                                     boolean isSingleServer) {
+      boolean isSingleServer, boolean assignmentByLocality,
+      boolean holdRegionForBestRegionserver,
+      Set<String> quickStartRegionServerSet) {
+
     // set of regions we want to assign to this server
     Set<RegionState> regionsToAssign = new HashSet<RegionState>();
 
     boolean isMetaServer = isMetaServer(addr);
+    boolean isRootServer = isRootServer(addr);
+    boolean isMetaOrRoot = isMetaServer || isRootServer;
+    String hostName = addr.getHostname();
     RegionState rootState = null;
     // Handle if root is unassigned... only assign root if root is offline.
     synchronized (this.regionsInTransition) {
-      rootState = regionsInTransition.get(HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString());
+      rootState = regionsInTransition.get(HRegionInfo.ROOT_REGIONINFO
+          .getRegionNameAsString());
     }
     if (rootState != null && rootState.isUnassigned()) {
       // make sure root isnt assigned here first.
@@ -436,29 +476,47 @@ public class RegionManager {
 
     // Look over the set of regions that aren't currently assigned to
     // determine which we should assign to this server.
-    boolean reassigningMetas = numberOfMetaRegions.get() != onlineMetaRegions.size();
-    boolean isMetaOrRoot = isMetaServer || isRootServer(addr);
+    boolean reassigningMetas = numberOfMetaRegions.get() != onlineMetaRegions
+        .size();
     if (reassigningMetas && isMetaOrRoot && !isSingleServer) {
       return regionsToAssign; // dont assign anything to this server.
     }
+
     synchronized (this.regionsInTransition) {
-      for (RegionState s: regionsInTransition.values()) {
+      for (RegionState s : regionsInTransition.values()) {
         HRegionInfo i = s.getRegionInfo();
         if (i == null) {
           continue;
         }
-        if (reassigningMetas &&
-            !i.isMetaRegion()) {
+        if (reassigningMetas && !i.isMetaRegion()) {
           // Can't assign user regions until all meta regions have been assigned
           // and are on-line
           continue;
         }
-        if (!i.isMetaRegion() &&
-            !master.getServerManager().canAssignUserRegions()) {
-          LOG.debug("user region " + i.getRegionNameAsString() +
-            " is in transition but not enough servers yet");
+        if (!i.isMetaRegion()
+            && !master.getServerManager().canAssignUserRegions()) {
+          LOG.debug("user region " + i.getRegionNameAsString()
+              + " is in transition but not enough servers yet");
           continue;
         }
+
+        if (assignmentByLocality && !i.isRootRegion() && !i.isMetaRegion()) {
+          // key format must match FSUtils.getRegionLocalityMappingFromFS
+          String name =
+              i.getTableDesc().getNameAsString() + ":" + i.getEncodedName();
+          String preferredHost =
+            this.master.getPreferredRegionToRegionServerMapping().get(name);
+
+          if (preferredHost != null && hostName.startsWith(preferredHost)) {
+            LOG.debug("Doing Preferred Region Assignment for : " + name +
+                " to the " + hostName);
+          } else if (preferredHost != null && (holdRegionForBestRegionserver ||
+              quickStartRegionServerSet.contains(preferredHost))) {
+            LOG.debug("Hold the region : " + name +
+                " for its best locality region server " + preferredHost);
+            continue;
+          }
+        }
         if (s.isUnassigned()) {
           regionsToAssign.add(s);
         }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1181570&r1=1181569&r2=1181570&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Oct 11 02:21:23 2011
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -44,10 +45,16 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.InvocationTargetException;
+import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Utility methods for interacting with the underlying file system.
@@ -678,4 +685,128 @@ public class FSUtils {
     LOG.info("Finished lease recover attempt for " + p);
   }
 
+  /**
+   * Scans the region directories under the HBase root directory and, for each
+   * region, finds the host that stores the most HDFS blocks of the region's
+   * files.
+   *
+   * @param fs file system holding the HBase root directory
+   * @param rootPath the HBase root directory
+   * @return map from "tableName:regionDirName" (the region directory name,
+   *         i.e. the encoded region name) to the host, without any trailing
+   *         '.', that stores the most blocks of that region; regions without
+   *         any block locations are left out
+   * @throws IOException if listing directories or block locations fails
+   */
+  public static Map<String, String> getRegionLocalityMappingFromFS(
+      final FileSystem fs, final Path rootPath)
+      throws IOException {
+    // region name to its best locality region server mapping
+    Map<String, String> regionToBestLocalityRSMapping =
+       new HashMap<String, String>();
+    // number of this region's blocks stored on each host; reset per region
+    Map<String, AtomicInteger> blockCountMap =
+      new HashMap<String, AtomicInteger>();
+
+    long startTime = System.currentTimeMillis();
+    // <root>/<table>/<region>
+    Path queryPath = new Path(rootPath.toString() + "/*/*/");
+    FileStatus[] statusList = fs.globStatus(queryPath);
+    if (statusList == null) {
+      LOG.warn("Query Path: " + queryPath + " did not match anything");
+      return regionToBestLocalityRSMapping;
+    }
+    LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
+        statusList.length);
+
+    for (FileStatus regionStatus : statusList) {
+      if (!regionStatus.isDir()) {
+        continue;
+      }
+
+      // get the region name; the glob may also match non-region directories
+      Path regionPath = regionStatus.getPath();
+      String regionName = regionPath.getName();
+      if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
+        continue;
+      }
+      // get table name
+      String tableName = regionPath.getParent().getName();
+      String name = tableName + ":" + regionName;
+
+      int totalBlkCount = 0;
+      blockCountMap.clear();
+
+      // for each cf, get all the blocks information
+      FileStatus[] cfList = fs.listStatus(regionPath);
+      if (cfList == null) {
+        // presumably the region directory was removed during the scan
+        continue;
+      }
+      for (FileStatus cfStatus : cfList) {
+        if (!cfStatus.isDir()) {
+          // skip because this is not a CF directory
+          continue;
+        }
+        FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
+        if (storeFileLists == null) {
+          continue;
+        }
+        for (FileStatus storeFile : storeFileLists) {
+          BlockLocation[] blkLocations =
+            fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
+          if (blkLocations == null) {
+            continue;
+          }
+          totalBlkCount += blkLocations.length;
+          for (BlockLocation blk : blkLocations) {
+            for (String host : blk.getHosts()) {
+              AtomicInteger count = blockCountMap.get(host);
+              if (count == null) {
+                count = new AtomicInteger(0);
+                blockCountMap.put(host, count);
+              }
+              count.incrementAndGet();
+            }
+          }
+        }
+      }
+
+      // pick the host that stores the most blocks of this region
+      int largestBlkCount = 0;
+      String hostToRun = null;
+      for (Map.Entry<String, AtomicInteger> e : blockCountMap.entrySet()) {
+        int blkCount = e.getValue().get();
+        if (blkCount > largestBlkCount) {
+          largestBlkCount = blkCount;
+          hostToRun = e.getKey();
+        }
+      }
+      if (hostToRun == null) {
+        // no block locations (e.g. a region without store files yet), so
+        // there is no preferred host; leave the region out of the mapping
+        LOG.debug("[ Best Locality Mapping ] Name: " + name +
+            " has no block locations; skipping");
+        continue;
+      }
+
+      if (hostToRun.endsWith(".")) {
+        hostToRun = hostToRun.substring(0, hostToRun.length()-1);
+      }
+      regionToBestLocalityRSMapping.put(name,hostToRun);
+      LOG.debug("[ Best Locality Mapping ] Name: " + name+
+          " Region Server: " + hostToRun) ;
+
+      float rate = largestBlkCount / (float)totalBlkCount * 100;
+      String msg = "Host : " + hostToRun + " has "+largestBlkCount+" / "+
+        totalBlkCount+" ("+rate +"%) blocks for the < "+ name + " >";
+      LOG.info(msg);
+    }
+    long overhead = System.currentTimeMillis() - startTime;
+    String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
+
+    LOG.info(overheadMsg);
+    return regionToBestLocalityRSMapping;
+  }
+
 }