You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/01/23 19:10:56 UTC
svn commit: r1560776 - /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
Author: jmhsieh
Date: Thu Jan 23 18:10:56 2014
New Revision: 1560776
URL: http://svn.apache.org/r1560776
Log:
HBASE-10401 [hbck] perform overlap group merges in parallel
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1560776&r1=1560775&r2=1560776&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Thu Jan 23 18:10:56 2014
@@ -171,7 +171,8 @@ public class HBaseFsck extends Configure
private HConnection connection;
private HBaseAdmin admin;
private HTable meta;
- protected ExecutorService executor; // threads to retrieve data from regionservers
+ // threads to do ||izable tasks: retrieve data from regionservers, handle overlapping regions
+ protected ExecutorService executor;
private long startMillis = System.currentTimeMillis();
private HFileCorruptionChecker hfcc;
private int retcode = 0;
@@ -1953,8 +1954,8 @@ public class HBaseFsck extends Configure
*/
public int mergeRegionDirs(Path targetRegionDir, HbckInfo contained) throws IOException {
int fileMoves = 0;
-
- LOG.debug("Contained region dir after close and pause");
+ String thread = Thread.currentThread().getName();
+ LOG.debug("[" + thread + "] Contained region dir after close and pause");
debugLsr(contained.getHdfsRegionDir());
// rename the contained into the container.
@@ -1966,8 +1967,8 @@ public class HBaseFsck extends Configure
// region we are attempting to merge in is not present! Since this is a merge, there is
// no harm skipping this region if it does not exist.
if (!fs.exists(contained.getHdfsRegionDir())) {
- LOG.warn("HDFS region dir " + contained.getHdfsRegionDir() + " is missing. " +
- "Assuming already sidelined or moved.");
+ LOG.warn("[" + thread + "] HDFS region dir " + contained.getHdfsRegionDir()
+ + " is missing. Assuming already sidelined or moved.");
} else {
sidelineRegionDir(fs, contained);
}
@@ -1976,7 +1977,8 @@ public class HBaseFsck extends Configure
if (dirs == null) {
if (!fs.exists(contained.getHdfsRegionDir())) {
- LOG.warn("HDFS region dir " + contained.getHdfsRegionDir() + " already sidelined.");
+ LOG.warn("[" + thread + "] HDFS region dir " + contained.getHdfsRegionDir()
+ + " already sidelined.");
} else {
sidelineRegionDir(fs, contained);
}
@@ -1997,7 +1999,7 @@ public class HBaseFsck extends Configure
continue;
}
- LOG.info("Moving files from " + src + " into containing region " + dst);
+ LOG.info("[" + thread + "] Moving files from " + src + " into containing region " + dst);
// FileSystem.rename is inconsistent with directories -- if the
// dst (foo/a) exists and is a dir, and the src (foo/b) is a dir,
// it moves the src into the dst dir resulting in (foo/a/b). If
@@ -2008,19 +2010,37 @@ public class HBaseFsck extends Configure
fileMoves++;
}
}
- LOG.debug("Sideline directory contents:");
+ LOG.debug("[" + thread + "] Sideline directory contents:");
debugLsr(targetRegionDir);
}
// if all success.
sidelineRegionDir(fs, contained);
- LOG.info("Sidelined region dir "+ contained.getHdfsRegionDir() + " into " +
+ LOG.info("[" + thread + "] Sidelined region dir "+ contained.getHdfsRegionDir() + " into " +
getSidelineDir());
debugLsr(contained.getHdfsRegionDir());
return fileMoves;
}
+
+ /**
+  * Callable that runs {@code handler.handleOverlapGroup} on a single overlap group, so that
+  * independent groups can be submitted to an executor and merged concurrently.
+  */
+ static class WorkItemOverlapMerge implements Callable<Void> {
+   private final TableIntegrityErrorHandler handler;
+   final Collection<HbckInfo> overlapgroup;
+
+   WorkItemOverlapMerge(Collection<HbckInfo> overlapgroup, TableIntegrityErrorHandler handler) {
+     this.handler = handler;
+     this.overlapgroup = overlapgroup;
+   }
+
+   @Override
+   public Void call() throws Exception {
+     handler.handleOverlapGroup(overlapgroup);
+     return null;
+   }
+
+   /** Lists the group's regions so that log lines identify which group a failure belongs to. */
+   @Override
+   public String toString() {
+     return "WorkItemOverlapMerge[" + Joiner.on(",").join(overlapgroup) + "]";
+   }
+ }
+
+
/**
* Maintain information about a particular table.
*/
@@ -2246,6 +2266,8 @@ public class HBaseFsck extends Configure
* Cases:
* - Clean regions that overlap
* - Only .oldlogs regions (can't find start/stop range, or figure out)
+ *
+ * This is basically threadsafe, except for the fixer increment in mergeOverlaps.
*/
@Override
public void handleOverlapGroup(Collection<HbckInfo> overlap)
@@ -2273,7 +2295,8 @@ public class HBaseFsck extends Configure
void mergeOverlaps(Collection<HbckInfo> overlap)
throws IOException {
- LOG.info("== Merging regions into one region: "
+ String thread = Thread.currentThread().getName();
+ LOG.info("== [" + thread + "] Merging regions into one region: "
+ Joiner.on(",").join(overlap));
// get the min / max range and close all concerned regions
Pair<byte[], byte[]> range = null;
@@ -2291,25 +2314,25 @@ public class HBaseFsck extends Configure
}
}
// need to close files so delete can happen.
- LOG.debug("Closing region before moving data around: " + hi);
- LOG.debug("Contained region dir before close");
+ LOG.debug("[" + thread + "] Closing region before moving data around: " + hi);
+ LOG.debug("[" + thread + "] Contained region dir before close");
debugLsr(hi.getHdfsRegionDir());
try {
- LOG.info("Closing region: " + hi);
+ LOG.info("[" + thread + "] Closing region: " + hi);
closeRegion(hi);
} catch (IOException ioe) {
- LOG.warn("Was unable to close region " + hi
+ LOG.warn("[" + thread + "] Was unable to close region " + hi
+ ". Just continuing... ", ioe);
} catch (InterruptedException e) {
- LOG.warn("Was unable to close region " + hi
+ LOG.warn("[" + thread + "] Was unable to close region " + hi
+ ". Just continuing... ", e);
}
try {
- LOG.info("Offlining region: " + hi);
+ LOG.info("[" + thread + "] Offlining region: " + hi);
offline(hi.getRegionName());
} catch (IOException ioe) {
- LOG.warn("Unable to offline region from master: " + hi
+ LOG.warn("[" + thread + "] Unable to offline region from master: " + hi
+ ". Just continuing... ", ioe);
}
}
@@ -2320,7 +2343,7 @@ public class HBaseFsck extends Configure
HRegionInfo newRegion = new HRegionInfo(htd.getName(), range.getFirst(),
range.getSecond());
HRegion region = HBaseFsckRepair.createHDFSRegionDir(conf, newRegion, htd);
- LOG.info("Created new empty container region: " +
+ LOG.info("[" + thread + "] Created new empty container region: " +
newRegion + " to contain regions: " + Joiner.on(",").join(overlap));
debugLsr(region.getRegionDir());
@@ -2328,7 +2351,7 @@ public class HBaseFsck extends Configure
boolean didFix= false;
Path target = region.getRegionDir();
for (HbckInfo contained : overlap) {
- LOG.info("Merging " + contained + " into " + target );
+ LOG.info("[" + thread + "] Merging " + contained + " into " + target );
int merges = mergeRegionDirs(target, contained);
if (merges > 0) {
didFix = true;
@@ -2477,9 +2500,21 @@ public class HBaseFsck extends Configure
if (prevKey != null) {
handler.handleRegionEndKeyNotEmpty(prevKey);
}
-
- for (Collection<HbckInfo> overlap : overlapGroups.asMap().values()) {
- handler.handleOverlapGroup(overlap);
+
+ // TODO fold this into the TableIntegrityHandler
+ if (getConf().getBoolean("hbasefsck.overlap.merge.parallel", true)) {
+ LOG.info("Handling overlap merges in parallel. set hbasefsck.overlap.merge.parallel to" +
+ " false to run serially.");
+ boolean ok = handleOverlapsParallel(handler, prevKey);
+ if (!ok) {
+ return false;
+ }
+ } else {
+ LOG.info("Handling overlap merges serially. set hbasefsck.overlap.merge.parallel to" +
+ " true to run in parallel.");
+ for (Collection<HbckInfo> overlap : overlapGroups.asMap().values()) {
+ handler.handleOverlapGroup(overlap);
+ }
}
if (details) {
@@ -2503,6 +2538,38 @@ public class HBaseFsck extends Configure
return errors.getErrorList().size() == originalErrorsCount;
}
+ /**
+  * Merges every overlap group concurrently on {@code executor}, one
+  * {@link WorkItemOverlapMerge} per group.
+  *
+  * @param handler handler whose handleOverlapGroup is invoked once per overlap group
+  * @param prevKey currently unused by this method
+  * @return false if interrupted while submitting or waiting for the merges, true otherwise.
+  *         A group whose merge throws is logged and skipped; it does not affect the result.
+  * @throws IOException declared, but not thrown directly by this body
+  */
+ private boolean handleOverlapsParallel(TableIntegrityErrorHandler handler, byte[] prevKey)
+     throws IOException {
+   // Overlap groups are assumed independent, so they can be merged in parallel.
+   Collection<Collection<HbckInfo>> groups = overlapGroups.asMap().values();
+   List<WorkItemOverlapMerge> merges = new ArrayList<WorkItemOverlapMerge>(groups.size());
+   for (Collection<HbckInfo> overlap : groups) {
+     merges.add(new WorkItemOverlapMerge(overlap, handler));
+   }
+   List<Future<Void>> rets;
+   try {
+     rets = executor.invokeAll(merges);
+   } catch (InterruptedException e) {
+     LOG.error("Overlap merges were interrupted", e);
+     Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+     return false;
+   }
+   // invokeAll returns the futures in the same order as the submitted task list.
+   for (int i = 0; i < merges.size(); i++) {
+     try {
+       rets.get(i).get();
+     } catch (ExecutionException e) {
+       // Best-effort: log the failing group and keep collecting the remaining results.
+       LOG.warn("Failed to merge overlap group "
+           + Joiner.on(",").join(merges.get(i).overlapgroup), e.getCause());
+     } catch (InterruptedException e) {
+       LOG.error("Waiting for overlap merges was interrupted", e);
+       Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+       return false;
+     }
+   }
+   return true;
+ }
+
/**
* This dumps data in a visually reasonable way for visual debugging
*