You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:11:53 UTC

svn commit: r1181476 - /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java

Author: nspiegelberg
Date: Tue Oct 11 02:11:52 2011
New Revision: 1181476

URL: http://svn.apache.org/viewvc?rev=1181476&view=rev
Log:
Make HBCK utility faster

Summary:
Make the HBCK utility contact all region servers in parallel. This will speed up
hbck processing, especially when there are lots of region servers.

Test Plan:
run on server

DiffCamp Revision: 192043
Reviewed By: kannan
Reviewers: nspiegelberg, kannan
CC: dhruba, kannan, hbase@lists
Revert Plan:
OK

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java?rev=1181476&r1=1181475&r2=1181476&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java Tue Oct 11 02:11:52 2011
@@ -29,6 +29,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,6 +64,9 @@ import com.google.common.collect.Lists;
  * region server(s) and the state of data in HDFS.
  */
 public class HBaseFsck {
+  private static final int MAX_NUM_THREADS = 50; // #threads to contact regions
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
   private static final Log LOG = LogFactory.getLog(HBaseFsck.class.getName());
   private Configuration conf;
 
@@ -79,6 +85,9 @@ public class HBaseFsck {
   private boolean rerun = false; // if we tried to fix something rerun hbck
   private static boolean summary = false; // if we want to print less output
   private static boolean promptResponse = false;  // "no" to all prompt questions
+  private int numThreads = MAX_NUM_THREADS;
+
+  ThreadPoolExecutor executor;         // threads to retrieve data from regionservers
 
   /**
    * Constructor
@@ -94,6 +103,11 @@ public class HBaseFsck {
     HBaseAdmin admin = new HBaseAdmin(conf);
     status = admin.getMaster().getClusterStatus();
     connection = admin.getConnection();
+
+    numThreads = conf.getInt("hbasefsck.numthreads", numThreads);
+    executor = new ThreadPoolExecutor(0, numThreads,
+          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+          new LinkedBlockingQueue<Runnable>());
   }
 
   /**
@@ -101,7 +115,7 @@ public class HBaseFsck {
    * @throws IOException if a remote or network exception occurs
    * @return 0 on success, non-zero on failure
    */
-  int doWork() throws IOException {
+  int doWork() throws IOException, InterruptedException {
 
     // print hbase server version
     errors.print("Version: " + status.getHBaseVersion());
@@ -187,7 +201,7 @@ public class HBaseFsck {
    * Scan HDFS for all regions, recording their information into
    * regionInfo
    */
-  void checkHdfs() throws IOException {
+  void checkHdfs() throws IOException, InterruptedException {
     Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
     FileSystem fs = rootDir.getFileSystem(conf);
 
@@ -208,36 +222,21 @@ public class HBaseFsck {
     if (!foundVersionFile) {
       errors.reportError("Version file does not exist in root dir " + rootDir);
     }
-
+    //
     // level 1:  <HBASE_DIR>/*
+    WorkItemHdfsDir[] dirs = new WorkItemHdfsDir[tableDirs.size()];
+    int num = 0;
     for (FileStatus tableDir : tableDirs) {
-      String tableName = tableDir.getPath().getName();
-      // ignore hidden files
-      if (tableName.startsWith(".") &&
-          !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
-        continue;
-      // level 2: <HBASE_DIR>/<table>/*
-      FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
-      for (FileStatus regionDir : regionDirs) {
-        String encodedName = regionDir.getPath().getName();
-
-        // ignore directories that aren't hexadecimal
-        if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
-
-        HbckInfo hbi = getOrCreateInfo(encodedName);
-        hbi.foundRegionDir = regionDir;
-
-        // Set a flag if this region contains only edits
-        // This is special case if a region is left after split
-        hbi.onlyEdits = true;
-        FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
-        Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath());
-        for (FileStatus subDir : subDirs) {
-          String sdName = subDir.getPath().getName();
-          if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
-            hbi.onlyEdits = false;
-            break;
-          }
+      dirs[num] = new WorkItemHdfsDir(this, fs, errors, tableDir);
+      executor.execute(dirs[num]);
+      num++;
+    }
+
+    // wait for all directories to be done
+    for (int i = 0; i < num; i++) {
+      synchronized (dirs[i]) {
+        while (!dirs[i].isDone()) {
+          dirs[i].wait();
         }
       }
     }
@@ -273,38 +272,24 @@ public class HBaseFsck {
    * @throws IOException if a remote or network exception occurs
    */
   void processRegionServers(Collection<HServerInfo> regionServerList)
-    throws IOException {
+    throws IOException, InterruptedException {
 
-    // loop to contact each region server
-    for (HServerInfo rsinfo:regionServerList) {
-      errors.progress();
-      try {
-        HRegionInterface server = connection.getHRegionConnection(
-                                    rsinfo.getServerAddress());
+    WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()];
+    int num = 0;
 
-        // list all online regions from this region server
-        HRegionInfo[] regions = server.getRegionsAssignment();
-
-        if (details) {
-          errors.detail("\nRegionServer:" + rsinfo.getServerName() +
-                        " number of regions:" + regions.length);
-          for (HRegionInfo rinfo: regions) {
-            errors.detail("\n\t name:" + rinfo.getRegionNameAsString() +
-                          " id:" + rinfo.getRegionId() +
-                          " encoded name:" + rinfo.getEncodedName() +
-                          " start :" + Bytes.toStringBinary(rinfo.getStartKey()) +
-                          " end :" + Bytes.toStringBinary(rinfo.getEndKey()));
-          }
-        }
+    // loop to contact each region server in parallel
+    for (HServerInfo rsinfo:regionServerList) {
+      work[num] = new WorkItemRegion(this, rsinfo, errors, connection);
+      executor.execute(work[num]);
+      num++;
+    }
 
-        // check to see if the existance of this region matches the region in META
-        for (HRegionInfo r:regions) {
-          HbckInfo hbi = getOrCreateInfo(r.getEncodedName());
-          hbi.deployedOn.add(rsinfo.getServerAddress());
+    // wait for all submitted tasks to be done
+    for (int i = 0; i < num; i++) {
+      synchronized (work[i]) {
+        while (!work[i].isDone()) {
+          work[i].wait();
         }
-      } catch (IOException e) {          // unable to connect to the region server.
-        errors.reportError("RegionServer: " + rsinfo.getServerName() +
-                      " Unable to fetch region information. " + e);
       }
     }
   }
@@ -585,7 +570,7 @@ public class HBaseFsck {
    * region name. If the region has not been seen yet, a new entry is added
    * and returned.
    */
-  private HbckInfo getOrCreateInfo(String name) {
+  private synchronized HbckInfo getOrCreateInfo(String name) {
     HbckInfo hbi = regionInfo.get(name);
     if (hbi == null) {
       hbi = new HbckInfo(null);
@@ -752,7 +737,11 @@ public class HBaseFsck {
       this.metaEntry = metaEntry;
     }
 
-    public String toString() {
+    public synchronized void addServer(HServerAddress server) {
+      this.deployedOn.add(server);
+    }
+
+    public synchronized String toString() {
       if (metaEntry != null) {
         return metaEntry.getRegionNameAsString();
       } else if (foundRegionDir != null) {
@@ -799,14 +788,14 @@ public class HBaseFsck {
     public int fixableCount = 0;
     private int showProgress;
 
-    public void reportWarning(String message) {
+    public synchronized void reportWarning(String message) {
       if (!summary) {
         System.out.println("WARNING: " + message);
       }
       warnCount++;
     }
 
-    public void reportError(String message) {
+    public synchronized void reportError(String message) {
       if (!summary) {
         System.out.println("ERROR: " + message);
       }
@@ -814,7 +803,7 @@ public class HBaseFsck {
       showProgress = 0;
     }
 
-    public void reportFixableError(String message) {
+    public synchronized void reportFixableError(String message) {
       if (!summary) {
         System.out.println("ERROR (fixable): " + message);
       }
@@ -822,7 +811,7 @@ public class HBaseFsck {
       showProgress = 0;
     }
 
-    public int summarize() {
+    public synchronized int summarize() {
       System.out.println(Integer.toString(errorCount + fixableCount) +
                          " inconsistencies detected.");
       System.out.println(Integer.toString(fixableCount) +
@@ -842,20 +831,20 @@ public class HBaseFsck {
       }
     }
 
-    public void print(String message) {
+    public synchronized void print(String message) {
       if (!summary) {
         System.out.println(message);
       }
     }
 
-    public void detail(String message) {
+    public synchronized void detail(String message) {
       if (details) {
         System.out.println(message);
       }
       showProgress = 0;
     }
 
-    public void progress() {
+    public synchronized void progress() {
       if (showProgress++ == 10) {
         if (!summary) {
           System.out.print(".");
@@ -866,6 +855,140 @@ public class HBaseFsck {
   }
 
   /**
+   * Contact a region server and get all information from it
+   */
+  static class WorkItemRegion implements Runnable {
+    private HBaseFsck hbck;
+    private HServerInfo rsinfo;
+    private ErrorReporter errors;
+    private HConnection connection;
+    private boolean done;  // set to true at the end of run(); read through isDone()
+
+    WorkItemRegion(HBaseFsck hbck, HServerInfo info,
+                   ErrorReporter errors, HConnection connection) {
+      this.hbck = hbck;
+      this.rsinfo = info;
+      this.errors = errors;
+      this.connection = connection;
+      this.done = false;
+    }
+
+    // Has run() finished? Synchronized on this item, so the call blocks while run() is executing.
+    synchronized boolean isDone() {
+      return done;
+    }
+
+    @Override
+    public synchronized void run() {  // holds this item's monitor for the whole task
+      errors.progress();
+      try {
+        HRegionInterface server = connection.getHRegionConnection(
+                                    rsinfo.getServerAddress());
+
+        // list all online regions from this region server
+        HRegionInfo[] regions = server.getRegionsAssignment();
+
+        if (details) {
+          StringBuffer buf = new StringBuffer();  // whole report goes to errors.detail() in one call
+          buf.append("\nRegionServer:" + rsinfo.getServerName() +
+                       " number of regions:" + regions.length);
+          for (HRegionInfo rinfo: regions) {
+            buf.append("\n\t name:" + rinfo.getRegionNameAsString() +
+                          " id:" + rinfo.getRegionId() +
+                          " encoded name:" + rinfo.getEncodedName() +
+                          " start :" + Bytes.toStringBinary(rinfo.getStartKey()) +
+                          " end :" + Bytes.toStringBinary(rinfo.getEndKey()));
+          }
+          errors.detail(buf.toString());
+        }
+
+        // record this region server as a host of every region it reports online
+        for (HRegionInfo r:regions) {
+          HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName());
+          hbi.addServer(rsinfo.getServerAddress());
+        }
+      } catch (IOException e) {          // connecting to the region server or listing its regions failed
+        errors.reportError("RegionServer: " + rsinfo.getServerName() +
+                      " Unable to fetch region information. " + e);
+      } finally {
+        done = true;  // always reached, so waiters are released even when the task fails
+        notifyAll(); // wake up anybody waiting on this item for the task to be done
+      }
+    }
+  }
+
+  /**
+   * Contact HDFS and get all information about the specified table directory.
+   */
+  static class WorkItemHdfsDir implements Runnable {
+    private HBaseFsck hbck;
+    private FileStatus tableDir;
+    private ErrorReporter errors;
+    private FileSystem fs;
+    private boolean done;  // set to true at the end of run(); read through isDone()
+
+    WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors,
+                    FileStatus status) {
+      this.hbck = hbck;
+      this.fs = fs;
+      this.tableDir = status;
+      this.errors = errors;
+      this.done = false;
+    }
+
+    synchronized boolean isDone() {  // blocks while run() is executing: both lock this item
+      return done;
+    }
+
+    @Override
+    public synchronized void run() {  // holds this item's monitor for the whole task
+      try {
+        String tableName = tableDir.getPath().getName();
+        // skip entries whose name starts with "." unless the name is the META table's name
+        if (tableName.startsWith(".") &&
+            !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
+          return;  // the finally block below still marks this item done
+        // level 2: <HBASE_DIR>/<table>/*
+        FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
+        for (FileStatus regionDir : regionDirs) {
+          String encodedName = regionDir.getPath().getName();
+
+          // ignore directories whose name is not purely hexadecimal (compared case-insensitively)
+          if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
+
+          HbckInfo hbi = hbck.getOrCreateInfo(encodedName);
+          synchronized (hbi) {  // NOTE(review): this lock is held across the fs.listStatus() call below
+            if (hbi.foundRegionDir != null) {  // a directory was already recorded for this encoded name
+              errors.print("Directory " + encodedName + " duplicate??" +
+                           hbi.foundRegionDir);
+            }
+            hbi.foundRegionDir = regionDir;
+
+            // Assume the region holds only edits until a sub-directory proves otherwise.
+            // This is a special case: a region can be left in this state after a split.
+            hbi.onlyEdits = true;
+            FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
+            Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath());
+            for (FileStatus subDir : subDirs) {
+              String sdName = subDir.getPath().getName();
+              if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
+                hbi.onlyEdits = false;
+                break;
+              }
+            }
+          }
+        }
+      } catch (IOException e) {          // listing the table directory or a region directory failed
+        errors.reportError("Table Directory: " + tableDir.getPath().getName() +
+                      " Unable to fetch region information. " + e);
+      } finally {
+        done = true;  // always reached, so waiters are released even when the task fails
+        notifyAll();  // wake up anybody waiting on this item for the task to be done
+      }
+    }
+  }
+
+  /**
    * Display the full report from fsck.
    * This displays all live and dead region servers, and all known regions.
    */
@@ -942,7 +1065,7 @@ public class HBaseFsck {
    * @param args
    */
   public static void main(String [] args)
-    throws IOException, MasterNotRunningException {
+    throws IOException, MasterNotRunningException, InterruptedException {
 
     // create a fsck object
     Configuration conf = HBaseConfiguration.create();