You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2009/05/16 21:29:35 UTC
svn commit: r775515 -
/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Author: apurtell
Date: Sat May 16 19:29:34 2009
New Revision: 775515
URL: http://svn.apache.org/viewvc?rev=775515&view=rev
Log:
add missing hunk from last commit
Modified:
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=775515&r1=775514&r2=775515&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat May 16 19:29:34 2009
@@ -104,7 +104,6 @@
private final Path dir;
private final Configuration conf;
private final LogRollListener listener;
- private final int maxlogentries;
private final long optionalFlushInterval;
private final long blocksize;
private final int flushlogentries;
@@ -132,11 +131,17 @@
private final AtomicLong logSeqNum = new AtomicLong(0);
- private volatile long filenum = 0;
+ private volatile long filenum = -1;
private volatile long old_filenum = -1;
private final AtomicInteger numEntries = new AtomicInteger(0);
+ // Size of edits written so far. Used figuring when to rotate logs.
+ private final AtomicLong editsSize = new AtomicLong(0);
+
+ // If > than this size, roll the log.
+ private final long logrollsize;
+
// This lock prevents starting a log roll during a cache flush.
// synchronized is insufficient because a cache flush spans two method calls.
private final Lock cacheFlushLock = new ReentrantLock();
@@ -144,7 +149,9 @@
// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
private final Object updateLock = new Object();
-
+
+ private final boolean enabled;
+
/*
* If more than this many logs, force flush of oldest region to oldest edit
* goes to disk. If too many and we crash, then will take forever replaying.
@@ -182,12 +189,13 @@
this.dir = dir;
this.conf = conf;
this.listener = listener;
- this.maxlogentries =
- conf.getInt("hbase.regionserver.maxlogentries", 100000);
this.flushlogentries =
conf.getInt("hbase.regionserver.flushlogentries", 100);
this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
this.fs.getDefaultBlockSize());
+ // Roll at 95% of block size.
+ float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
+ this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
this.lastLogFlushTime = System.currentTimeMillis();
@@ -196,15 +204,16 @@
}
fs.mkdirs(dir);
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
+ this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
LOG.info("HLog configuration: blocksize=" + this.blocksize +
- ", maxlogentries=" + this.maxlogentries + ", flushlogentries=" +
- this.flushlogentries + ", optionallogflushinternal=" +
- this.optionalFlushInterval + "ms");
+ ", rollsize=" + this.logrollsize +
+ ", enabled=" + this.enabled +
+ ", flushlogentries=" + this.flushlogentries +
+ ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
rollWriter();
}
/**
- * Accessor for tests. Not a part of the public API.
* @return Current state of the monotonically increasing file id.
*/
public long getFilenum() {
@@ -212,17 +221,13 @@
}
/**
- * Get the compression type for the hlog files.
- * Commit logs SHOULD NOT be compressed. You'll lose edits if the compression
- * record is not complete. In gzip, record is 32k so you could lose up to
- * 32k of edits (All of this is moot till we have sync/flush in hdfs but
- * still...).
+ * Get the compression type for the hlog files
* @param c Configuration to use.
* @return the kind of compression to use
*/
private static CompressionType getCompressionType(final Configuration c) {
- String name = c.get("hbase.io.seqfile.compression.type");
- return name == null? CompressionType.NONE: CompressionType.valueOf(name);
+ // Compression makes no sense for commit log. Always return NONE.
+ return CompressionType.NONE;
}
/**
@@ -277,23 +282,24 @@
}
synchronized (updateLock) {
// Clean up current writer.
- Path oldFile = cleanupCurrentWriter();
- // Create a new one.
- this.old_filenum = this.filenum;
+ Path oldFile = cleanupCurrentWriter(this.filenum);
+ if (this.filenum >= 0) {
+ this.old_filenum = this.filenum;
+ }
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename(this.filenum);
-
this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
HLogKey.class, KeyValue.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
fs.getDefaultReplication(), this.blocksize,
SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
new Metadata());
-
LOG.info((oldFile != null?
- "Closed " + oldFile + ", entries=" + this.numEntries.get() + ". ": "") +
- "New log writer: " + FSUtils.getPath(newPath));
-
+ "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+ this.numEntries.get() +
+ ", calcsize=" + this.editsSize.get() + ", filesize=" +
+ this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+ "New hlog " + FSUtils.getPath(newPath));
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.size() <= 0) {
@@ -310,6 +316,7 @@
}
}
this.numEntries.set(0);
+ this.editsSize.set(0);
updateLock.notifyAll();
}
} finally {
@@ -337,7 +344,7 @@
if (LOG.isDebugEnabled()) {
// Find region associated with oldest key -- helps debugging.
oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
- LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+ LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " +
" out of total " + this.outputfiles.size() + "; " +
"oldest outstanding seqnum is " + oldestOutstandingSeqNum +
" from region " + Bytes.toString(oldestRegion));
@@ -351,7 +358,7 @@
if (countOfLogs > this.maxLogs) {
regionToFlush = oldestRegion != null?
oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
- LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
+ LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" +
this.maxLogs + "; forcing flush of region with oldest edits: " +
Bytes.toString(regionToFlush));
}
@@ -382,7 +389,8 @@
* @return Path to current writer or null if none.
* @throws IOException
*/
- private Path cleanupCurrentWriter() throws IOException {
+ private Path cleanupCurrentWriter(final long currentfilenum)
+ throws IOException {
Path oldFile = null;
if (this.writer != null) {
// Close the current writer, get a new one.
@@ -393,12 +401,12 @@
// shut ourselves down to minimize loss. Alternative is to try and
// keep going. See HBASE-930.
FailedLogCloseException flce =
- new FailedLogCloseException("#" + this.filenum);
+ new FailedLogCloseException("#" + currentfilenum);
flce.initCause(e);
throw e;
}
- oldFile = computeFilename(old_filenum);
- if (filenum > 0) {
+ if (currentfilenum >= 0) {
+ oldFile = computeFilename(currentfilenum);
this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
}
}
@@ -406,7 +414,7 @@
}
private void deleteLogFile(final Path p, final Long seqno) throws IOException {
- LOG.info("removing old log file " + FSUtils.getPath(p) +
+ LOG.info("removing old hlog file " + FSUtils.getPath(p) +
" whose highest sequence/edit id is " + seqno);
this.fs.delete(p, true);
}
@@ -418,6 +426,7 @@
* @return Path
*/
public Path computeFilename(final long fn) {
+ if (fn < 0) return null;
return new Path(dir, HLOG_DATFILE + fn);
}
@@ -442,7 +451,7 @@
synchronized (updateLock) {
this.closed = true;
if (LOG.isDebugEnabled()) {
- LOG.debug("closing log writer in " + this.dir.toString());
+ LOG.debug("closing hlog writer in " + this.dir.toString());
}
this.writer.close();
updateLock.notifyAll();
@@ -457,11 +466,12 @@
*
* @param regionInfo
* @param logEdit
+ * @param now
* @throws IOException
*/
- public void append(HRegionInfo regionInfo, KeyValue logEdit)
+ public void append(HRegionInfo regionInfo, KeyValue logEdit, final long now)
throws IOException {
- this.append(regionInfo, new byte[0], logEdit);
+ this.append(regionInfo, new byte[0], logEdit, now);
}
/** Append an entry to the log.
@@ -469,9 +479,11 @@
* @param regionInfo
* @param row
* @param logEdit
+ * @param now Time of this edit write.
* @throws IOException
*/
- public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit)
+ public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit,
+ final long now)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@@ -485,14 +497,13 @@
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
- HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
+ HLogKey logKey = new HLogKey(regionName, tableName, seqNum, now);
boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
- doWrite(logKey, logEdit, sync);
+ doWrite(logKey, logEdit, sync, now);
this.numEntries.incrementAndGet();
updateLock.notifyAll();
}
-
- if (this.numEntries.get() > this.maxlogentries) {
+ if (this.editsSize.get() > this.logrollsize) {
if (listener != null) {
listener.logRollRequested();
}
@@ -520,10 +531,11 @@
* @param tableName
* @param edits
* @param sync
+ * @param now
* @throws IOException
*/
void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
- boolean sync)
+ boolean sync, final long now)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@@ -537,13 +549,14 @@
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
int counter = 0;
for (KeyValue kv: edits) {
- HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++]);
- doWrite(logKey, kv, sync);
+ HLogKey logKey =
+ new HLogKey(regionName, tableName, seqNum[counter++], now);
+ doWrite(logKey, kv, sync, now);
this.numEntries.incrementAndGet();
}
updateLock.notifyAll();
}
- if (this.numEntries.get() > this.maxlogentries) {
+ if (this.editsSize.get() > this.logrollsize) {
requestLogRoll();
}
}
@@ -558,19 +571,19 @@
if (!this.closed) {
long now = System.currentTimeMillis();
synchronized (updateLock) {
- if (((now - this.optionalFlushInterval) >
- this.lastLogFlushTime) && this.unflushedEntries.get() > 0) {
+ if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
+ this.unflushedEntries.get() > 0) {
try {
sync();
} catch (IOException e) {
- LOG.error("Error flushing HLog", e);
+ LOG.error("Error flushing hlog", e);
}
}
}
long took = System.currentTimeMillis() - now;
if (took > 1000) {
LOG.warn(Thread.currentThread().getName() + " took " + took +
- "ms optional sync'ing HLog; editcount=" + this.numEntries.get());
+ "ms optional sync'ing hlog; editcount=" + this.numEntries.get());
}
}
}
@@ -581,10 +594,14 @@
}
}
- private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync)
+ private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync,
+ final long now)
throws IOException {
+ if (!this.enabled) {
+ return;
+ }
try {
- long now = System.currentTimeMillis();
+ this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
this.writer.append(logKey, logEdit);
if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
sync();
@@ -592,10 +609,10 @@
long took = System.currentTimeMillis() - now;
if (took > 1000) {
LOG.warn(Thread.currentThread().getName() + " took " + took +
- "ms appending an edit to HLog; editcount=" + this.numEntries.get());
+ "ms appending an edit to hlog; editcount=" + this.numEntries.get());
}
} catch (IOException e) {
- LOG.fatal("Could not append. Requesting close of log", e);
+ LOG.fatal("Could not append. Requesting close of hlog", e);
requestLogRoll();
throw e;
}
@@ -667,8 +684,8 @@
return;
}
synchronized (updateLock) {
- this.writer.append(new HLogKey(regionName, tableName, logSeqId),
- completeCacheFlushLogEdit());
+ this.writer.append(new HLogKey(regionName, tableName, logSeqId,
+ System.currentTimeMillis()), completeCacheFlushLogEdit());
this.numEntries.incrementAndGet();
Long seq = this.lastSeqWritten.get(regionName);
if (seq != null && logSeqId >= seq.longValue()) {
@@ -729,7 +746,7 @@
// Nothing to do
return;
}
- LOG.info("Splitting " + logfiles.length + " log(s) in " +
+ LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
srcDir.toString());
splitLog(rootDir, logfiles, fs, conf);
try {
@@ -741,7 +758,7 @@
throw io;
}
long endMillis = System.currentTimeMillis();
- LOG.info("log file splitting completed in " + (endMillis - millis) +
+ LOG.info("hlog file splitting completed in " + (endMillis - millis) +
" millis for " + srcDir.toString());
}
@@ -762,8 +779,8 @@
try {
for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
- logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+ LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+ ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
// Check for possibly empty file. With appends, currently Hadoop reports
// a zero length even if the file has been sync'd. Revisit if
@@ -777,7 +794,7 @@
try {
int count = 0;
while (in.next(key, val)) {
- byte[] regionName = key.getRegionName();
+ byte [] regionName = key.getRegionName();
LinkedList<HLogEntry> queue = logEntries.get(regionName);
if (queue == null) {
queue = new LinkedList<HLogEntry>();
@@ -787,7 +804,8 @@
queue.push(new HLogEntry(val, key));
count++;
}
- LOG.debug("Pushed " + count + " entries");
+ LOG.debug("Pushed " + count + " entries from " +
+ logfiles[i].getPath());
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (!(e instanceof EOFException)) {
@@ -797,7 +815,7 @@
}
} catch (IOException e) {
if (length <= 0) {
- LOG.warn("Empty log, continuing: " + logfiles[i]);
+ LOG.warn("Empty hlog, continuing: " + logfiles[i]);
continue;
}
throw e;
@@ -838,7 +856,7 @@
Path oldlogfile = null;
SequenceFile.Reader old = null;
if (fs.exists(logfile)) {
- LOG.warn("Old log file " + logfile
+ LOG.warn("Old hlog file " + logfile
+ " already exists. Copying existing file to new file");
oldlogfile = new Path(logfile.toString() + ".old");
fs.rename(logfile, oldlogfile);
@@ -852,7 +870,7 @@
// iterate.
logWriters.put(key, w);
if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new log file writer for path "
+ LOG.debug("Creating new hlog file writer for path "
+ logfile + " and region " + Bytes.toString(key));
}
@@ -893,10 +911,10 @@
// Wait for all threads to terminate
try {
for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
- LOG.debug("Waiting for log writers to terminate, iteration #" + i);
+ LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
}
}catch(InterruptedException ex) {
- LOG.warn("Log writers were interrupted, possible data loss!");
+ LOG.warn("Hlog writers were interrupted, possible data loss!");
}
} finally {
for (SequenceFile.Writer w : logWriters.values()) {