You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/05/16 08:10:45 UTC
svn commit: r775418 - in /hadoop/hbase/trunk: ./ conf/
src/java/org/apache/hadoop/hbase/master/
src/java/org/apache/hadoop/hbase/regionserver/
src/java/org/apache/hadoop/hbase/regionserver/tableindexed/
src/java/org/apache/hadoop/hbase/regionserver/tra...
Author: stack
Date: Sat May 16 06:10:44 2009
New Revision: 775418
URL: http://svn.apache.org/viewvc?rev=775418&view=rev
Log:
HBASE-1394 Uploads sometimes fall to 0 requests/second (Binding up on HLog#append?)
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/conf/hbase-default.xml
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat May 16 06:10:44 2009
@@ -249,6 +249,8 @@
HBASE-1424 have shell print regioninfo and location on first load if
DEBUG enabled
HBASE-1008 [performance] The replay of logs on server crash takes way too long
+ HBASE-1394 Uploads sometimes fall to 0 requests/second (Binding up on
+ HLog#append?)
OPTIMIZATIONS
HBASE-1412 Change values for delete column and column family in KeyValue
Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Sat May 16 06:10:44 2009
@@ -175,14 +175,6 @@
</description>
</property>
<property>
- <name>hbase.regionserver.maxlogentries</name>
- <value>100000</value>
- <description>Rotate the HRegion HLogs when count of entries exceeds this
- value. Default: 100,000. Value is checked by a thread that runs every
- hbase.server.thread.wakefrequency.
- </description>
- </property>
- <property>
<name>hbase.regionserver.flushlogentries</name>
<value>100</value>
<description>Sync the HLog to the HDFS when it has accumulated this many
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java Sat May 16 06:10:44 2009
@@ -267,7 +267,6 @@
this.rootdir, this.conf);
HRegion meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
this.rootdir, this.conf);
-
// Add first region from the META table to the ROOT region.
HRegion.addRegionToMETA(root, meta);
root.close();
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat May 16 06:10:44 2009
@@ -104,7 +104,6 @@
private final Path dir;
private final Configuration conf;
private final LogRollListener listener;
- private final int maxlogentries;
private final long optionalFlushInterval;
private final long blocksize;
private final int flushlogentries;
@@ -132,11 +131,17 @@
private final AtomicLong logSeqNum = new AtomicLong(0);
- private volatile long filenum = 0;
+ private volatile long filenum = -1;
private volatile long old_filenum = -1;
private final AtomicInteger numEntries = new AtomicInteger(0);
+ // Size of edits written so far. Used figuring when to rotate logs.
+ private final AtomicLong editsSize = new AtomicLong(0);
+
+ // If the size of edits written exceeds this value, roll the log.
+ private final long logrollsize;
+
// This lock prevents starting a log roll during a cache flush.
// synchronized is insufficient because a cache flush spans two method calls.
private final Lock cacheFlushLock = new ReentrantLock();
@@ -144,7 +149,9 @@
// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
private final Object updateLock = new Object();
-
+
+ private final boolean enabled;
+
/*
* If more than this many logs, force flush of oldest region to oldest edit
* goes to disk. If too many and we crash, then will take forever replaying.
@@ -182,12 +189,13 @@
this.dir = dir;
this.conf = conf;
this.listener = listener;
- this.maxlogentries =
- conf.getInt("hbase.regionserver.maxlogentries", 100000);
this.flushlogentries =
conf.getInt("hbase.regionserver.flushlogentries", 100);
this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
this.fs.getDefaultBlockSize());
+ // Roll at 95% of block size.
+ float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
+ this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
this.lastLogFlushTime = System.currentTimeMillis();
@@ -196,15 +204,16 @@
}
fs.mkdirs(dir);
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 64);
+ this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
LOG.info("HLog configuration: blocksize=" + this.blocksize +
- ", maxlogentries=" + this.maxlogentries + ", flushlogentries=" +
- this.flushlogentries + ", optionallogflushinternal=" +
- this.optionalFlushInterval + "ms");
+ ", rollsize=" + this.logrollsize +
+ ", enabled=" + this.enabled +
+ ", flushlogentries=" + this.flushlogentries +
+ ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
rollWriter();
}
/**
- * Accessor for tests. Not a part of the public API.
* @return Current state of the monotonically increasing file id.
*/
public long getFilenum() {
@@ -212,17 +221,13 @@
}
/**
- * Get the compression type for the hlog files.
- * Commit logs SHOULD NOT be compressed. You'll lose edits if the compression
- * record is not complete. In gzip, record is 32k so you could lose up to
- * 32k of edits (All of this is moot till we have sync/flush in hdfs but
- * still...).
+ * Get the compression type for the hlog files
* @param c Configuration to use.
* @return the kind of compression to use
*/
private static CompressionType getCompressionType(final Configuration c) {
- String name = c.get("hbase.io.seqfile.compression.type");
- return name == null? CompressionType.NONE: CompressionType.valueOf(name);
+ // Compression makes no sense for commit log. Always return NONE.
+ return CompressionType.NONE;
}
/**
@@ -277,23 +282,24 @@
}
synchronized (updateLock) {
// Clean up current writer.
- Path oldFile = cleanupCurrentWriter();
- // Create a new one.
- this.old_filenum = this.filenum;
+ Path oldFile = cleanupCurrentWriter(this.filenum);
+ if (this.filenum >= 0) {
+ this.old_filenum = this.filenum;
+ }
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename(this.filenum);
-
this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
HLogKey.class, KeyValue.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
fs.getDefaultReplication(), this.blocksize,
SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
new Metadata());
-
LOG.info((oldFile != null?
- "Closed " + oldFile + ", entries=" + this.numEntries.get() + ". ": "") +
- "New log writer: " + FSUtils.getPath(newPath));
-
+ "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+ this.numEntries.get() +
+ ", calcsize=" + this.editsSize.get() + ", filesize=" +
+ this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+ "New hlog " + FSUtils.getPath(newPath));
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.size() <= 0) {
@@ -310,6 +316,7 @@
}
}
this.numEntries.set(0);
+ this.editsSize.set(0);
updateLock.notifyAll();
}
} finally {
@@ -337,7 +344,7 @@
if (LOG.isDebugEnabled()) {
// Find region associated with oldest key -- helps debugging.
oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
- LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+ LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " +
" out of total " + this.outputfiles.size() + "; " +
"oldest outstanding seqnum is " + oldestOutstandingSeqNum +
" from region " + Bytes.toString(oldestRegion));
@@ -351,7 +358,7 @@
if (countOfLogs > this.maxLogs) {
regionToFlush = oldestRegion != null?
oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
- LOG.info("Too many logs: logs=" + countOfLogs + ", maxlogs=" +
+ LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" +
this.maxLogs + "; forcing flush of region with oldest edits: " +
Bytes.toString(regionToFlush));
}
@@ -382,7 +389,8 @@
* @return Path to current writer or null if none.
* @throws IOException
*/
- private Path cleanupCurrentWriter() throws IOException {
+ private Path cleanupCurrentWriter(final long currentfilenum)
+ throws IOException {
Path oldFile = null;
if (this.writer != null) {
// Close the current writer, get a new one.
@@ -393,12 +401,12 @@
// shut ourselves down to minimize loss. Alternative is to try and
// keep going. See HBASE-930.
FailedLogCloseException flce =
- new FailedLogCloseException("#" + this.filenum);
+ new FailedLogCloseException("#" + currentfilenum);
flce.initCause(e);
throw e;
}
- oldFile = computeFilename(old_filenum);
- if (filenum > 0) {
+ if (currentfilenum >= 0) {
+ oldFile = computeFilename(currentfilenum);
this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
}
}
@@ -406,7 +414,7 @@
}
private void deleteLogFile(final Path p, final Long seqno) throws IOException {
- LOG.info("removing old log file " + FSUtils.getPath(p) +
+ LOG.info("removing old hlog file " + FSUtils.getPath(p) +
" whose highest sequence/edit id is " + seqno);
this.fs.delete(p, true);
}
@@ -418,6 +426,7 @@
* @return Path
*/
public Path computeFilename(final long fn) {
+ if (fn < 0) return null;
return new Path(dir, HLOG_DATFILE + fn);
}
@@ -442,7 +451,7 @@
synchronized (updateLock) {
this.closed = true;
if (LOG.isDebugEnabled()) {
- LOG.debug("closing log writer in " + this.dir.toString());
+ LOG.debug("closing hlog writer in " + this.dir.toString());
}
this.writer.close();
updateLock.notifyAll();
@@ -457,11 +466,12 @@
*
* @param regionInfo
* @param logEdit
+ * @param now
* @throws IOException
*/
- public void append(HRegionInfo regionInfo, KeyValue logEdit)
+ public void append(HRegionInfo regionInfo, KeyValue logEdit, final long now)
throws IOException {
- this.append(regionInfo, new byte[0], logEdit);
+ this.append(regionInfo, new byte[0], logEdit, now);
}
/** Append an entry to the log.
@@ -469,9 +479,11 @@
* @param regionInfo
* @param row
* @param logEdit
+ * @param now Time of this edit write.
* @throws IOException
*/
- public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit)
+ public void append(HRegionInfo regionInfo, byte [] row, KeyValue logEdit,
+ final long now)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@@ -485,14 +497,13 @@
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
- HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
+ HLogKey logKey = new HLogKey(regionName, tableName, seqNum, now);
boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
- doWrite(logKey, logEdit, sync);
+ doWrite(logKey, logEdit, sync, now);
this.numEntries.incrementAndGet();
updateLock.notifyAll();
}
-
- if (this.numEntries.get() > this.maxlogentries) {
+ if (this.editsSize.get() > this.logrollsize) {
if (listener != null) {
listener.logRollRequested();
}
@@ -520,10 +531,11 @@
* @param tableName
* @param edits
* @param sync
+ * @param now
* @throws IOException
*/
void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
- boolean sync)
+ boolean sync, final long now)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@@ -537,13 +549,14 @@
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
int counter = 0;
for (KeyValue kv: edits) {
- HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++]);
- doWrite(logKey, kv, sync);
+ HLogKey logKey =
+ new HLogKey(regionName, tableName, seqNum[counter++], now);
+ doWrite(logKey, kv, sync, now);
this.numEntries.incrementAndGet();
}
updateLock.notifyAll();
}
- if (this.numEntries.get() > this.maxlogentries) {
+ if (this.editsSize.get() > this.logrollsize) {
requestLogRoll();
}
}
@@ -558,19 +571,19 @@
if (!this.closed) {
long now = System.currentTimeMillis();
synchronized (updateLock) {
- if (((now - this.optionalFlushInterval) >
- this.lastLogFlushTime) && this.unflushedEntries.get() > 0) {
+ if (((now - this.optionalFlushInterval) > this.lastLogFlushTime) &&
+ this.unflushedEntries.get() > 0) {
try {
sync();
} catch (IOException e) {
- LOG.error("Error flushing HLog", e);
+ LOG.error("Error flushing hlog", e);
}
}
}
long took = System.currentTimeMillis() - now;
if (took > 1000) {
LOG.warn(Thread.currentThread().getName() + " took " + took +
- "ms optional sync'ing HLog; editcount=" + this.numEntries.get());
+ "ms optional sync'ing hlog; editcount=" + this.numEntries.get());
}
}
}
@@ -581,10 +594,14 @@
}
}
- private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync)
+ private void doWrite(HLogKey logKey, KeyValue logEdit, boolean sync,
+ final long now)
throws IOException {
+ if (!this.enabled) {
+ return;
+ }
try {
- long now = System.currentTimeMillis();
+ this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize());
this.writer.append(logKey, logEdit);
if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
sync();
@@ -592,10 +609,10 @@
long took = System.currentTimeMillis() - now;
if (took > 1000) {
LOG.warn(Thread.currentThread().getName() + " took " + took +
- "ms appending an edit to HLog; editcount=" + this.numEntries.get());
+ "ms appending an edit to hlog; editcount=" + this.numEntries.get());
}
} catch (IOException e) {
- LOG.fatal("Could not append. Requesting close of log", e);
+ LOG.fatal("Could not append. Requesting close of hlog", e);
requestLogRoll();
throw e;
}
@@ -667,8 +684,8 @@
return;
}
synchronized (updateLock) {
- this.writer.append(new HLogKey(regionName, tableName, logSeqId),
- completeCacheFlushLogEdit());
+ this.writer.append(new HLogKey(regionName, tableName, logSeqId,
+ System.currentTimeMillis()), completeCacheFlushLogEdit());
this.numEntries.incrementAndGet();
Long seq = this.lastSeqWritten.get(regionName);
if (seq != null && logSeqId >= seq.longValue()) {
@@ -729,7 +746,7 @@
// Nothing to do
return;
}
- LOG.info("Splitting " + logfiles.length + " log(s) in " +
+ LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
srcDir.toString());
splitLog(rootDir, logfiles, fs, conf);
try {
@@ -741,7 +758,7 @@
throw io;
}
long endMillis = System.currentTimeMillis();
- LOG.info("log file splitting completed in " + (endMillis - millis) +
+ LOG.info("hlog file splitting completed in " + (endMillis - millis) +
" millis for " + srcDir.toString());
}
@@ -762,8 +779,8 @@
try {
for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
- logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+ LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
+ ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
// Check for possibly empty file. With appends, currently Hadoop reports
// a zero length even if the file has been sync'd. Revisit if
@@ -777,7 +794,7 @@
try {
int count = 0;
while (in.next(key, val)) {
- byte[] regionName = key.getRegionName();
+ byte [] regionName = key.getRegionName();
LinkedList<HLogEntry> queue = logEntries.get(regionName);
if (queue == null) {
queue = new LinkedList<HLogEntry>();
@@ -787,7 +804,8 @@
queue.push(new HLogEntry(val, key));
count++;
}
- LOG.debug("Pushed " + count + " entries");
+ LOG.debug("Pushed " + count + " entries from " +
+ logfiles[i].getPath());
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (!(e instanceof EOFException)) {
@@ -797,7 +815,7 @@
}
} catch (IOException e) {
if (length <= 0) {
- LOG.warn("Empty log, continuing: " + logfiles[i]);
+ LOG.warn("Empty hlog, continuing: " + logfiles[i]);
continue;
}
throw e;
@@ -838,7 +856,7 @@
Path oldlogfile = null;
SequenceFile.Reader old = null;
if (fs.exists(logfile)) {
- LOG.warn("Old log file " + logfile
+ LOG.warn("Old hlog file " + logfile
+ " already exists. Copying existing file to new file");
oldlogfile = new Path(logfile.toString() + ".old");
fs.rename(logfile, oldlogfile);
@@ -852,7 +870,7 @@
// iterate.
logWriters.put(key, w);
if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new log file writer for path "
+ LOG.debug("Creating new hlog file writer for path "
+ logfile + " and region " + Bytes.toString(key));
}
@@ -893,10 +911,10 @@
// Wait for all threads to terminate
try {
for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
- LOG.debug("Waiting for log writers to terminate, iteration #" + i);
+ LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
}
}catch(InterruptedException ex) {
- LOG.warn("Log writers were interrupted, possible data loss!");
+ LOG.warn("Hlog writers were interrupted, possible data loss!");
}
} finally {
for (SequenceFile.Writer w : logWriters.values()) {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java Sat May 16 06:10:44 2009
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
@@ -34,14 +36,18 @@
* <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
* associated row.
*/
-public class HLogKey implements WritableComparable<HLogKey> {
+public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
private byte [] regionName;
private byte [] tablename;
private long logSeqNum;
+ // Time at which this edit was written.
+ private long writeTime;
+ private int HEAP_TAX = HeapSize.OBJECT + (2 * HeapSize.BYTE_ARRAY) +
+ (2 * HeapSize.LONG);
- /** Create an empty key useful when deserializing */
+ /** Writable Constructor -- Do not use. */
public HLogKey() {
- this(null, null, 0L);
+ this(null, null, 0L, HConstants.LATEST_TIMESTAMP);
}
/**
@@ -52,12 +58,14 @@
* @param regionName - name of region
* @param tablename - name of table
* @param logSeqNum - log sequence number
+ * @param now Time at which this edit was written.
*/
public HLogKey(final byte [] regionName, final byte [] tablename,
- long logSeqNum) {
+ long logSeqNum, final long now) {
this.regionName = regionName;
this.tablename = tablename;
this.logSeqNum = logSeqNum;
+ this.writeTime = now;
}
//////////////////////////////////////////////////////////////////////////////
@@ -78,7 +86,11 @@
public long getLogSeqNum() {
return logSeqNum;
}
-
+
+ public long getWriteTime() {
+ return this.writeTime;
+ }
+
@Override
public String toString() {
return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@@ -100,38 +112,44 @@
public int hashCode() {
int result = this.regionName.hashCode();
result ^= this.logSeqNum;
+ result ^= this.writeTime;
return result;
}
- //
- // Comparable
- //
-
public int compareTo(HLogKey o) {
int result = Bytes.compareTo(this.regionName, o.regionName);
- if(result == 0) {
+ if (result == 0) {
if (this.logSeqNum < o.logSeqNum) {
result = -1;
} else if (this.logSeqNum > o.logSeqNum) {
result = 1;
}
+ if (result == 0) {
+ if (this.writeTime < o.writeTime) {
+ result = -1;
+ } else if (this.writeTime > o.writeTime) {
+ return 1;
+ }
+ }
}
return result;
}
- //
- // Writable
- //
-
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.regionName);
Bytes.writeByteArray(out, this.tablename);
out.writeLong(logSeqNum);
+ out.writeLong(this.writeTime);
}
public void readFields(DataInput in) throws IOException {
this.regionName = Bytes.readByteArray(in);
this.tablename = Bytes.readByteArray(in);
this.logSeqNum = in.readLong();
+ this.writeTime = in.readLong();
+ }
+
+ public long heapSize() {
+ return this.regionName.length + this.tablename.length + HEAP_TAX;
}
}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat May 16 06:10:44 2009
@@ -1341,7 +1341,7 @@
edits.add(kv);
}
if (!edits.isEmpty()) {
- update(edits, writeToWAL);
+ update(edits, writeToWAL, now);
}
if (latestTimestampDeletes != null &&
!latestTimestampDeletes.isEmpty()) {
@@ -1349,7 +1349,7 @@
// as edits. Need to do individually after figuring which is latest
// timestamp to delete.
for (byte [] column: latestTimestampDeletes) {
- deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+ deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
}
}
} finally {
@@ -1387,6 +1387,7 @@
splitsAndClosesLock.readLock().lock();
try {
byte[] row = b.getRow();
+ long now = System.currentTimeMillis();
Integer lid = getLock(lockid,row);
try {
NavigableSet<byte []> keySet =
@@ -1404,7 +1405,7 @@
}
if (success) {
long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
- System.currentTimeMillis(): b.getTimestamp();
+ now: b.getTimestamp();
Set<byte []> latestTimestampDeletes = null;
List<KeyValue> edits = new ArrayList<KeyValue>();
for (BatchOperation op: b) {
@@ -1431,7 +1432,7 @@
edits.add(kv);
}
if (!edits.isEmpty()) {
- update(edits, writeToWAL);
+ update(edits, writeToWAL, now);
}
if (latestTimestampDeletes != null &&
!latestTimestampDeletes.isEmpty()) {
@@ -1439,7 +1440,7 @@
// as edits. Need to do individually after figuring which is latest
// timestamp to delete.
for (byte [] column: latestTimestampDeletes) {
- deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+ deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
}
}
}
@@ -1530,7 +1531,7 @@
try {
// Delete ALL versions rather than column family VERSIONS. If we just did
// VERSIONS, then if 2* VERSION cells, subsequent gets would get old stuff.
- deleteMultiple(row, column, ts, ALL_VERSIONS);
+ deleteMultiple(row, column, ts, ALL_VERSIONS, System.currentTimeMillis());
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -1547,9 +1548,10 @@
throws IOException {
checkReadOnly();
Integer lid = getLock(lockid, row);
+ long now = System.currentTimeMillis();
long time = ts;
if (ts == HConstants.LATEST_TIMESTAMP) {
- time = System.currentTimeMillis();
+ time = now;
}
KeyValue kv = KeyValue.createFirstOnRow(row, time);
try {
@@ -1561,7 +1563,7 @@
// This is UGLY. COPY OF KEY PART OF KeyValue.
edits.add(key.cloneDelete());
}
- update(edits);
+ update(edits, now);
}
} finally {
if (lockid == null) releaseRowLock(lid);
@@ -1594,7 +1596,7 @@
for (KeyValue key: keyvalues) {
edits.add(key.cloneDelete());
}
- update(edits);
+ update(edits, now);
}
} finally {
if(lockid == null) releaseRowLock(lid);
@@ -1629,7 +1631,7 @@
for (KeyValue kv: keyvalues) {
edits.add(kv.cloneDelete());
}
- update(edits);
+ update(edits, now);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -1668,7 +1670,7 @@
for (KeyValue k: keyvalues) {
edits.add(k.cloneDelete());
}
- update(edits);
+ update(edits, now);
}
} finally {
if(lockid == null) releaseRowLock(lid);
@@ -1684,10 +1686,11 @@
* @param ts Timestamp to start search on.
* @param versions How many versions to delete. Pass
* {@link HConstants#ALL_VERSIONS} to delete all.
+ * @param now
* @throws IOException
*/
private void deleteMultiple(final byte [] row, final byte [] column,
- final long ts, final int versions)
+ final long ts, final int versions, final long now)
throws IOException {
checkReadOnly();
// We used to have a getKeys method that purportedly only got the keys and
@@ -1704,7 +1707,7 @@
for (KeyValue key: keys) {
edits.add(key.cloneDelete());
}
- update(edits);
+ update(edits, now);
}
}
@@ -1748,10 +1751,12 @@
* Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
* @param edits Cell updates by column
+ * @param now
* @throws IOException
*/
- private void update(final List<KeyValue> edits) throws IOException {
- this.update(edits, true);
+ private void update(final List<KeyValue> edits, final long now)
+ throws IOException {
+ this.update(edits, true, now);
}
/**
@@ -1759,9 +1764,11 @@
* Warning: Assumption is caller has lock on passed in row.
* @param writeToWAL if true, then we should write to the log
* @param updatesByColumn Cell updates by column
+ * @param now
* @throws IOException
*/
- private void update(final List<KeyValue> edits, boolean writeToWAL)
+ private void update(final List<KeyValue> edits, boolean writeToWAL,
+ final long now)
throws IOException {
if (edits == null || edits.isEmpty()) {
return;
@@ -1772,7 +1779,7 @@
if (writeToWAL) {
this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), edits,
- (regionInfo.isMetaRegion() || regionInfo.isRootRegion()));
+ (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
}
long size = 0;
for (KeyValue kv: edits) {
@@ -2273,7 +2280,7 @@
List<KeyValue> edits = new ArrayList<KeyValue>();
edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
Writables.getBytes(r.getRegionInfo())));
- meta.update(edits);
+ meta.update(edits, System.currentTimeMillis());
} finally {
meta.releaseRowLock(lid);
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat May 16 06:10:44 2009
@@ -205,9 +205,9 @@
// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
- protected volatile HLog log;
- LogRoller logRoller;
- LogFlusher logFlusher;
+ protected volatile HLog hlog;
+ LogRoller hlogRoller;
+ LogFlusher hlogFlusher;
// limit compactions while starting up
CompactionLimitThread compactionLimitThread;
@@ -344,10 +344,10 @@
this.compactSplitThread = new CompactSplitThread(this);
// Log rolling thread
- this.logRoller = new LogRoller(this);
+ this.hlogRoller = new LogRoller(this);
// Log flushing thread
- this.logFlusher =
+ this.hlogFlusher =
new LogFlusher(this.threadWakeFrequency, this.stopRequested);
// Background thread to check for major compactions; needed if region
@@ -513,14 +513,14 @@
if (checkFileSystem()) {
closeAllRegions();
try {
- log.closeAndDelete();
+ hlog.closeAndDelete();
} catch (Exception e) {
LOG.error("error closing and deleting HLog", e);
}
try {
serverInfo.setStartCode(System.currentTimeMillis());
- log = setupHLog();
- this.logFlusher.setHLog(log);
+ hlog = setupHLog();
+ this.hlogFlusher.setHLog(hlog);
} catch (IOException e) {
this.abortRequested = true;
this.stopRequested.set(true);
@@ -620,17 +620,17 @@
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
cacheFlusher.interruptIfNecessary();
- logFlusher.interrupt();
+ hlogFlusher.interrupt();
compactSplitThread.interruptIfNecessary();
- logRoller.interruptIfNecessary();
+ hlogRoller.interruptIfNecessary();
this.majorCompactionChecker.interrupt();
if (abortRequested) {
if (this.fsOk) {
// Only try to clean up if the file system is available
try {
- if (this.log != null) {
- this.log.close();
+ if (this.hlog != null) {
+ this.hlog.close();
LOG.info("On abort, closed hlog");
}
} catch (Throwable e) {
@@ -644,7 +644,7 @@
} else {
ArrayList<HRegion> closedRegions = closeAllRegions();
try {
- log.closeAndDelete();
+ hlog.closeAndDelete();
} catch (Throwable e) {
LOG.error("Close and delete failed",
RemoteExceptionHandler.checkThrowable(e));
@@ -743,8 +743,8 @@
this.hdfsShutdownThread = suppressHdfsShutdownHook();
this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
- this.log = setupHLog();
- this.logFlusher.setHLog(log);
+ this.hlog = setupHLog();
+ this.hlogFlusher.setHLog(hlog);
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
startServiceThreads();
@@ -1058,7 +1058,7 @@
"running at " + this.serverInfo.getServerAddress().toString() +
" because logdir " + logdir.toString() + " exists");
}
- HLog newlog = new HLog(fs, logdir, conf, logRoller);
+ HLog newlog = new HLog(fs, logdir, conf, hlogRoller);
return newlog;
}
@@ -1127,9 +1127,9 @@
LOG.fatal("Set stop flag in " + t.getName(), e);
}
};
- Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
+ Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
handler);
- Threads.setDaemonThreadRunning(this.logFlusher, n + ".logFlusher",
+ Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
handler);
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
handler);
@@ -1199,7 +1199,7 @@
}
// Verify that all threads are alive
if (!(leases.isAlive() && compactSplitThread.isAlive() &&
- cacheFlusher.isAlive() && logRoller.isAlive() &&
+ cacheFlusher.isAlive() && hlogRoller.isAlive() &&
workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
// One or more threads are no longer alive - shut down
stop();
@@ -1234,7 +1234,7 @@
/** @return the HLog */
HLog getLog() {
- return this.log;
+ return this.hlog;
}
/**
@@ -1270,7 +1270,7 @@
Threads.shutdown(this.workerThread);
Threads.shutdown(this.cacheFlusher);
Threads.shutdown(this.compactSplitThread);
- Threads.shutdown(this.logRoller);
+ Threads.shutdown(this.hlogRoller);
}
private boolean getMaster() {
@@ -1540,7 +1540,7 @@
}
this.lock.writeLock().lock();
try {
- this.log.setSequenceNumber(region.getMinSequenceId());
+ this.hlog.setSequenceNumber(region.getMinSequenceId());
this.onlineRegions.put(mapKey, region);
} finally {
this.lock.writeLock().unlock();
@@ -1552,7 +1552,7 @@
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
- .getTableDesc().getName()), this.log, this.fs, conf, regionInfo,
+ .getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
this.cacheFlusher);
r.initialize(null, new Progressable() {
public void progress() {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java Sat May 16 06:10:44 2009
@@ -61,7 +61,7 @@
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new IndexedRegion(HTableDescriptor.getTableDir(super
- .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+ .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
.getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
r.initialize(null, new Progressable() {
public void progress() {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java Sat May 16 06:10:44 2009
@@ -100,7 +100,7 @@
HLogEdit logEdit;
logEdit = new HLogEdit(transactionId, TransactionalOperation.START);
*/
- hlog.append(regionInfo, null/*logEdit*/);
+ hlog.append(regionInfo, null/*logEdit*/, System.currentTimeMillis());
}
/**
@@ -117,7 +117,7 @@
for (BatchOperation op : update) {
// COMMENTED OUT HLogEdit logEdit = new HLogEdit(transactionId, update.getRow(), op, commitTime);
- hlog.append(regionInfo, update.getRow(), null /*logEdit*/);
+ hlog.append(regionInfo, update.getRow(), null /*logEdit*/, System.currentTimeMillis());
}
}
@@ -130,7 +130,7 @@
logEdit = new HLogEdit(transactionId,
HLogEdit.TransactionalOperation.COMMIT);
*/
- hlog.append(regionInfo, null /*logEdit*/);
+ hlog.append(regionInfo, null /*logEdit*/, System.currentTimeMillis());
}
/**
@@ -141,7 +141,7 @@
/*HLogEdit logEdit;
logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.ABORT);
*/
- hlog.append(regionInfo, null /*logEdit*/);
+ hlog.append(regionInfo, null /*logEdit*/, System.currentTimeMillis());
}
/**
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Sat May 16 06:10:44 2009
@@ -111,7 +111,7 @@
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
- .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+ .getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
.getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
r.initialize(null, new Progressable() {
public void progress() {
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=775418&r1=775417&r2=775418&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Sat May 16 06:10:44 2009
@@ -77,7 +77,8 @@
byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
edit.add(new KeyValue(rowName, column, System.currentTimeMillis(),
column));
- log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit, false);
+ log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit,
+ false, System.currentTimeMillis());
}
}
log.rollWriter();
@@ -110,7 +111,7 @@
cols.add(new KeyValue(row, Bytes.toBytes("column:" + Integer.toString(i)),
timestamp, new byte[] { (byte)(i + '0') }));
}
- log.append(regionName, tableName, cols, false);
+ log.append(regionName, tableName, cols, false, System.currentTimeMillis());
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();