You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:11:53 UTC
svn commit: r1181476 -
/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
Author: nspiegelberg
Date: Tue Oct 11 02:11:52 2011
New Revision: 1181476
URL: http://svn.apache.org/viewvc?rev=1181476&view=rev
Log:
Make HBCK utility faster
Summary:
Make the HBCK utility contact all region servers in parallel. This will speed up
hbck processing, especially when there are many region servers.
Test Plan:
run on server
DiffCamp Revision: 192043
Reviewed By: kannan
Reviewers: nspiegelberg, kannan
CC: dhruba, kannan, hbase@lists
Revert Plan:
OK
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java?rev=1181476&r1=1181475&r2=1181476&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseFsck.java Tue Oct 11 02:11:52 2011
@@ -29,6 +29,9 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,6 +64,9 @@ import com.google.common.collect.Lists;
* region server(s) and the state of data in HDFS.
*/
public class HBaseFsck {
+ private static final int MAX_NUM_THREADS = 50; // #threads to contact regions
+ private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
private static final Log LOG = LogFactory.getLog(HBaseFsck.class.getName());
private Configuration conf;
@@ -79,6 +85,9 @@ public class HBaseFsck {
private boolean rerun = false; // if we tried to fix something rerun hbck
private static boolean summary = false; // if we want to print less output
private static boolean promptResponse = false; // "no" to all prompt questions
+ private int numThreads = MAX_NUM_THREADS;
+
+ ThreadPoolExecutor executor; // threads to retrieve data from regionservers
/**
* Constructor
@@ -94,6 +103,11 @@ public class HBaseFsck {
HBaseAdmin admin = new HBaseAdmin(conf);
status = admin.getMaster().getClusterStatus();
connection = admin.getConnection();
+
+ numThreads = conf.getInt("hbasefsck.numthreads", numThreads);
+ // ThreadPoolExecutor rejects a maximum pool size below 1.
+ numThreads = Math.max(1, numThreads);
+ // With an unbounded work queue the pool never grows beyond its core size,
+ // so a core size of 0 would run every work item on a single thread.
+ // Use numThreads core threads and let them expire when idle instead.
+ executor = new ThreadPoolExecutor(numThreads, numThreads,
+ THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ executor.allowCoreThreadTimeOut(true);
}
/**
@@ -101,7 +115,7 @@ public class HBaseFsck {
* @throws IOException if a remote or network exception occurs
* @return 0 on success, non-zero on failure
*/
- int doWork() throws IOException {
+ int doWork() throws IOException, InterruptedException {
// print hbase server version
errors.print("Version: " + status.getHBaseVersion());
@@ -187,7 +201,7 @@ public class HBaseFsck {
* Scan HDFS for all regions, recording their information into
* regionInfo
*/
- void checkHdfs() throws IOException {
+ void checkHdfs() throws IOException, InterruptedException {
Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(conf);
@@ -208,36 +222,21 @@ public class HBaseFsck {
if (!foundVersionFile) {
errors.reportError("Version file does not exist in root dir " + rootDir);
}
-
+ //
// level 1: <HBASE_DIR>/*
+ WorkItemHdfsDir[] dirs = new WorkItemHdfsDir[tableDirs.size()];
+ int num = 0;
for (FileStatus tableDir : tableDirs) {
- String tableName = tableDir.getPath().getName();
- // ignore hidden files
- if (tableName.startsWith(".") &&
- !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
- continue;
- // level 2: <HBASE_DIR>/<table>/*
- FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
- for (FileStatus regionDir : regionDirs) {
- String encodedName = regionDir.getPath().getName();
-
- // ignore directories that aren't hexadecimal
- if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
-
- HbckInfo hbi = getOrCreateInfo(encodedName);
- hbi.foundRegionDir = regionDir;
-
- // Set a flag if this region contains only edits
- // This is special case if a region is left after split
- hbi.onlyEdits = true;
- FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
- Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath());
- for (FileStatus subDir : subDirs) {
- String sdName = subDir.getPath().getName();
- if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
- hbi.onlyEdits = false;
- break;
- }
+ dirs[num] = new WorkItemHdfsDir(this, fs, errors, tableDir);
+ executor.execute(dirs[num]);
+ num++;
+ }
+
+ // wait for all directories to be done
+ for (int i = 0; i < num; i++) {
+ synchronized (dirs[i]) {
+ while (!dirs[i].isDone()) {
+ dirs[i].wait();
}
}
}
@@ -273,38 +272,24 @@ public class HBaseFsck {
* @throws IOException if a remote or network exception occurs
*/
void processRegionServers(Collection<HServerInfo> regionServerList)
- throws IOException {
+ throws IOException, InterruptedException {
- // loop to contact each region server
- for (HServerInfo rsinfo:regionServerList) {
- errors.progress();
- try {
- HRegionInterface server = connection.getHRegionConnection(
- rsinfo.getServerAddress());
+ WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()];
+ int num = 0;
- // list all online regions from this region server
- HRegionInfo[] regions = server.getRegionsAssignment();
-
- if (details) {
- errors.detail("\nRegionServer:" + rsinfo.getServerName() +
- " number of regions:" + regions.length);
- for (HRegionInfo rinfo: regions) {
- errors.detail("\n\t name:" + rinfo.getRegionNameAsString() +
- " id:" + rinfo.getRegionId() +
- " encoded name:" + rinfo.getEncodedName() +
- " start :" + Bytes.toStringBinary(rinfo.getStartKey()) +
- " end :" + Bytes.toStringBinary(rinfo.getEndKey()));
- }
- }
+ // loop to contact each region server in parallel
+ for (HServerInfo rsinfo:regionServerList) {
+ work[num] = new WorkItemRegion(this, rsinfo, errors, connection);
+ executor.execute(work[num]);
+ num++;
+ }
- // check to see if the existance of this region matches the region in META
- for (HRegionInfo r:regions) {
- HbckInfo hbi = getOrCreateInfo(r.getEncodedName());
- hbi.deployedOn.add(rsinfo.getServerAddress());
+ // wait for all submitted tasks to be done
+ for (int i = 0; i < num; i++) {
+ synchronized (work[i]) {
+ while (!work[i].isDone()) {
+ work[i].wait();
}
- } catch (IOException e) { // unable to connect to the region server.
- errors.reportError("RegionServer: " + rsinfo.getServerName() +
- " Unable to fetch region information. " + e);
}
}
}
@@ -585,7 +570,7 @@ public class HBaseFsck {
* region name. If the region has not been seen yet, a new entry is added
* and returned.
*/
- private HbckInfo getOrCreateInfo(String name) {
+ private synchronized HbckInfo getOrCreateInfo(String name) {
HbckInfo hbi = regionInfo.get(name);
if (hbi == null) {
hbi = new HbckInfo(null);
@@ -752,7 +737,11 @@ public class HBaseFsck {
this.metaEntry = metaEntry;
}
- public String toString() {
+ public synchronized void addServer(HServerAddress server) {
+ this.deployedOn.add(server);
+ }
+
+ public synchronized String toString() {
if (metaEntry != null) {
return metaEntry.getRegionNameAsString();
} else if (foundRegionDir != null) {
@@ -799,14 +788,14 @@ public class HBaseFsck {
public int fixableCount = 0;
private int showProgress;
- public void reportWarning(String message) {
+ public synchronized void reportWarning(String message) {
if (!summary) {
System.out.println("WARNING: " + message);
}
warnCount++;
}
- public void reportError(String message) {
+ public synchronized void reportError(String message) {
if (!summary) {
System.out.println("ERROR: " + message);
}
@@ -814,7 +803,7 @@ public class HBaseFsck {
showProgress = 0;
}
- public void reportFixableError(String message) {
+ public synchronized void reportFixableError(String message) {
if (!summary) {
System.out.println("ERROR (fixable): " + message);
}
@@ -822,7 +811,7 @@ public class HBaseFsck {
showProgress = 0;
}
- public int summarize() {
+ public synchronized int summarize() {
System.out.println(Integer.toString(errorCount + fixableCount) +
" inconsistencies detected.");
System.out.println(Integer.toString(fixableCount) +
@@ -842,20 +831,20 @@ public class HBaseFsck {
}
}
- public void print(String message) {
+ public synchronized void print(String message) {
if (!summary) {
System.out.println(message);
}
}
- public void detail(String message) {
+ public synchronized void detail(String message) {
if (details) {
System.out.println(message);
}
showProgress = 0;
}
- public void progress() {
+ public synchronized void progress() {
if (showProgress++ == 10) {
if (!summary) {
System.out.print(".");
@@ -866,6 +855,140 @@ public class HBaseFsck {
}
/**
+ * Contact a region server and get all information from it: the list of
+ * regions it currently serves. Every reported region is recorded in the
+ * shared region map through {@link HBaseFsck#getOrCreateInfo(String)}.
+ * Callers wait on this object's monitor until {@link #isDone()} returns
+ * true; {@link #run()} sets the flag and calls notifyAll() in a finally
+ * block, so waiters are woken even when the region server is unreachable.
+ */
+ static class WorkItemRegion implements Runnable {
+ private final HBaseFsck hbck;
+ private final HServerInfo rsinfo;
+ private final ErrorReporter errors;
+ private final HConnection connection;
+ private boolean done; // guarded by "this"
+
+ WorkItemRegion(HBaseFsck hbck, HServerInfo info,
+ ErrorReporter errors, HConnection connection) {
+ this.hbck = hbck;
+ this.rsinfo = info;
+ this.errors = errors;
+ this.connection = connection;
+ this.done = false;
+ }
+
+ // is this task done?
+ synchronized boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public synchronized void run() {
+ errors.progress();
+ try {
+ HRegionInterface server = connection.getHRegionConnection(
+ rsinfo.getServerAddress());
+
+ // list all online regions from this region server
+ HRegionInfo[] regions = server.getRegionsAssignment();
+
+ if (details) {
+ // build the whole report first so it is printed as one unit and
+ // is not interleaved with output from other worker threads
+ StringBuilder buf = new StringBuilder();
+ buf.append("\nRegionServer:" + rsinfo.getServerName() +
+ " number of regions:" + regions.length);
+ for (HRegionInfo rinfo: regions) {
+ buf.append("\n\t name:" + rinfo.getRegionNameAsString() +
+ " id:" + rinfo.getRegionId() +
+ " encoded name:" + rinfo.getEncodedName() +
+ " start :" + Bytes.toStringBinary(rinfo.getStartKey()) +
+ " end :" + Bytes.toStringBinary(rinfo.getEndKey()));
+ }
+ errors.detail(buf.toString());
+ }
+
+ // record where each region is deployed so it can later be checked
+ // against META and HDFS
+ for (HRegionInfo r:regions) {
+ HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName());
+ hbi.addServer(rsinfo.getServerAddress());
+ }
+ } catch (IOException e) { // unable to connect to the region server.
+ errors.reportError("RegionServer: " + rsinfo.getServerName() +
+ " Unable to fetch region information. " + e);
+ } finally {
+ done = true;
+ notifyAll(); // wakeup anybody waiting for this item to be done
+ }
+ }
+ }
+
+ /**
+ * Contact HDFS and get all information about the specified table directory:
+ * each hexadecimal-named region directory below it is recorded in the shared
+ * region map, together with a flag telling whether the region directory
+ * holds nothing but recovered edits (a leftover of a split).
+ * Callers wait on this object's monitor until {@link #isDone()} returns
+ * true; {@link #run()} sets the flag and calls notifyAll() in a finally
+ * block, so waiters are woken even when the listing fails.
+ */
+ static class WorkItemHdfsDir implements Runnable {
+ private final HBaseFsck hbck;
+ private final FileStatus tableDir;
+ private final ErrorReporter errors;
+ private final FileSystem fs;
+ private boolean done; // guarded by "this"
+
+ WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors,
+ FileStatus status) {
+ this.hbck = hbck;
+ this.fs = fs;
+ this.tableDir = status;
+ this.errors = errors;
+ this.done = false;
+ }
+
+ synchronized boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public synchronized void run() {
+ try {
+ String tableName = tableDir.getPath().getName();
+ // ignore hidden files
+ if (tableName.startsWith(".") &&
+ !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
+ return;
+ // level 2: <HBASE_DIR>/<table>/*
+ FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
+ for (FileStatus regionDir : regionDirs) {
+ String encodedName = regionDir.getPath().getName();
+
+ // ignore directories that aren't hexadecimal
+ if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
+
+ HbckInfo hbi = hbck.getOrCreateInfo(encodedName);
+ synchronized (hbi) {
+ if (hbi.foundRegionDir != null) {
+ // print the path: the FileStatus object itself is not a
+ // readable description of the earlier directory
+ errors.print("Directory " + encodedName + " duplicate?? " +
+ hbi.foundRegionDir.getPath());
+ }
+ hbi.foundRegionDir = regionDir;
+
+ // Set a flag if this region contains only edits
+ // This is special case if a region is left after split
+ hbi.onlyEdits = true;
+ FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
+ Path ePath = HLog.getRegionDirRecoveredEditsDir(regionDir.getPath());
+ for (FileStatus subDir : subDirs) {
+ String sdName = subDir.getPath().getName();
+ if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
+ hbi.onlyEdits = false;
+ break;
+ }
+ }
+ }
+ }
+ } catch (IOException e) { // unable to list the table directory in HDFS.
+ errors.reportError("Table Directory: " + tableDir.getPath().getName() +
+ " Unable to fetch region information. " + e);
+ } finally {
+ done = true;
+ notifyAll(); // wakeup anybody waiting for this item to be done
+ }
+ }
+ }
+
+ /**
* Display the full report from fsck.
* This displays all live and dead region servers, and all known regions.
*/
@@ -942,7 +1065,7 @@ public class HBaseFsck {
* @param args
*/
public static void main(String [] args)
- throws IOException, MasterNotRunningException {
+ throws IOException, MasterNotRunningException, InterruptedException {
// create a fsck object
Configuration conf = HBaseConfiguration.create();