You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hbase.apache.org by jm...@apache.org on 2012/06/04 11:40:43 UTC
svn commit: r1345888 - in /hbase/branches/0.92: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
Author: jmhsieh
Date: Mon Jun 4 09:40:43 2012
New Revision: 1345888
URL: http://svn.apache.org/viewvc?rev=1345888&view=rev
Log:
HBASE-5892 [hbck] Refactor parallel WorkItem* to Futures (Andrew Wang)
Modified:
hbase/branches/0.92/CHANGES.txt
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1345888&r1=1345887&r2=1345888&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Mon Jun 4 09:40:43 2012
@@ -100,6 +100,7 @@ Release 0.92.2 - Unreleased
HBASE-6114 CacheControl flags should be tunable per table schema per CF
HBASE-4720 Implement atomic update operations (checkAndPut, checkAndDelete) for REST client/server
(Mubarak Seyed and Jimmy Xiang)
+ HBASE-5892 [hbck] Refactor parallel WorkItem* to Futures (Andrew Wang)
NEW FEATURE
HBASE-5128 [uber hbck] Online automated repair of table integrity and region consistency problems
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1345888&r1=1345887&r2=1345888&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Mon Jun 4 09:40:43 2012
@@ -32,10 +32,11 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -152,7 +153,7 @@ public class HBaseFsck {
private HConnection connection;
private HBaseAdmin admin;
private HTable meta;
- private ThreadPoolExecutor executor; // threads to retrieve data from regionservers
+ private ScheduledThreadPoolExecutor executor; // threads to retrieve data from regionservers
private long startMillis = System.currentTimeMillis();
/***********
@@ -218,10 +219,7 @@ public class HBaseFsck {
this.conf = conf;
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
- executor = new ThreadPoolExecutor(numThreads, numThreads,
- THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- executor.allowCoreThreadTimeOut(true);
+ executor = new ScheduledThreadPoolExecutor(numThreads);
}
/**
@@ -627,20 +625,25 @@ public class HBaseFsck {
Collection<HbckInfo> hbckInfos = regionInfoMap.values();
// Parallelized read of .regioninfo files.
- WorkItemHdfsRegionInfo[] hbis = new WorkItemHdfsRegionInfo[hbckInfos.size()];
- int num = 0;
+ List<WorkItemHdfsRegionInfo> hbis = new ArrayList<WorkItemHdfsRegionInfo>(hbckInfos.size());
+ List<Future<Void>> hbiFutures;
+
for (HbckInfo hbi : hbckInfos) {
- hbis[num] = new WorkItemHdfsRegionInfo(hbi, this, errors);
- executor.execute(hbis[num]);
- num++;
+ WorkItemHdfsRegionInfo work = new WorkItemHdfsRegionInfo(hbi, this, errors);
+ hbis.add(work);
}
- for (int i=0; i < num; i++) {
- WorkItemHdfsRegionInfo hbi = hbis[i];
- synchronized(hbi) {
- while (!hbi.isDone()) {
- hbi.wait();
- }
+ // Submit and wait for completion
+ hbiFutures = executor.invokeAll(hbis);
+
+ for(int i=0; i<hbiFutures.size(); i++) {
+ WorkItemHdfsRegionInfo work = hbis.get(i);
+ Future<Void> f = hbiFutures.get(i);
+ try {
+ f.get();
+ } catch(ExecutionException e) {
+ LOG.warn("Failed to read .regioninfo file for region " +
+ work.hbi.getRegionNameAsString(), e.getCause());
}
}
@@ -1052,22 +1055,22 @@ public class HBaseFsck {
}
// level 1: <HBASE_DIR>/*
- WorkItemHdfsDir[] dirs = new WorkItemHdfsDir[tableDirs.size()];
- int num = 0;
+ List<WorkItemHdfsDir> dirs = new ArrayList<WorkItemHdfsDir>(tableDirs.size());
+ List<Future<Void>> dirsFutures;
+
for (FileStatus tableDir : tableDirs) {
LOG.debug("Loading region dirs from " +tableDir.getPath());
- dirs[num] = new WorkItemHdfsDir(this, fs, errors, tableDir);
- executor.execute(dirs[num]);
- num++;
+ dirs.add(new WorkItemHdfsDir(this, fs, errors, tableDir));
}
- // wait for all directories to be done
- for (int i = 0; i < num; i++) {
- WorkItemHdfsDir dir = dirs[i];
- synchronized (dir) {
- while (!dir.isDone()) {
- dir.wait();
- }
+ // Invoke and wait for Callables to complete
+ dirsFutures = executor.invokeAll(dirs);
+
+ for(Future<Void> f: dirsFutures) {
+ try {
+ f.get();
+ } catch(ExecutionException e) {
+ LOG.warn("Could not load region dir " , e.getCause());
}
}
}
@@ -1134,22 +1137,24 @@ public class HBaseFsck {
void processRegionServers(Collection<ServerName> regionServerList)
throws IOException, InterruptedException {
- WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()];
- int num = 0;
+ List<WorkItemRegion> workItems = new ArrayList<WorkItemRegion>(regionServerList.size());
+ List<Future<Void>> workFutures;
// loop to contact each region server in parallel
for (ServerName rsinfo: regionServerList) {
- work[num] = new WorkItemRegion(this, rsinfo, errors, connection);
- executor.execute(work[num]);
- num++;
+ workItems.add(new WorkItemRegion(this, rsinfo, errors, connection));
}
- // wait for all submitted tasks to be done
- for (int i = 0; i < num; i++) {
- synchronized (work[i]) {
- while (!work[i].isDone()) {
- work[i].wait();
- }
+ workFutures = executor.invokeAll(workItems);
+
+ for(int i=0; i<workFutures.size(); i++) {
+ WorkItemRegion item = workItems.get(i);
+ Future<Void> f = workFutures.get(i);
+ try {
+ f.get();
+ } catch(ExecutionException e) {
+ LOG.warn("Could not process regionserver " + item.rsinfo.getHostAndPort(),
+ e.getCause());
}
}
}
@@ -2364,10 +2369,11 @@ public class HBaseFsck {
if (metaEntry != null) {
return metaEntry.getRegionNameAsString();
} else if (hdfsEntry != null) {
- return hdfsEntry.hri.getRegionNameAsString();
- } else {
- return null;
+ if (hdfsEntry.hri != null) {
+ return hdfsEntry.hri.getRegionNameAsString();
+ }
}
+ return null;
}
public byte[] getRegionName() {
@@ -2616,12 +2622,11 @@ public class HBaseFsck {
/**
* Contact a region server and get all information from it
*/
- static class WorkItemRegion implements Runnable {
+ static class WorkItemRegion implements Callable<Void> {
private HBaseFsck hbck;
private ServerName rsinfo;
private ErrorReporter errors;
private HConnection connection;
- private boolean done;
WorkItemRegion(HBaseFsck hbck, ServerName info,
ErrorReporter errors, HConnection connection) {
@@ -2629,16 +2634,10 @@ public class HBaseFsck {
this.rsinfo = info;
this.errors = errors;
this.connection = connection;
- this.done = false;
- }
-
- // is this task done?
- synchronized boolean isDone() {
- return done;
}
@Override
- public synchronized void run() {
+ public synchronized Void call() throws IOException {
errors.progress();
try {
HRegionInterface server =
@@ -2669,10 +2668,9 @@ public class HBaseFsck {
} catch (IOException e) { // unable to connect to the region server.
errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "RegionServer: " + rsinfo.getServerName() +
" Unable to fetch region information. " + e);
- } finally {
- done = true;
- notifyAll(); // wakeup anybody waiting for this item to be done
+ throw e;
}
+ return null;
}
private List<HRegionInfo> filterOnlyMetaRegions(List<HRegionInfo> regions) {
@@ -2690,12 +2688,11 @@ public class HBaseFsck {
* Contact hdfs and get all information about specified table directory into
* regioninfo list.
*/
- static class WorkItemHdfsDir implements Runnable {
+ static class WorkItemHdfsDir implements Callable<Void> {
private HBaseFsck hbck;
private FileStatus tableDir;
private ErrorReporter errors;
private FileSystem fs;
- private boolean done;
WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors,
FileStatus status) {
@@ -2703,27 +2700,25 @@ public class HBaseFsck {
this.fs = fs;
this.tableDir = status;
this.errors = errors;
- this.done = false;
}
- synchronized boolean isDone() {
- return done;
- }
-
@Override
- public synchronized void run() {
+ public synchronized Void call() throws IOException {
try {
String tableName = tableDir.getPath().getName();
// ignore hidden files
if (tableName.startsWith(".") &&
- !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME)))
- return;
+ !tableName.equals( Bytes.toString(HConstants.META_TABLE_NAME))) {
+ return null;
+ }
// level 2: <HBASE_DIR>/<table>/*
FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
for (FileStatus regionDir : regionDirs) {
String encodedName = regionDir.getPath().getName();
// ignore directories that aren't hexadecimal
- if (!encodedName.toLowerCase().matches("[0-9a-f]+")) continue;
+ if (!encodedName.toLowerCase().matches("[0-9a-f]+")) {
+ continue;
+ }
LOG.debug("Loading region info from hdfs:"+ regionDir.getPath());
HbckInfo hbi = hbck.getOrCreateInfo(encodedName);
@@ -2760,10 +2755,9 @@ public class HBaseFsck {
errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: "
+ tableDir.getPath().getName()
+ " Unable to fetch region information. " + e);
- } finally {
- done = true;
- notifyAll();
+ throw e;
}
+ return null;
}
}
@@ -2771,51 +2765,41 @@ public class HBaseFsck {
* Contact hdfs and get all information about specified table directory into
* regioninfo list.
*/
- static class WorkItemHdfsRegionInfo implements Runnable {
+ static class WorkItemHdfsRegionInfo implements Callable<Void> {
private HbckInfo hbi;
private HBaseFsck hbck;
private ErrorReporter errors;
- private boolean done;
WorkItemHdfsRegionInfo(HbckInfo hbi, HBaseFsck hbck, ErrorReporter errors) {
this.hbi = hbi;
this.hbck = hbck;
this.errors = errors;
- this.done = false;
- }
-
- synchronized boolean isDone() {
- return done;
}
@Override
- public synchronized void run() {
- try {
- // only load entries that haven't been loaded yet.
- if (hbi.getHdfsHRI() == null) {
+ public synchronized Void call() throws IOException {
+ // only load entries that haven't been loaded yet.
+ if (hbi.getHdfsHRI() == null) {
+ try {
+ hbck.loadHdfsRegioninfo(hbi);
+ } catch (IOException ioe) {
+ String msg = "Orphan region in HDFS: Unable to load .regioninfo from table "
+ + Bytes.toString(hbi.getTableName()) + " in hdfs dir "
+ + hbi.getHdfsRegionDir()
+ + "! It may be an invalid format or version file. Treating as "
+ + "an orphaned regiondir.";
+ errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg);
try {
- hbck.loadHdfsRegioninfo(hbi);
- } catch (IOException ioe) {
- String msg = "Orphan region in HDFS: Unable to load .regioninfo from table "
- + Bytes.toString(hbi.getTableName()) + " in hdfs dir "
- + hbi.getHdfsRegionDir()
- + "! It may be an invalid format or version file. Treating as "
- + "an orphaned regiondir.";
- errors.reportError(ERROR_CODE.ORPHAN_HDFS_REGION, msg);
- try {
- hbck.debugLsr(hbi.getHdfsRegionDir());
- } catch (IOException ioe2) {
- LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2);
- return; // TODO convert this in to a future
- }
- hbck.orphanHdfsDirs.add(hbi);
- return;
+ hbck.debugLsr(hbi.getHdfsRegionDir());
+ } catch (IOException ioe2) {
+ LOG.error("Unable to read directory " + hbi.getHdfsRegionDir(), ioe2);
+ throw ioe2;
}
+ hbck.orphanHdfsDirs.add(hbi);
+ throw ioe;
}
- } finally {
- done = true;
- notifyAll();
}
+ return null;
}
};