You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/07/11 02:07:53 UTC
svn commit: r1359957 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/wal/
main/java/org/apache/hadoop/hbase/zookeeper/ test/java/org/apache/hadoop/hbase/regionserver/wal/
Author: tedyu
Date: Wed Jul 11 00:07:53 2012
New Revision: 1359957
URL: http://svn.apache.org/viewvc?rev=1359957&view=rev
Log:
HBASE-6337 [MTTR] Remove renaming tmp log file in SplitLogManager (Chunhui)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed Jul 11 00:07:53 2012
@@ -139,10 +139,8 @@ public class SplitLogManager extends Zoo
this(zkw, conf, stopper, serverName, new TaskFinisher() {
@Override
public Status finish(ServerName workerName, String logfile) {
- String tmpname =
- ZKSplitLog.getSplitLogDirTmpComponent(workerName.toString(), logfile);
try {
- HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
+ HLogSplitter.finishSplitLogFile(logfile, conf);
} catch (IOException e) {
LOG.warn("Could not finish splitting of log file " + logfile);
return Status.ERR;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Jul 11 00:07:53 2012
@@ -108,9 +108,7 @@ public class SplitLogWorker extends ZooK
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
- String tmpname =
- ZKSplitLog.getSplitLogDirTmpComponent(serverName.toString(), filename);
- if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
+ if (HLogSplitter.splitLogFile(rootdir,
fs.getFileStatus(new Path(filename)), fs, conf, p) == false) {
return Status.PREEMPTED;
}
@@ -579,4 +577,4 @@ public class SplitLogWorker extends ZooK
}
public Status exec(String name, CancelableProgressable p);
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Jul 11 00:07:53 2012
@@ -346,15 +346,12 @@ public class HLogSplitter {
}
/**
- * Splits a HLog file into a temporary staging area. tmpname is used to build
- * the name of the staging area where the recovered-edits will be separated
- * out by region and stored.
+ * Splits a HLog file into region's recovered-edits directory
* <p>
* If the log file has N regions then N recovered.edits files will be
* produced.
* <p>
* @param rootDir
- * @param tmpname
* @param logfile
* @param fs
* @param conf
@@ -362,16 +359,15 @@ public class HLogSplitter {
* @return false if it is interrupted by the progress-able.
* @throws IOException
*/
- static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
- FileStatus logfile, FileSystem fs,
- Configuration conf, CancelableProgressable reporter) throws IOException {
+ static public boolean splitLogFile(Path rootDir, FileStatus logfile,
+ FileSystem fs, Configuration conf, CancelableProgressable reporter)
+ throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs);
- return s.splitLogFileToTemp(logfile, tmpname, reporter);
+ return s.splitLogFile(logfile, reporter);
}
- public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
- CancelableProgressable reporter)
- throws IOException {
+ public boolean splitLogFile(FileStatus logfile,
+ CancelableProgressable reporter) throws IOException {
boolean isCorrupted = false;
Preconditions.checkState(status == null);
status = TaskMonitor.get().createStatus(
@@ -389,7 +385,7 @@ public class HLogSplitter {
in = getReader(fs, logfile, conf, skipErrors);
} catch (CorruptedLogFileException e) {
LOG.warn("Could not get reader, corrupted log file " + logPath, e);
- ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+ ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
isCorrupted = true;
}
if (in == null) {
@@ -397,8 +393,7 @@ public class HLogSplitter {
LOG.warn("Nothing to split in log file " + logPath);
return true;
}
- this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(
- reporter, tmpname));
+ this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(reporter));
if (!reportProgressIfIsDistributedLogSplitting()) {
return false;
}
@@ -431,7 +426,7 @@ public class HLogSplitter {
throw iie;
} catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupted log file " + logPath, e);
- ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+ ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
isCorrupted = true;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
@@ -451,94 +446,40 @@ public class HLogSplitter {
}
/**
- * Completes the work done by splitLogFileToTemp by moving the
- * recovered.edits from the staging area to the respective region server's
- * directories.
+ * Completes the work done by splitLogFile by archiving logs
* <p>
* It is invoked by SplitLogManager once it knows that one of the
- * SplitLogWorkers have completed the splitLogFileToTemp() part. If the
- * master crashes then this function might get called multiple times.
+ * SplitLogWorkers have completed the splitLogFile() part. If the master
+ * crashes then this function might get called multiple times.
* <p>
- * @param tmpname
+ * @param logfile
* @param conf
* @throws IOException
*/
- public static void moveRecoveredEditsFromTemp(String tmpname,
- String logfile, Configuration conf)
- throws IOException{
+ public static void finishSplitLogFile(String logfile, Configuration conf)
+ throws IOException {
Path rootdir = FSUtils.getRootDir(conf);
Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
- moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf);
+ finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
}
- public static void moveRecoveredEditsFromTemp(String tmpname,
- Path rootdir, Path oldLogDir,
- String logfile, Configuration conf)
- throws IOException{
+ public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+ String logfile, Configuration conf) throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
FileSystem fs;
fs = rootdir.getFileSystem(conf);
Path logPath = new Path(logfile);
- if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
+ if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
corruptedLogs.add(logPath);
} else {
processedLogs.add(logPath);
}
- Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
- List<FileStatus> files = listAll(fs, stagingDir);
- for (FileStatus f : files) {
- Path src = f.getPath();
- Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
- if (ZKSplitLog.isCorruptFlagFile(dst)) {
- continue;
- }
- if (fs.exists(src)) {
- if (fs.exists(dst)) {
- fs.delete(dst, false);
- } else {
- Path regionDir = dst.getParent().getParent();
- if (!fs.exists(regionDir)) {
- // See HBASE-6050.
- LOG.warn("Could not move recovered edits from " + src +
- " to destination " + regionDir + " as it doesn't exist.");
- continue;
- }
- Path dstdir = dst.getParent();
- if (!fs.exists(dstdir)) {
- if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
- }
- }
- fs.rename(src, dst);
- LOG.debug(" moved " + src + " => " + dst);
- } else {
- LOG.debug("Could not move recovered edits from " + src +
- " as it doesn't exist");
- }
- }
- archiveLogs(null, corruptedLogs, processedLogs,
- oldLogDir, fs, conf);
+ archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
+ Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
fs.delete(stagingDir, true);
- return;
}
- private static List<FileStatus> listAll(FileSystem fs, Path dir)
- throws IOException {
- List<FileStatus> fset = new ArrayList<FileStatus>(100);
- FileStatus [] files = fs.exists(dir)? fs.listStatus(dir): null;
- if (files != null) {
- for (FileStatus f : files) {
- if (f.isDir()) {
- fset.addAll(listAll(fs, f.getPath()));
- } else {
- fset.add(f);
- }
- }
- }
- return fset;
- }
-
-
/**
* Moves processed logs to a oldLogDir after successful processing Moves
* corrupted logs (any log that couldn't be successfully parsed to corruptDir
@@ -1027,14 +968,14 @@ public class HLogSplitter {
}
}
- private WriterAndPath createWAP(byte[] region, Entry entry,
- Path rootdir, String tmpname, FileSystem fs, Configuration conf)
+ private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
+ FileSystem fs, Configuration conf)
throws IOException {
- Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, tmpname==null);
+ Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
if (regionedits == null) {
return null;
}
- if ((tmpname == null) && fs.exists(regionedits)) {
+ if (fs.exists(regionedits)) {
LOG.warn("Found existing old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting "
+ regionedits + ", length="
@@ -1043,18 +984,10 @@ public class HLogSplitter {
LOG.warn("Failed delete of old " + regionedits);
}
}
- Path editsfile;
- if (tmpname != null) {
- // During distributed log splitting the output by each
- // SplitLogWorker is written to a temporary area.
- editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
- } else {
- editsfile = regionedits;
- }
- Writer w = createWriter(fs, editsfile, conf);
- LOG.debug("Creating writer path=" + editsfile + " region="
+ Writer w = createWriter(fs, regionedits, conf);
+ LOG.debug("Creating writer path=" + regionedits + " region="
+ Bytes.toStringBinary(region));
- return (new WriterAndPath(editsfile, w));
+ return (new WriterAndPath(regionedits, w));
}
Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
@@ -1110,12 +1043,9 @@ public class HLogSplitter {
// How often to send a progress report (default 1/2 master timeout)
private final int report_period;
private long last_report_at = 0;
- private final String tmpDirName;
- public DistributedLogSplittingHelper(CancelableProgressable reporter,
- String tmpName) {
+ public DistributedLogSplittingHelper(CancelableProgressable reporter) {
this.splitReporter = reporter;
- this.tmpDirName = tmpName;
report_period = conf.getInt("hbase.splitlog.report.period",
conf.getInt("hbase.splitlog.manager.timeout",
SplitLogManager.DEFAULT_TIMEOUT) / 2);
@@ -1139,10 +1069,6 @@ public class HLogSplitter {
return true;
}
}
-
- String getTmpDirName() {
- return this.tmpDirName;
- }
}
/**
@@ -1380,9 +1306,7 @@ public class HLogSplitter {
if (blacklistedRegions.contains(region)) {
return null;
}
- String tmpName = distributedLogSplittingHelper == null ? null
- : distributedLogSplittingHelper.getTmpDirName();
- ret = createWAP(region, entry, rootDir, tmpName, fs, conf);
+ ret = createWAP(region, entry, rootDir, fs, conf);
if (ret == null) {
blacklistedRegions.add(region);
return null;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Wed Jul 11 00:07:53 2012
@@ -98,27 +98,14 @@ public class ZKSplitLog {
return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
}
- public static Path stripSplitLogTempDir(Path rootdir, Path file) {
- int skipDepth = rootdir.depth() + 2;
- List<String> components = new ArrayList<String>(10);
- do {
- components.add(file.getName());
- file = file.getParent();
- } while (file.depth() > skipDepth);
- Path ret = rootdir;
- for (int i = components.size() - 1; i >= 0; i--) {
- ret = new Path(ret, components.get(i));
- }
- return ret;
- }
public static String getSplitLogDirTmpComponent(final String worker, String file) {
return worker + "_" + ZKSplitLog.encode(file);
}
- public static void markCorrupted(Path rootdir, String tmpname,
+ public static void markCorrupted(Path rootdir, String logFileName,
FileSystem fs) {
- Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+ Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
try {
fs.createNewFile(file);
} catch (IOException e) {
@@ -127,15 +114,12 @@ public class ZKSplitLog {
}
}
- public static boolean isCorrupted(Path rootdir, String tmpname,
+ public static boolean isCorrupted(Path rootdir, String logFileName,
FileSystem fs) throws IOException {
- Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+ Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
boolean isCorrupt;
isCorrupt = fs.exists(file);
return isCorrupt;
}
- public static boolean isCorruptFlagFile(Path file) {
- return file.getName().equals("corrupt");
- }
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Wed Jul 11 00:07:53 2012
@@ -1016,10 +1016,9 @@ public class TestHLogSplit {
generateHLogs(1, 10, -1);
FileStatus logfile = fs.listStatus(hlogDir)[0];
fs.initialize(fs.getUri(), conf);
- HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
- conf, reporter);
- HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
- logfile.getPath().toString(), conf);
+ HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+ HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+ .toString(), conf);
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
@@ -1046,10 +1045,9 @@ public class TestHLogSplit {
LOG.info("Region directory is" + regiondir);
fs.delete(regiondir, true);
- HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
- conf, reporter);
- HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
- logfile.getPath().toString(), conf);
+ HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+ HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+ .toString(), conf);
assertTrue(!fs.exists(regiondir));
assertTrue(true);
@@ -1065,10 +1063,9 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
- HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
- conf, reporter);
- HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
- logfile.getPath().toString(), conf);
+ HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+ HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+ .toString(), conf);
Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
assertFalse(fs.exists(tdir));
@@ -1082,10 +1079,9 @@ public class TestHLogSplit {
FileStatus logfile = fs.listStatus(hlogDir)[0];
fs.initialize(fs.getUri(), conf);
- HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
- conf, reporter);
- HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
- logfile.getPath().toString(), conf);
+ HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+ HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+ .toString(), conf);
for (String region : regions) {
Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(10, countHLog(recovered, fs, conf));
@@ -1103,10 +1099,9 @@ public class TestHLogSplit {
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
- conf, reporter);
- HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
- logfile.getPath().toString(), conf);
+ HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+ HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+ .toString(), conf);
final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
"hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1359957&r1=1359956&r2=1359957&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Wed Jul 11 00:07:53 2012
@@ -660,10 +660,10 @@ public class TestWALReplay {
wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
wal.close();
FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
- HLogSplitter.splitLogFileToTemp(hbaseRootDir, hbaseRootDir + "/temp", listStatus[0], this.fs,
- this.conf, null);
- FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/temp/" + tableNameStr
- + "/" + hri.getEncodedName() + "/recovered.edits"));
+ HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
+ null);
+ FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/"
+ + tableNameStr + "/" + hri.getEncodedName() + "/recovered.edits"));
int editCount = 0;
for (FileStatus fileStatus : listStatus1) {
editCount = Integer.parseInt(fileStatus.getPath().getName());