You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/09/25 21:13:51 UTC
svn commit: r579353 - in /lucene/hadoop/trunk/src/contrib/hbase: CHANGES.txt
src/java/org/apache/hadoop/hbase/HLog.java
src/java/org/apache/hadoop/hbase/HRegion.java
src/test/org/apache/hadoop/hbase/TestLogRolling.java
Author: jimk
Date: Tue Sep 25 12:13:50 2007
New Revision: 579353
URL: http://svn.apache.org/viewvc?rev=579353&view=rev
Log:
HADOOP-1943 LogRolling test fails: reverting changes for HADOOP-1820
Removed:
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=579353&r1=579352&r2=579353&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Tue Sep 25 12:13:50 2007
@@ -45,6 +45,7 @@
HADOOP-1813 OOME makes zombie of region server
HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
HADOOP-1820 Regionserver creates hlogs without bound
+ (reverted 2007/09/25)
HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
HADOOP-1832 listTables() returns duplicate tables
HADOOP-1834 Scanners ignore timestamp passed on creation
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=579353&r1=579352&r2=579353&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Tue Sep 25 12:13:50 2007
@@ -29,9 +29,6 @@
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
/**
* HLog stores all the edits to the HStore.
@@ -56,11 +53,6 @@
* older (smaller) than the most-recent CACHEFLUSH message for every HRegion
* that has a message in F.
*
- * <p>synchronized methods can never execute in parallel. However, between the
- * start of a cache flush and the completion point, appends are allowed but log
- * rolling is not. To prevent log rolling taking place during this period, a
- * separate reentrant lock is used.
- *
* <p>TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs
* in HDFS is currently flawed. HBase writes edits to logs and to a memcache.
* The 'atomic' write to the log is meant to serve as insurance against
@@ -82,21 +74,20 @@
SequenceFile.Writer writer;
TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
- HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
+ volatile boolean insideCacheFlush = false;
+
+ TreeMap<Text, Long> regionToLastFlush = new TreeMap<Text, Long>();
volatile boolean closed = false;
- AtomicLong logSeqNum = new AtomicLong(0);
- volatile long filenum = 0;
+ volatile long logSeqNum = 0;
+ long filenum = 0;
AtomicInteger numEntries = new AtomicInteger(0);
- // This lock prevents starting a log roll during a cache flush.
- // synchronized is insufficient because a cache flush spans two method calls.
- private final Lock cacheFlushLock = new ReentrantLock();
+ Integer rollLock = new Integer(0);
/**
* Split up a bunch of log files, that are no longer being written to,
- * into new files, one per region. Delete the old log files when finished.
- *
+ * into new files, one per region. Delete the old log files when ready.
* @param rootDir Root directory of the HBase instance
* @param srcDir Directory of log files to split:
* e.g. <code>${ROOTDIR}/log_HOST_PORT</code>
@@ -189,105 +180,109 @@
fs.mkdirs(dir);
rollWriter();
}
-
- /**
- * Called by HRegionServer when it opens a new region to ensure that log
- * sequence numbers are always greater than the latest sequence number of
- * the region being brought on-line.
- *
- * @param newvalue
- */
+
synchronized void setSequenceNumber(long newvalue) {
- if (newvalue > logSeqNum.get()) {
+ if (newvalue > logSeqNum) {
if (LOG.isDebugEnabled()) {
LOG.debug("changing sequence number from " + logSeqNum + " to " +
newvalue);
}
- logSeqNum.set(newvalue);
+ logSeqNum = newvalue;
}
}
/**
* Roll the log writer. That is, start writing log messages to a new file.
- *
- * Because a log cannot be rolled during a cache flush, and a cache flush
- * spans two method calls, a special lock needs to be obtained so that a
- * cache flush cannot start when the log is being rolled and the log cannot
- * be rolled during a cache flush.
- *
- * Note that this method cannot be synchronized because it is possible that
- * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
- * start which would obtain the lock on this but block on obtaining the
- * cacheFlushLock and then completeCacheFlush could be called which would
- * wait for the lock on this and consequently never release the cacheFlushLock
+ *
+ * The 'rollLock' prevents us from entering rollWriter() more than
+ * once at a time.
+ *
+ * The 'this' lock limits access to the current writer so
+ * we don't append multiple items simultaneously.
*
* @throws IOException
*/
void rollWriter() throws IOException {
- if(closed) {
- throw new IOException("Cannot roll log; log is closed");
- }
+ synchronized(rollLock) {
- cacheFlushLock.lock(); // prevent cache flushes
- try {
- // Now that we have locked out cache flushes, lock this to prevent other
- // changes.
+ // Try to roll the writer to a new file. We may have to
+ // wait for a cache-flush to complete. In the process,
+ // compute a list of old log files that can be deleted.
+
+ Vector<Path> toDeleteList = new Vector<Path>();
+ synchronized(this) {
+ if(closed) {
+ throw new IOException("Cannot roll log; log is closed");
+ }
- synchronized (this) {
- if (writer != null) { // Close the current writer (if any), get a new one.
+ // Make sure we do not roll the log while inside a
+ // cache-flush. Otherwise, the log sequence number for
+ // the CACHEFLUSH operation will appear in a "newer" log file
+ // than it should.
+ while(insideCacheFlush) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ // continue;
+ }
+ }
+
+ // Close the current writer (if any), and grab a new one.
+ if(writer != null) {
writer.close();
Path p = computeFilename(filenum - 1);
if(LOG.isDebugEnabled()) {
LOG.debug("Closing current log writer " + p.toString() +
- " to get a new one");
+ " to get a new one");
}
if (filenum > 0) {
- outputfiles.put(logSeqNum.get() - 1, p);
+ outputfiles.put(logSeqNum - 1, p);
}
}
Path newPath = computeFilename(filenum++);
- this.writer = SequenceFile.createWriter(fs, conf, newPath, HLogKey.class,
- HLogEdit.class);
-
- if (LOG.isDebugEnabled()) {
+ this.writer = SequenceFile.createWriter(fs, conf, newPath,
+ HLogKey.class, HLogEdit.class);
+ if(LOG.isDebugEnabled()) {
LOG.debug("new log writer created at " + newPath);
}
-
+
// Can we delete any of the old log files?
// First, compute the oldest relevant log operation
// over all the regions.
long oldestOutstandingSeqNum = Long.MAX_VALUE;
- for (Long l: lastSeqWritten.values()) {
+ for(Long l: regionToLastFlush.values()) {
long curSeqNum = l.longValue();
-
- if (curSeqNum < oldestOutstandingSeqNum) {
+
+ if(curSeqNum < oldestOutstandingSeqNum) {
oldestOutstandingSeqNum = curSeqNum;
}
}
- // Get the set of all sequence numbers that are older than the oldest
- // pending region operation
-
- TreeSet<Long> sequenceNumbers = new TreeSet<Long>();
- sequenceNumbers.addAll(
- outputfiles.headMap(oldestOutstandingSeqNum).keySet());
-
- // Remove all files with a final ID that's older than the oldest
- // pending region-operation.
-
- for (Long seq: sequenceNumbers) {
- Path p = outputfiles.remove(seq);
- if(LOG.isDebugEnabled()) {
- LOG.debug("removing old log file " + p.toString());
+ // Next, remove all files with a final ID that's older
+ // than the oldest pending region-operation.
+ for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext();) {
+ long maxSeqNum = it.next().longValue();
+ if(maxSeqNum < oldestOutstandingSeqNum) {
+ Path p = outputfiles.get(maxSeqNum);
+ it.remove();
+ toDeleteList.add(p);
+
+ } else {
+ break;
}
- fs.delete(p);
}
- this.numEntries.set(0);
}
-
- } finally {
- cacheFlushLock.unlock();
+
+ // Actually delete them, if any!
+ for(Iterator<Path> it = toDeleteList.iterator(); it.hasNext(); ) {
+ Path p = it.next();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("removing old log file " + p.toString());
+ }
+ fs.delete(p);
+ }
+ this.numEntries.set(0);
}
}
@@ -333,9 +328,7 @@
* other systems should process the log appropriately upon each startup
* (and prior to initializing HLog).
*
- * synchronized prevents appends during the completion of a cache flush or
- * for the duration of a log roll.
- *
+ * We need to seize a lock on the writer so that writes are atomic.
* @param regionName
* @param tableName
* @param row
@@ -344,19 +337,21 @@
* @throws IOException
*/
synchronized void append(Text regionName, Text tableName, Text row,
- TreeMap<Text, byte []> columns, long timestamp) throws IOException {
+ TreeMap<Text, byte []> columns, long timestamp)
+ throws IOException {
if(closed) {
throw new IOException("Cannot append; log is closed");
}
-
- long seqNum[] = obtainSeqNum(columns.size());
- // The 'lastSeqWritten' map holds the sequence number of the most recent
- // write for each region. When the cache is flushed, the entry for the
- // region being flushed is removed if the sequence number of the flush
- // is greater than or equal to the value in lastSeqWritten
-
- lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
+ long seqNum[] = obtainSeqNum(columns.size());
+
+ // The 'regionToLastFlush' map holds the sequence id of the
+ // most recent flush for every regionName. However, for regions
+ // that don't have any flush yet, the relevant operation is the
+ // first one that's been added.
+ if (regionToLastFlush.get(regionName) == null) {
+ regionToLastFlush.put(regionName, seqNum[0]);
+ }
int counter = 0;
for (Map.Entry<Text, byte []> es: columns.entrySet()) {
@@ -368,39 +363,29 @@
}
}
- /**
- * @return How many items have been added to the log
- *
- * Because numEntries is an AtomicInteger, no locking is required.
- */
+ /** @return How many items have been added to the log */
int getNumEntries() {
return numEntries.get();
}
/**
- * Obtain a log sequence number.
- *
- * Because it is only called from a synchronized method, no additional locking
- * is required.
+ * Obtain a log sequence number. This seizes the whole HLog
+ * lock, but it shouldn't last too long.
*/
- private long obtainSeqNum() {
- return logSeqNum.getAndIncrement();
+ synchronized long obtainSeqNum() {
+ return logSeqNum++;
}
/**
* Obtain a specified number of sequence numbers
*
- * Because it is only called from a synchronized method, no additional locking
- * is required.
- *
* @param num - number of sequence numbers to obtain
* @return - array of sequence numbers
*/
- private long[] obtainSeqNum(int num) {
- long sequenceNumber = logSeqNum.getAndAdd(num);
+ synchronized long[] obtainSeqNum(int num) {
long[] results = new long[num];
for (int i = 0; i < num; i++) {
- results[i] = sequenceNumber++;
+ results[i] = logSeqNum++;
}
return results;
}
@@ -409,50 +394,54 @@
* By acquiring a log sequence ID, we can allow log messages
* to continue while we flush the cache.
*
- * Acquire a lock so that we do not roll the log between the start
- * and completion of a cache-flush. Otherwise the log-seq-id for
+ * Set a flag so that we do not roll the log between the start
+ * and completion of a cache-flush. Otherwise the log-seq-id for
* the flush will not appear in the correct logfile.
- *
* @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
* @see #completeCacheFlush(Text, Text, long)
* @see #abortCacheFlush()
*/
synchronized long startCacheFlush() {
- cacheFlushLock.lock();
+ while (this.insideCacheFlush) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ // continue
+ }
+ }
+ this.insideCacheFlush = true;
+ notifyAll();
return obtainSeqNum();
}
- /**
- * Complete the cache flush
- *
- * Protected by this.lock()
- *
+ /** Complete the cache flush
* @param regionName
* @param tableName
* @param logSeqId
* @throws IOException
*/
synchronized void completeCacheFlush(final Text regionName,
- final Text tableName, final long logSeqId) throws IOException {
-
- try {
- if(this.closed) {
- return;
- }
-
- writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
- new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
- System.currentTimeMillis()));
-
- numEntries.getAndIncrement();
- Long seq = lastSeqWritten.get(regionName);
- if (seq != null && logSeqId >= seq) {
- lastSeqWritten.remove(regionName);
- }
-
- } finally {
- cacheFlushLock.unlock();
+ final Text tableName, final long logSeqId)
+ throws IOException {
+ if(this.closed) {
+ return;
}
+
+ if (!this.insideCacheFlush) {
+ throw new IOException("Impossible situation: inside " +
+ "completeCacheFlush(), but 'insideCacheFlush' flag is false");
+ }
+ HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
+ this.writer.append(key,
+ new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+ System.currentTimeMillis()));
+ this.numEntries.getAndIncrement();
+
+ // Remember the most-recent flush for each region.
+ // This is used to delete obsolete log files.
+ this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
+
+ cleanup();
}
/**
@@ -462,8 +451,23 @@
* is a restart of the regionserver so the snapshot content dropped by the
* failure gets restored to the memcache.
*/
- void abortCacheFlush() {
- this.cacheFlushLock.unlock();
+ synchronized void abortCacheFlush() {
+ cleanup();
+ }
+
+ private synchronized void cleanup() {
+ this.insideCacheFlush = false;
+ notifyAll();
+ }
+
+ /**
+ * Abort a cache flush.
+ * This method will clear waits on {@link #insideCacheFlush} but if this
+ * method is called, we are losing data. TODO: Fix.
+ */
+ synchronized void abort() {
+ this.insideCacheFlush = false;
+ notifyAll();
}
private static void usage() {
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=579353&r1=579352&r2=579353&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Tue Sep 25 12:13:50 2007
@@ -210,7 +210,6 @@
final int memcacheFlushSize;
final int blockingMemcacheSize;
protected final long threadWakeFrequency;
- protected final int optionalFlushCount;
private final HLocking lock = new HLocking();
private long desiredMaxFileSize;
private final long maxSequenceId;
@@ -248,8 +247,6 @@
this.regionInfo = regionInfo;
this.memcache = new HMemcache();
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.optionalFlushCount =
- conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@@ -731,13 +728,11 @@
void optionallyFlush() throws IOException {
if(this.memcache.getSize() > this.memcacheFlushSize) {
flushcache(false);
- } else if (this.memcache.getSize() > 0) {
- if (this.noFlushCount >= this.optionalFlushCount) {
- LOG.info("Optional flush called " + this.noFlushCount +
- " times when data present without flushing. Forcing one.");
- flushcache(false);
-
- } else {
+ } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
+ LOG.info("Optional flush called " + this.noFlushCount +
+ " times when data present without flushing. Forcing one.");
+ flushcache(false);
+ if (this.memcache.getSize() > 0) {
// Only increment if something in the cache.
// Gets zero'd when a flushcache is called.
this.noFlushCount++;
@@ -869,31 +864,25 @@
retval.memcacheSnapshot.size());
}
- try {
- // A. Flush memcache to all the HStores.
- // Keep running vector of all store files that includes both old and the
- // just-made new flush store file.
- for (HStore hstore: stores.values()) {
- hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
- }
- } catch (IOException e) {
- // An exception here means that the snapshot was not persisted.
- // The hlog needs to be replayed so its content is restored to memcache.
- // Currently, only a server restart will do this.
- this.log.abortCacheFlush();
- throw new DroppedSnapshotException(e.getMessage());
+ // A. Flush memcache to all the HStores.
+ // Keep running vector of all store files that includes both old and the
+ // just-made new flush store file.
+ for (HStore hstore: stores.values()) {
+ hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
}
- // If we get to here, the HStores have been written. If we get an
- // error in completeCacheFlush it will release the lock it is holding
-
// B. Write a FLUSHCACHE-COMPLETE message to the log.
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(this.regionInfo.regionName,
- regionInfo.tableDesc.getName(), logCacheFlushId);
-
+ regionInfo.tableDesc.getName(), logCacheFlushId);
+ } catch (IOException e) {
+ // An exception here means that the snapshot was not persisted.
+ // The hlog needs to be replayed so its content is restored to memcache.
+ // Currently, only a server restart will do this.
+ this.log.abortCacheFlush();
+ throw new DroppedSnapshotException(e.getMessage());
} finally {
// C. Delete the now-irrelevant memcache snapshot; its contents have been
// dumped to disk-based HStores or, if error, clear aborted snapshot.