You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:43:20 UTC

svn commit: r1181943 - in /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase: client/HBaseLocalityCheck.java master/HMaster.java util/FSUtils.java

Author: nspiegelberg
Date: Tue Oct 11 17:43:20 2011
New Revision: 1181943

URL: http://svn.apache.org/viewvc?rev=1181943&view=rev
Log:
Fixing a parallel locality check issue

Summary:
Fixed the block location map not being cleared between regions, added some
extra logging on errors, and implemented option passing (-D, -table) to the
locality checker to override settings on the fly. Also guarded against some more possible NPEs.

Also noticed that although the progress log reports the number of regions we
have gone through ("Scanned Regions"), the number of files we actually access
is roughly twice as large, so the reporting might be a bit misleading...

Test Plan:
I tested it on DL and everything worked pretty much as expected. Total run time
for each number of threads used (thread count, then elapsed time):

1)   202323 ms
2)   92715 ms
3)   70758 ms
4)   51976 ms
5)   45343 ms

Reviewed By: kannan
Reviewers: kannan, liyintang
Commenters: liyintang
CC: hbase@lists, achao, kannan, liyintang, bogdan
Blame Revision: D283312
Differential Revision: 308045

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java?rev=1181943&r1=1181942&r2=1181943&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java Tue Oct 11 17:43:20 2011
@@ -7,30 +7,50 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.HBaseFsck.HbckInfo;
 import org.apache.hadoop.hbase.util.FSUtils;
 
 public class HBaseLocalityCheck {
+  private static final Log LOG = LogFactory.getLog(HBaseLocalityCheck.class
+      .getName());
+
   private final FileSystem fs;
   private final Path rootdir;
   private Map<String, String> preferredRegionToRegionServerMapping = null;
   private Configuration conf;
-  private static final Log LOG =
-    LogFactory.getLog(HBaseLocalityCheck.class.getName());
+  /**
+   * The table we want to get locality for, or null in case we wanted a check
+   * over all
+   */
+  private final String tableName;
 
-  public HBaseLocalityCheck(Configuration conf) throws IOException {
+  /**
+   * Default constructor
+   *
+   * @param conf
+   *          the configuration object to use
+   * @param tableName
+   *          the tableName we wish to get locality check over, or null if all
+   * @throws IOException
+   *           in case of file system issues
+   */
+  public HBaseLocalityCheck(Configuration conf, final String tableName) throws IOException {
     this.conf = conf;
     this.rootdir = FSUtils.getRootDir(conf);
     this.fs = FileSystem.get(conf);
+    this.tableName = tableName;
   }
 
   /**
@@ -56,8 +76,8 @@ public class HBaseLocalityCheck {
     // Get the locality info for each region by scanning the file system
     preferredRegionToRegionServerMapping = FSUtils
         .getRegionLocalityMappingFromFS(fs, rootdir,
-            conf.getInt("hbase.client.localityCheck.threadPoolSize", 2),
-            conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 60 * 1000));
+            conf.getInt("hbase.client.localityCheck.threadPoolSize", 2), conf,
+            tableName);
 
     Map<String, AtomicInteger> tableToRegionCountMap =
       new HashMap<String, AtomicInteger>();
@@ -121,10 +141,51 @@ public class HBaseLocalityCheck {
       InterruptedException {
     long startTime = System.currentTimeMillis();
     Configuration conf = HBaseConfiguration.create();
-    HBaseLocalityCheck localck = new HBaseLocalityCheck(conf);
+    String tableName = null;
+    Options opt = new Options();
+    opt.addOption("D", true, "Override HBase Configuration Settings");
+    opt.addOption("table", true,
+        "Specify one precise table to scan for locality");
+    try {
+      CommandLine cmd = new GnuParser().parse(opt, args);
+
+      if (cmd.hasOption("D")) {
+        for (String confOpt : cmd.getOptionValues("D")) {
+          String[] kv = confOpt.split("=", 2);
+          if (kv.length == 2) {
+            conf.set(kv[0], kv[1]);
+            LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
+          } else {
+            throw new ParseException("-D option format invalid: " + confOpt);
+          }
+        }
+      }
+
+      if (cmd.hasOption("table")) {
+        tableName = cmd.getOptionValue("table");
+      }
+    } catch (ParseException e) {
+      LOG.error("Could not parse", e);
+      printUsageAndExit();
+    }
+
+    HBaseLocalityCheck localck = new HBaseLocalityCheck(conf, tableName);
     localck.showTableLocality();
     LOG.info("Locality Summary takes " +
         (System.currentTimeMillis() - startTime) + " ms to run" );
     Runtime.getRuntime().exit(0);
   }
+
+  private static void printUsageAndExit() {
+    printUsageAndExit(null);
+  }
+
+  private static void printUsageAndExit(final String message) {
+    if (message != null) {
+      System.err.println(message);
+    }
+    System.err
+        .println("Usage: hbase localityck [-D <conf.param=value>]* [-table <tableName>]");
+    System.exit(0);
+  }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181943&r1=1181942&r2=1181943&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 17:43:20 2011
@@ -638,7 +638,7 @@ public class HMaster extends Thread impl
         this.preferredRegionToRegionServerMapping = FSUtils
             .getRegionLocalityMappingFromFS(fs, rootdir,
                 conf.getInt("hbase.master.localityCheck.threadPoolSize", 5),
-                threadWakeFrequency);
+                conf);
       } catch (Exception e) {
         LOG.error("Got unexpected exception when getting " +
             "preferredRegionToHostMapping : " + e.toString());

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1181943&r1=1181942&r2=1181943&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Oct 11 17:43:20 2011
@@ -709,23 +709,87 @@ public class FSUtils {
    *          the root path to start from
    * @param threadPoolSize
    *          the thread pool size to use
-   * @param threadWakeFrequency
-   *          the wake frequency to perform stat printing
+   * @param conf
+   *          the configuration to use
    * @return the mapping to consider as best possible assignment
    * @throws IOException
    *           in case of file system errors or interrupts
    */
   public static Map<String, String> getRegionLocalityMappingFromFS(
       final FileSystem fs, final Path rootPath, int threadPoolSize,
-      final int threadWakeFrequency)
+      final Configuration conf) throws IOException {
+    return getRegionLocalityMappingFromFS(fs, rootPath, threadPoolSize, conf,
+        null);
+  }
+
+  /**
+   * This function is to scan the root path of the file system to get the
+   * mapping between the region name and its best locality region server
+   *
+   * @param fs
+   *          the file system to use
+   * @param rootPath
+   *          the root path to start from
+   * @param threadPoolSize
+   *          the thread pool size to use
+   * @param conf
+   *          the configuration to use
+   * @param desiredTable
+   *          the table you wish to scan locality for
+   * @return the mapping to consider as best possible assignment
+   * @throws IOException
+   *           in case of file system errors or interrupts
+   */
+  public static Map<String, String> getRegionLocalityMappingFromFS(
+      final FileSystem fs, final Path rootPath, int threadPoolSize,
+      final Configuration conf, final String desiredTable)
       throws IOException {
     // region name to its best locality region server mapping
     Map<String, String> regionToBestLocalityRSMapping =
        new ConcurrentHashMap<String,  String>();
 
     long startTime = System.currentTimeMillis();
-    Path queryPath = new Path(rootPath.toString() + "/*/*/");
-    FileStatus[] statusList = fs.globStatus(queryPath);
+    Path queryPath;
+    if (null == desiredTable) {
+      queryPath = new Path(rootPath.toString() + "/*/*/");
+    } else {
+      queryPath = new Path(rootPath.toString() + "/" + desiredTable + "/*/");
+    }
+
+    // reject all paths that are not appropriate
+    PathFilter pathFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        // this is the region name; it may get some noise data
+        if (null == path) {
+          return false;
+        }
+
+        // no parent?
+        Path parent = path.getParent();
+        if (null == parent) {
+          return false;
+        }
+
+        // not part of a table?
+        if (parent.getName().startsWith(".")
+            && !parent.getName().equals(".META.")) {
+          return false;
+        }
+
+        String regionName = path.getName();
+        if (null == regionName) {
+          return false;
+        }
+
+        if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
+          return false;
+        }
+        return true;
+      }
+    };
+
+    FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
 
     LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
         statusList.length);
@@ -768,7 +832,6 @@ public class FSUtils {
       }
 
       // ignore all file status items that are not of interest
-      int current = 0;
       for (FileStatus regionStatus : statusList) {
         if (null == regionStatus) {
           continue;
@@ -778,10 +841,8 @@ public class FSUtils {
           continue;
         }
 
-        // get the region name; it may get some noise data
         Path regionPath = regionStatus.getPath();
-        String regionName = regionPath.getName();
-        if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
+        if (null == regionPath) {
           continue;
         }
 
@@ -789,9 +850,8 @@ public class FSUtils {
         // threads get things to do; can create empty buckets in the rare case
         // in which we end up getting less region paths than we have threads to
         // handle them
-        buckets.get(current % threadPoolSize).add(regionPath);
+        buckets.get(totalGoodRegionFiles % threadPoolSize).add(regionPath);
         ++totalGoodRegionFiles;
-        ++current;
       }
 
       // start each thread in executor service
@@ -801,6 +861,8 @@ public class FSUtils {
     } finally {
       if (null != tpe && null != parallelTasks) {
         tpe.shutdown();
+        int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
+            60 * 1000);
         try {
           // here we wait until TPE terminates, which is either naturally or by
           // exceptions in the execution of the threads
@@ -813,7 +875,7 @@ public class FSUtils {
 
             // printing out rough estimate, so as to not introduce
             // AtomicInteger
-            LOG.info("Locality checking is underway: { THREADS : "
+            LOG.info("Locality checking is underway: { Threads completed : "
                 + tpe.getCompletedTaskCount() + "/" + parallelTasks.length
                 + " , Scanned Regions : " + filesDone + "/"
                 + totalGoodRegionFiles + " }");
@@ -837,6 +899,8 @@ public class FSUtils {
  * Thread to be used for
  */
 class FSRegionScanner implements Runnable {
+  static private final Log LOG = LogFactory.getLog(FSRegionScanner.class);
+
   /**
    * The shared block count map
    */
@@ -888,11 +952,11 @@ class FSRegionScanner implements Runnabl
     try {
       for (Path regionPath : paths) {
         try {
+          // empty the map for each region
+          blockCountMap.clear();
 
-          // break here in case this for loop selects some of the null-pading
-          // elements at the end of the receiving arrays
           if (null == regionPath) {
-            break;
+            continue;
           }
           //get table name
           String tableName = regionPath.getParent().getName();
@@ -959,8 +1023,10 @@ class FSRegionScanner implements Runnabl
 
           this.numerOfFinishedRegions++;
         } catch (IOException e) {
+          LOG.warn("Problem scanning file system", e);
           continue;
         } catch (RuntimeException e) {
+          LOG.warn("Problem scanning file system", e);
           continue;
         }
       }
@@ -969,6 +1035,4 @@ class FSRegionScanner implements Runnabl
       assert this.numerOfFinishedRegions <= this.paths.size();
     }
   }
-  // we can use the error mechanism to stop the whole process if we want,
-  // instead of continuing on errors...
 }