You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2012/12/17 20:19:44 UTC
svn commit: r1423101 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/regionserver/wal/
test/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/ap...
Author: liyin
Date: Mon Dec 17 19:19:43 2012
New Revision: 1423101
URL: http://svn.apache.org/viewvc?rev=1423101&view=rev
Log:
[HBASE-7366] Make old logs directory have a subdirectory structure
Author: amirshim
Summary:
To speed up replication, it is useful to keep each regionserver's old logs in a separate directory, so that we don't have to run an "ls" on a single directory containing 300,000 files (the namenode doesn't like that). We are therefore changing the oldlogs directory structure to contain subdirectories named in the format "10.159.11.45%3A60020". For example:
/TITAN029-ASH3-HBASE/.oldlogs/10.159.11.45%3A60020/10.159.11.45%3A60020.1355452542203
Test Plan:
Unit tests - passed.
Run on a cluster and make sure that files are moved to the correct subdirectories and are deleted after the retention time has elapsed.
Reviewers: liyintang, nspiegelberg, mycnyc
Reviewed By: liyintang
CC: hbase-eng@, davejwatson
Differential Revision: https://phabricator.fb.com/D659154
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Mon Dec 17 19:19:43 2012
@@ -220,6 +220,17 @@ public final class HConstants {
/** Like the previous, but for old logs that are about to be deleted */
public static final String HREGION_OLDLOGDIR_NAME = ".oldlogs";
+ /** Boolean config to determine if we should use a subdir structure
+ * in the .oldlogs directory */
+ public static final String HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE =
+ "hbase.regionserver.oldlogs.use.subdir.structure";
+
+ /** Boolean config to determine if we should use a subdir structure in
+ * the .oldlogs directory by default */
+ public static final boolean HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE_DEFAULT =
+ true;
+
+
/** Used to construct the name of the compaction directory during compaction */
public static final String HREGION_COMPACTIONDIR_NAME = "compaction.dir";
@@ -475,7 +486,7 @@ public final class HConstants {
* scanner's next method.
*/
public static String HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY = "hbase.client.scanner.max.result.size";
-
+
/**
* Maximum number of bytes returned when calling a scanner's next method.
* Note that when a single row is larger than this limit the row is still
@@ -488,7 +499,7 @@ public final class HConstants {
/**
* Maximum number of bytes returned when calling a scanner's next method.
- * Used with partialRow parameter on the client side. Note that when a
+ * Used with partialRow parameter on the client side. Note that when a
* single row is larger than this limit, the row is still returned completely
* if partialRow is true, otherwise, the row will be truncated in order to
* fit the memory.
@@ -596,11 +607,11 @@ public final class HConstants {
/** The number of HLogs for each region server */
public static final String HLOG_CNT_PER_SERVER = "hbase.regionserver.hlog.cnt.perserver";
-
+
public static final String HLOG_FORMAT_BACKWARD_COMPATIBILITY =
"hbase.regionserver.hlog.format.backward.compatibility";
-
- /**
+
+ /**
* The byte array represents for NO_NEXT_INDEXED_KEY;
* The actual value is irrelevant because this is always compared by reference.
*/
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java Mon Dec 17 19:19:43 2012
@@ -91,32 +91,48 @@ public class OldLogsCleaner extends Chor
}
}
- @Override
- protected void chore() {
- try {
- FileStatus[] files = this.fs.listStatus(this.oldLogDir);
- if (files == null) {
- // We don't have any files to process.
- return;
+ /**
+ * Delete log files directories recursively.
+ * @param files The list of files/directories to traverse.
+ * @param deleteCountLeft Max number of files to delete
+ * @param maxDepth Max Directory depth to recurse
+ * @return Number of files left to delete (deleteCountLeft - number deleted)
+ * @throws IOException
+ */
+ private int cleanFiles(FileStatus[] files, int deleteCountLeft,
+ int maxDepth) throws IOException {
+ if (files == null || files.length == 0) return deleteCountLeft;
+ if (maxDepth <= 0) {
+ LOG.warn("Old Logs directory structure is too deep: " + files[0].getPath());
+ return deleteCountLeft;
+ }
+ for (FileStatus file : files) {
+ if (deleteCountLeft <= 0) return 0; // we don't have anymore to delete
+ if (file.isDir()) {
+ deleteCountLeft = cleanFiles(this.fs.listStatus(file.getPath()),
+ deleteCountLeft, maxDepth - 1);
+ continue;
}
- int nbDeletedLog = 0;
- for (FileStatus file : files) {
- Path filePath = file.getPath();
- if (HLog.validateHLogFilename(filePath.getName())) {
- if (logCleaner.isLogDeletable(filePath) ) {
- this.fs.delete(filePath, true);
- nbDeletedLog++;
- }
- } else {
- LOG.warn("Found a wrongly formated file: "
- + file.getPath().getName());
+ Path filePath = file.getPath();
+ if (HLog.validateHLogFilename(filePath.getName())) {
+ if (logCleaner.isLogDeletable(filePath) ) {
this.fs.delete(filePath, true);
- nbDeletedLog++;
- }
- if (nbDeletedLog >= maxDeletedLogs) {
- break;
+ deleteCountLeft--;
}
+ } else {
+ LOG.warn("Found a wrongly formatted file: "
+ + file.getPath().getName());
+ this.fs.delete(filePath, true);
+ deleteCountLeft--;
}
+ }
+ return deleteCountLeft;
+ }
+
+ @Override
+ protected void chore() {
+ try {
+ cleanFiles(this.fs.listStatus(this.oldLogDir), maxDeletedLogs, 2);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Error while cleaning the logs", e);
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Dec 17 19:19:43 2012
@@ -242,7 +242,7 @@ public class HLog implements Syncable {
// Lock to guarantee the ordering of log entries in HLOG
private final Object appendLock = new Object();
-
+
private final boolean enabled;
/*
@@ -330,25 +330,25 @@ public class HLog implements Syncable {
/**
* Double list buffer for WAL that allows entries to be
* appended while sync is in progress
- *
+ *
* CurrentList is for buffering appended entries;
* syncList contains entries being synced to persistent storage;
*/
private class DoubleListBuffer {
private LinkedList<Entry> currentList = new LinkedList<Entry>();
private LinkedList<Entry> syncList = new LinkedList<Entry>();
-
+
/**
- * Append a log entry into the buffer
+ * Append a log entry into the buffer
* @param entry log entry
*/
synchronized private void appendToBuffer(Entry entry) {
currentList.add(entry);
}
-
+
/**
* Sync buffered log entries into persistent storage
- *
+ *
* @return number of log entries synced
*/
private int appendAndSync() throws IOException {
@@ -363,14 +363,14 @@ public class HLog implements Syncable {
syncList = currentList;
currentList = tmp;
}
-
+
// append entries to writer
int syncedEntries = syncList.size();
while (!syncList.isEmpty()) {
Entry entry = syncList.remove();
writer.append(entry);
}
-
+
// sync the data
long now = System.currentTimeMillis();
writer.sync();
@@ -378,7 +378,7 @@ public class HLog implements Syncable {
return syncedEntries;
}
}
-
+
private DoubleListBuffer logBuffer = new DoubleListBuffer();
/**
@@ -437,17 +437,17 @@ public class HLog implements Syncable {
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
-
+
if (!fs.exists(oldLogDir)) {
fs.mkdirs(oldLogDir);
}
this.oldLogDir = oldLogDir;
-
+
if (!fs.exists(dir)) {
fs.mkdirs(dir);
}
this.dir = dir;
-
+
this.hlogIndexID = hlogIndexID;
this.hlogName = "HLog-" + this.hlogIndexID + " ";
@@ -461,14 +461,14 @@ public class HLog implements Syncable {
if (actionListener != null) {
addLogActionsListerner(actionListener);
}
-
+
// If prefix is null||empty, then just name it hlog.
if (conf.getBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, true)) {
this.prefix = prefix == null || prefix.isEmpty() ? "hlog" : URLEncoder.encode(prefix, "UTF8");
LOG.warn("Still using old hlog prefix due to HLOG_FORMAT_BACK_COMPATIBILITY: " + this.prefix);
} else {
// Also append the current hlogIndexId-totalHLogCnt to the prefix.
- this.prefix = (prefix == null || prefix.isEmpty() ?
+ this.prefix = (prefix == null || prefix.isEmpty() ?
"hlog" : URLEncoder.encode(prefix, "UTF8"))
+ "." + hlogIndexID + "-" + totalHLogCnt;
LOG.info("HLog prefix is " + this.prefix);
@@ -495,7 +495,7 @@ public class HLog implements Syncable {
} else {
LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
}
-
+
logSyncerThread = new LogSyncer(this.optionalFlushInterval);
Threads.setDaemonThreadRunning(logSyncerThread.getThread(),
Thread.currentThread().getName() + ".logSyncer-" + hlogIndexID);
@@ -909,7 +909,7 @@ public class HLog implements Syncable {
}
private void archiveLogFile(final Path p, final Long seqno) throws IOException {
- Path newPath = getHLogArchivePath(this.oldLogDir, p);
+ Path newPath = getHLogArchivePath(this.oldLogDir, p, this.fs, this.conf);
LOG.info(hlogName + "moving old hlog file " + FSUtils.getPath(p) +
" whose highest sequence/edit id is " + seqno + " to " +
FSUtils.getPath(newPath));
@@ -951,11 +951,10 @@ public class HLog implements Syncable {
close();
FileStatus[] files = fs.listStatus(this.dir);
for(FileStatus file : files) {
- fs.rename(file.getPath(),
- getHLogArchivePath(this.oldLogDir, file.getPath()));
+ Path newPath = getHLogArchivePath(this.oldLogDir, file.getPath(), fs, this.conf);
+ fs.rename(file.getPath(), newPath);
+ LOG.debug(hlogName + "Moved log file " + file + " to " + newPath);
}
- LOG.debug(hlogName + "Moved " + files.length + " log files to " +
- FSUtils.getPath(this.oldLogDir));
fs.delete(dir, true);
}
@@ -978,11 +977,11 @@ public class HLog implements Syncable {
} catch (InterruptedException e) {
LOG.error(hlogName + "Exception while waiting for syncer thread to die", e);
}
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("closing hlog writer in " + this.dir.toString());
}
-
+
cacheFlushLock.writeLock().lock();
try {
synchronized (updateLock) {
@@ -1040,7 +1039,7 @@ public class HLog implements Syncable {
long len = edits.getTotalKeyValueLength();
long txid = 0;
-
+
long start = System.currentTimeMillis();
byte[] regionName = info.getRegionName();
@@ -1053,20 +1052,20 @@ public class HLog implements Syncable {
long seqNum = obtainSeqNum();
this.firstSeqWrittenInCurrentMemstore.putIfAbsent(regionName, seqNum);
HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
-
+
doWrite(info, logKey, edits);
// Only count 1 row as an unflushed entry.
txid = this.unflushedEntries.incrementAndGet();
}
-
- // Update the metrics
+
+ // Update the metrics
this.numEntries.incrementAndGet();
writeSize.inc(len);
// sync txn to file system
start = System.currentTimeMillis();
this.sync(info.isMetaRegion(), txid);
-
+
// Update the metrics and log down the outliers
long end = System.currentTimeMillis();
long syncTime = end - start;
@@ -1077,7 +1076,7 @@ public class HLog implements Syncable {
Thread.currentThread().getName(), syncTime, this.numEntries.get(),
StringUtils.humanReadableInt(len)));
}
-
+
// Update the per-request profiling data
Call call = HRegionServer.callContext.get();
ProfilingData pData = call == null ? null : call.getProfilingData();
@@ -1165,7 +1164,7 @@ public class HLog implements Syncable {
} catch (InterruptedException e) {
LOG.debug(getName() + " interrupted while waiting for sync requests");
if (unflushedEntries.get() != syncTillHere) {
- syncFailureAbortStrategy.abort("LogSyncer interrupted before it" +
+ syncFailureAbortStrategy.abort("LogSyncer interrupted before it" +
" could sync everything. Aborting JVM", e);
}
} finally {
@@ -1248,7 +1247,7 @@ public class HLog implements Syncable {
syncFailureAbortStrategy.abort(hlogName + "Could not sync hlog. Aborting", e);
}
}
-
+
// if the number of replicas in HDFS has fallen below the initial
// value, then roll logs.
try {
@@ -1265,7 +1264,7 @@ public class HLog implements Syncable {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
" still proceeding ahead...");
}
-
+
try {
if (isUnderReplication || (this.writer.getLength() > this.logrollsize)) {
requestLogRoll();
@@ -1275,7 +1274,7 @@ public class HLog implements Syncable {
}
}
}
-
+
/**
* This method gets the datanode replication count for the current HLog.
*
@@ -1298,7 +1297,7 @@ public class HLog implements Syncable {
boolean canGetCurReplicas() {
return this.getNumCurrentReplicas != null;
}
-
+
private void requestLogRoll() {
if (this.listener != null) {
this.listener.logRollRequested();
@@ -1309,7 +1308,7 @@ public class HLog implements Syncable {
throws IOException {
this.logBuffer.appendToBuffer(new Entry(logKey, logEdit));
}
-
+
/** @return How many items have been added to the log */
int getNumEntries() {
return numEntries.get();
@@ -1458,7 +1457,7 @@ public class HLog implements Syncable {
files = NO_FILES;
}
for(FileStatus file : files) {
- Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+ Path newPath = getHLogArchivePath(oldLogDir, file.getPath(), fs, conf);
LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
FSUtils.getPath(newPath));
fs.rename(file.getPath(), newPath);
@@ -1743,8 +1742,29 @@ public class HLog implements Syncable {
return pattern.matcher(filename).matches();
}
- static Path getHLogArchivePath(Path oldLogDir, Path p) {
- return new Path(oldLogDir, p.getName());
+ static Path getHLogArchivePath(Path oldLogDir, Path p, FileSystem fs,
+ Configuration conf) throws IOException {
+ String filename = p.getName();
+
+ // if subdirectories are disabled...
+ if (conf != null && !conf.getBoolean(
+ HConstants.HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE,
+ HConstants.HREGION_OLDLOGDIR_USE_SUBDIR_STRUCTURE_DEFAULT)) {
+ return new Path(oldLogDir, filename);
+ }
+
+ if (!validateHLogFilename(filename)) {
+ LOG.warn("Malformed Log file name: " + filename);
+ return new Path(oldLogDir, filename);
+ }
+
+ // since the filename is a valid name, we know there
+ // is a last '.' (won't return -1)
+ String subDirectoryName = filename.substring(0, filename.lastIndexOf('.'));
+ Path oldLogsSubDir = new Path(oldLogDir, subDirectoryName);
+ fs.mkdirs(oldLogsSubDir);
+
+ return new Path(oldLogsSubDir, filename);
}
/**
@@ -1979,7 +1999,7 @@ public class HLog implements Syncable {
}
for (Path p: processedLogs) {
- Path newPath = getHLogArchivePath(oldLogDir, p);
+ Path newPath = getHLogArchivePath(oldLogDir, p, fs, conf);
if (fs.exists(p)) {
if (!fs.rename(p, newPath)) {
LOG.warn("Unable to move " + p + " to " + newPath);
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Mon Dec 17 19:19:43 2012
@@ -82,7 +82,7 @@ public class TestHLogSplit {
private static final int NUM_WRITERS = 10;
private static final int ENTRIES = 10; // entries per writer per region
private static final int NUM_CLOSE_THREADS = 10;
-
+
private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS];
private long seq = 0;
private static final byte[] TABLE_NAME = "t1".getBytes();
@@ -174,6 +174,7 @@ public class TestHLogSplit {
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
+ originalLog = (fs.listStatus(originalLog))[0].getPath();
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog));
@@ -359,6 +360,7 @@ public class TestHLogSplit {
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
+ archivedLogs = fs.listStatus(archivedLogs[0].getPath());
assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
}
@@ -730,6 +732,8 @@ public class TestHLogSplit {
logfile.getPath().toString(), conf);
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
+ originalLog = (fs.listStatus(originalLog))[0].getPath();
+
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
assertEquals(true, logsAreEqual(originalLog, splitLog));
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java?rev=1423101&r1=1423100&r2=1423101&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestMutationWriteToWAL.java Mon Dec 17 19:19:43 2012
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTru
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -151,8 +152,12 @@ public class TestMutationWriteToWAL exte
final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
int nLogFilesRead = 0;
List<String> actualLogEntries = new ArrayList<String>();
- for (FileStatus logFile : fs.listStatus(oldLogDir)) {
+ ArrayDeque<FileStatus> checkQueue = new ArrayDeque<FileStatus>(
+ java.util.Arrays.asList(fs.listStatus(oldLogDir)));
+ while (!checkQueue.isEmpty()) {
+ FileStatus logFile = checkQueue.pop();
if (logFile.isDir()) {
+ checkQueue.addAll(java.util.Arrays.asList(fs.listStatus(logFile.getPath())));
continue;
}
HLog.Reader r = HLog.getReader(fs, logFile.getPath(), conf);