You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/03/11 07:00:16 UTC

svn commit: r1080457 - /hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java

Author: nspiegelberg
Date: Fri Mar 11 06:00:16 2011
New Revision: 1080457

URL: http://svn.apache.org/viewvc?rev=1080457&view=rev
Log:
HBASE-3620 : Make HBCK utility faster

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1080457&r1=1080456&r2=1080457&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Fri Mar 11 06:00:16 2011
@@ -29,6 +29,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,6 +73,9 @@ public class HBaseFsck {
   public static final long DEFAULT_TIME_LAG = 60000; // default value of 1 minute
   public static final long DEFAULT_SLEEP_BEFORE_RERUN = 10000;
 
+  private static final int MAX_NUM_THREADS = 50; // #threads to contact regions
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
   private static final Log LOG = LogFactory.getLog(HBaseFsck.class.getName());
   private Configuration conf;
 
@@ -89,6 +95,9 @@ public class HBaseFsck {
   private static boolean summary = false; // if we want to print less output
   // Empty regioninfo qualifiers in .META.
   private Set<Result> emptyRegionInfoQualifiers = new HashSet<Result>();
+  private int numThreads = MAX_NUM_THREADS;
+
+  ThreadPoolExecutor executor;         // threads to retrieve data from regionservers
 
   /**
    * Constructor
@@ -104,6 +113,11 @@ public class HBaseFsck {
     HBaseAdmin admin = new HBaseAdmin(conf);
     status = admin.getMaster().getClusterStatus();
     connection = admin.getConnection();
+
+    numThreads = conf.getInt("hbasefsck.numthreads", numThreads);
+    executor = new ThreadPoolExecutor(0, numThreads,
+          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+          new LinkedBlockingQueue<Runnable>());
   }
 
   /**
@@ -236,7 +250,7 @@ public class HBaseFsck {
    * Scan HDFS for all regions, recording their information into
    * regionInfo
    */
-  void checkHdfs() throws IOException {
+  void checkHdfs() throws IOException, InterruptedException {
     Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
     FileSystem fs = rootDir.getFileSystem(conf);
 
@@ -259,35 +273,19 @@ public class HBaseFsck {
     }
 
     // level 1:  <HBASE_DIR>/*
+    WorkItemHdfsDir[] dirs = new WorkItemHdfsDir[tableDirs.size()];  
+    int num = 0;
     for (FileStatus tableDir : tableDirs) {
-      String tableName = tableDir.getPath().getName();
-      // ignore hidden files
-      if (tableName.startsWith(".") &&
-          !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
-        continue;
-      // level 2: <HBASE_DIR>/<table>/*
-      FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
-      for (FileStatus regionDir : regionDirs) {
-        String encodedName = regionDir.getPath().getName();
-        // ignore directories that aren't hexadecimal
-        if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
-
-        HbckInfo hbi = getOrCreateInfo(encodedName);
-        hbi.foundRegionDir = regionDir;
-
-        // Set a flag if this region contains only edits
-        // This is special case if a region is left after split
-        hbi.onlyEdits = true;
-        FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
-        if (subDirs != null) {
-          Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath());
-          for (FileStatus subDir : subDirs) {
-            String sdName = subDir.getPath().getName();
-            if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
-              hbi.onlyEdits = false;
-              break;
-            }
-          }
+      dirs[num] = new WorkItemHdfsDir(this, fs, errors, tableDir); 
+      executor.execute(dirs[num]);
+      num++;
+    }
+
+    // wait for all directories to be done
+    for (int i = 0; i < num; i++) {
+      synchronized (dirs[i]) {
+        while (!dirs[i].isDone()) {
+          dirs[i].wait();
         }
       }
     }
@@ -322,37 +320,24 @@ public class HBaseFsck {
    * @throws IOException if a remote or network exception occurs
    */
   void processRegionServers(Collection<HServerInfo> regionServerList)
-    throws IOException {
+    throws IOException, InterruptedException {
 
-    // loop to contact each region server
-    for (HServerInfo rsinfo: regionServerList) {
-      errors.progress();
-      try {
-        HRegionInterface server = connection.getHRegionConnection(
-                                    rsinfo.getServerAddress());
-
-        // list all online regions from this region server
-        List<HRegionInfo> regions = server.getOnlineRegions();
-        if (details) {
-          errors.detail("RegionServer: " + rsinfo.getServerName() +
-                           " number of regions: " + regions.size());
-          for (HRegionInfo rinfo: regions) {
-            errors.detail("  " + rinfo.getRegionNameAsString() +
-                             " id: " + rinfo.getRegionId() +
-                             " encoded_name: " + rinfo.getEncodedName() +
-                             " start: " + Bytes.toStringBinary(rinfo.getStartKey()) +
-                             " end: " + Bytes.toStringBinary(rinfo.getEndKey()));
-          }
-        }
+    WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()];
+    int num = 0;
 
-        // check to see if the existance of this region matches the region in META
-        for (HRegionInfo r:regions) {
-          HbckInfo hbi = getOrCreateInfo(r.getEncodedName());
-          hbi.deployedOn.add(rsinfo.getServerAddress());
+    // loop to contact each region server in parallel
+    for (HServerInfo rsinfo:regionServerList) {
+      work[num] = new WorkItemRegion(this, rsinfo, errors, connection);
+      executor.execute(work[num]);
+      num++;
+    }
+    
+    // wait for all submitted tasks to be done
+    for (int i = 0; i < num; i++) {
+      synchronized (work[i]) {
+        while (!work[i].isDone()) {
+          work[i].wait();
         }
-      } catch (IOException e) {          // unable to connect to the region server.
-        errors.reportError("\nRegionServer:" + rsinfo.getServerName() +
-                           " Unable to fetch region information. " + e);
       }
     }
   }
@@ -611,7 +596,7 @@ public class HBaseFsck {
    * region name. If the region has not been seen yet, a new entry is added
    * and returned.
    */
-  private HbckInfo getOrCreateInfo(String name) {
+  private synchronized HbckInfo getOrCreateInfo(String name) {
     HbckInfo hbi = regionInfo.get(name);
     if (hbi == null) {
       hbi = new HbckInfo(null);
@@ -779,7 +764,11 @@ public class HBaseFsck {
       this.metaEntry = metaEntry;
     }
 
-    public String toString() {
+    public synchronized void addServer(HServerAddress server) {
+      this.deployedOn.add(server);
+    }
+
+    public synchronized String toString() {
       if (metaEntry != null) {
         return metaEntry.getRegionNameAsString();
       } else if (foundRegionDir != null) {
@@ -822,7 +811,7 @@ public class HBaseFsck {
     public int errorCount = 0;
     private int showProgress;
 
-    public void reportError(String message) {
+    public synchronized void reportError(String message) {
       if (!summary) {
         System.out.println("ERROR: " + message);
       }
@@ -830,7 +819,7 @@ public class HBaseFsck {
       showProgress = 0;
     }
 
-    public int summarize() {
+    public synchronized int summarize() {
       System.out.println(Integer.toString(errorCount) +
                          " inconsistencies detected.");
       if (errorCount == 0) {
@@ -842,20 +831,20 @@ public class HBaseFsck {
       }
     }
 
-    public void print(String message) {
+    public synchronized void print(String message) {
       if (!summary) {
         System.out.println(message);
       }
     }
 
-    public void detail(String message) {
+    public synchronized void detail(String message) {
       if (details) {
         System.out.println(message);
       }
       showProgress = 0;
     }
 
-    public void progress() {
+    public synchronized void progress() {
       if (showProgress++ == 10) {
         if (!summary) {
           System.out.print(".");
@@ -866,6 +855,137 @@ public class HBaseFsck {
   }
 
   /**
+   * Contact a region server and get all information from it
+   */
+  static class WorkItemRegion implements Runnable {
+    private HBaseFsck hbck;
+    private HServerInfo rsinfo;
+    private ErrorReporter errors;
+    private HConnection connection;
+    private boolean done;
+
+    WorkItemRegion(HBaseFsck hbck, HServerInfo info, 
+                   ErrorReporter errors, HConnection connection) {
+      this.hbck = hbck;
+      this.rsinfo = info;
+      this.errors = errors;
+      this.connection = connection;
+      this.done = false;
+    }
+
+    // is this task done?
+    synchronized boolean isDone() {
+      return done;
+    }
+
+    @Override
+    public synchronized void run() {
+      errors.progress();
+      try {
+        HRegionInterface server = connection.getHRegionConnection(
+                                    rsinfo.getServerAddress());
+
+        // list all online regions from this region server
+        List<HRegionInfo> regions = server.getOnlineRegions();
+        if (details) {
+          errors.detail("RegionServer: " + rsinfo.getServerName() +
+                           " number of regions: " + regions.size());
+          for (HRegionInfo rinfo: regions) {
+            errors.detail("  " + rinfo.getRegionNameAsString() +
+                             " id: " + rinfo.getRegionId() +
+                             " encoded_name: " + rinfo.getEncodedName() +
+                             " start: " + Bytes.toStringBinary(rinfo.getStartKey()) +
+                             " end: " + Bytes.toStringBinary(rinfo.getEndKey()));
+          }
+        }
+
+        // check to see if the existence of this region matches the region in META
+        for (HRegionInfo r:regions) {
+          HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName());
+          hbi.addServer(rsinfo.getServerAddress());
+        }
+      } catch (IOException e) {          // unable to connect to the region server. 
+        errors.reportError("RegionServer: " + rsinfo.getServerName() +
+                      " Unable to fetch region information. " + e);
+      } finally {
+        done = true;
+        notifyAll(); // wakeup anybody waiting for this item to be done
+      }
+    }
+  }
+
+  /**
+   * Contact HDFS and get all information about the specified table directory.
+   */
+  static class WorkItemHdfsDir implements Runnable {
+    private HBaseFsck hbck;
+    private FileStatus tableDir;
+    private ErrorReporter errors;
+    private FileSystem fs;
+    private boolean done;
+
+    WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors, 
+                    FileStatus status) {
+      this.hbck = hbck;
+      this.fs = fs;
+      this.tableDir = status;
+      this.errors = errors;
+      this.done = false;
+    }
+
+    synchronized boolean isDone() {
+      return done;
+    } 
+
+    @Override
+    public synchronized void run() {
+      try {
+        String tableName = tableDir.getPath().getName();
+        // ignore hidden files
+        if (tableName.startsWith(".") &&
+            !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
+          return;
+        // level 2: <HBASE_DIR>/<table>/*
+        FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
+        for (FileStatus regionDir : regionDirs) {
+          String encodedName = regionDir.getPath().getName();
+
+          // ignore directories that aren't hexadecimal
+          if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
+  
+          HbckInfo hbi = hbck.getOrCreateInfo(encodedName);
+          synchronized (hbi) {
+            if (hbi.foundRegionDir != null) {
+              errors.print("Directory " + encodedName + " duplicate??" +
+                           hbi.foundRegionDir);
+            }
+            hbi.foundRegionDir = regionDir;
+        
+            // Set a flag if this region contains only edits
+            // This is special case if a region is left after split
+            hbi.onlyEdits = true;
+            FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
+            Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath());
+            for (FileStatus subDir : subDirs) {
+              String sdName = subDir.getPath().getName();
+              if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
+                hbi.onlyEdits = false;
+                break;
+              }
+            }
+          }
+        }
+      } catch (IOException e) {          // unable to list the table or region directory in HDFS.
+        errors.reportError("Table Directory: " + tableDir.getPath().getName() +
+                      " Unable to fetch region information. " + e);
+      } finally {
+        done = true;
+        notifyAll();
+      }
+    }
+  }
+
+  /**
    * Display the full report from fsck.
    * This displays all live and dead region servers, and all known regions.
    */