You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/05/09 07:49:46 UTC
svn commit: r773167 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/filter/
src/java/org/apache/hadoop/hbase/regionserver/
src/test/org/apache/hadoop/hbase/filter/
Author: stack
Date: Sat May 9 05:49:46 2009
New Revision: 773167
URL: http://svn.apache.org/viewvc?rev=773167&view=rev
Log:
HBASE-1393 Narrow synchronization in HLog
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/filter/DisabledTestRegExpRowFilter.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat May 9 05:49:46 2009
@@ -215,6 +215,7 @@
HBASE-1392 change how we build/configure lzocodec (Ryan Rawson via Stack)
HBASE-1397 Better distribution in the PerformanceEvaluation MapReduce
when rows run to the Billions
+ HBASE-1393 Narrow synchronization in HLog
OPTIMIZATIONS
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HConstants.java Sat May 9 05:49:46 2009
@@ -118,8 +118,8 @@
static final String HBASE_DIR = "hbase.rootdir";
/** Used to construct the name of the log directory for a region server
- * Use '@' as a special character to seperate the log files from table data */
- static final String HREGION_LOGDIR_NAME = "@LOGS@";
+ * Use '.' as a special character to seperate the log files from table data */
+ static final String HREGION_LOGDIR_NAME = ".logs";
/** Name of old log file for reconstruction */
static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log";
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/filter/RegExpRowFilter.java Sat May 9 05:49:46 2009
@@ -168,7 +168,7 @@
}
}
if (nullColumns.contains(colKey)) {
- if (data != null && !HLogEdit.isDeleted(data)) {
+ if (data != null /* DELETE IS IN KEY NOW && !HLogEdit.isDeleted(data)*/) {
return true;
}
}
@@ -216,7 +216,7 @@
public boolean filterRow(final SortedMap<byte [], Cell> columns) {
for (Entry<byte [], Cell> col : columns.entrySet()) {
if (nullColumns.contains(col.getKey())
- && !HLogEdit.isDeleted(col.getValue().getValue())) {
+ /* DELETE IS IN KEY NOW && !HLogEdit.isDeleted(col.getValue().getValue())*/) {
return true;
}
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat May 9 05:49:46 2009
@@ -28,6 +28,9 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -92,17 +95,16 @@
private static final String HLOG_DATFILE = "hlog.dat.";
static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
static final byte [] METAROW = Bytes.toBytes("METAROW");
- final FileSystem fs;
- final Path dir;
- final Configuration conf;
- final LogRollListener listener;
+ private final FileSystem fs;
+ private final Path dir;
+ private final Configuration conf;
+ private final LogRollListener listener;
private final int maxlogentries;
private final long optionalFlushInterval;
private final long blocksize;
private final int flushlogentries;
- private volatile int unflushedEntries = 0;
+ private final AtomicInteger unflushedEntries = new AtomicInteger(0);
private volatile long lastLogFlushTime;
- final long threadWakeFrequency;
/*
* Current log file.
@@ -112,24 +114,23 @@
/*
* Map of all log files but the current one.
*/
- final SortedMap<Long, Path> outputfiles =
+ final SortedMap<Long, Path> outputfiles =
Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
/*
* Map of region to last sequence/edit id.
*/
- private final Map<byte [], Long> lastSeqWritten = Collections.
- synchronizedSortedMap(new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR));
+ private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
+ new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
private volatile boolean closed = false;
- private final Object sequenceLock = new Object();
- private volatile long logSeqNum = 0;
+ private final AtomicLong logSeqNum = new AtomicLong(0);
private volatile long filenum = 0;
private volatile long old_filenum = -1;
- private volatile int numEntries = 0;
+ private final AtomicInteger numEntries = new AtomicInteger(0);
// This lock prevents starting a log roll during a cache flush.
// synchronized is insufficient because a cache flush spans two method calls.
@@ -175,7 +176,6 @@
conf.getLong("hbase.regionserver.hlog.blocksize", 1024L * 1024L);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
- this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.lastLogFlushTime = System.currentTimeMillis();
if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
@@ -211,15 +211,12 @@
* @param newvalue We'll set log edit/sequence number to this value if it
* is greater than the current value.
*/
- void setSequenceNumber(long newvalue) {
- synchronized (sequenceLock) {
- if (newvalue > logSeqNum) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("changing sequence number from " + logSeqNum + " to " +
- newvalue);
- }
- logSeqNum = newvalue;
- }
+ void setSequenceNumber(final long newvalue) {
+ for (long id = this.logSeqNum.get(); id < newvalue &&
+ !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
+ // This could spin on occasion but better the occasional spin than locking
+ // every increment of sequence number.
+ LOG.debug("Change sequence number from " + logSeqNum + " to " + newvalue);
}
}
@@ -227,7 +224,7 @@
* @return log sequence number
*/
public long getSequenceNumber() {
- return logSeqNum;
+ return logSeqNum.get();
}
/**
@@ -290,7 +287,7 @@
regionToFlush = cleanOldLogs();
}
}
- this.numEntries = 0;
+ this.numEntries.set(0);
updateLock.notifyAll();
}
} finally {
@@ -380,9 +377,7 @@
}
oldFile = computeFilename(old_filenum);
if (filenum > 0) {
- synchronized (this.sequenceLock) {
- this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), oldFile);
- }
+ this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
}
}
return oldFile;
@@ -435,6 +430,53 @@
}
}
+
+ /** Append an entry without a row to the log.
+ *
+ * @param regionInfo
+ * @param logEdit
+ * @throws IOException
+ */
+ public void append(HRegionInfo regionInfo, HLogEdit logEdit)
+ throws IOException {
+ this.append(regionInfo, new byte[0], logEdit);
+ }
+
+ /** Append an entry to the log.
+ *
+ * @param regionInfo
+ * @param row
+ * @param logEdit
+ * @throws IOException
+ */
+ public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit)
+ throws IOException {
+ if (this.closed) {
+ throw new IOException("Cannot append; log is closed");
+ }
+ byte [] regionName = regionInfo.getRegionName();
+ byte [] tableName = regionInfo.getTableDesc().getName();
+ synchronized (updateLock) {
+ long seqNum = obtainSeqNum();
+ // The 'lastSeqWritten' map holds the sequence number of the oldest
+ // write for each region. When the cache is flushed, the entry for the
+ // region being flushed is removed if the sequence number of the flush
+ // is greater than or equal to the value in lastSeqWritten.
+ this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
+ HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
+ boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
+ doWrite(logKey, logEdit, sync);
+ this.numEntries.incrementAndGet();
+ updateLock.notifyAll();
+ }
+
+ if (this.numEntries.get() > this.maxlogentries) {
+ if (listener != null) {
+ listener.logRollRequested();
+ }
+ }
+ }
+
/**
* Append a set of edits to the log. Log edits are keyed by regionName,
* rowname, and log-sequence-id.
@@ -461,44 +503,41 @@
void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
boolean sync)
throws IOException {
- if (closed) {
+ if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
- synchronized (updateLock) {
- long seqNum[] = obtainSeqNum(edits.size());
+ long seqNum [] = obtainSeqNum(edits.size());
+ synchronized (this.updateLock) {
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region. When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
- if (!this.lastSeqWritten.containsKey(regionName)) {
- this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
- }
+ this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
int counter = 0;
for (KeyValue kv: edits) {
- HLogKey logKey =
- new HLogKey(regionName, tableName, seqNum[counter++]);
- doWrite(logKey, new HLogEdit(kv), sync);
-
- this.numEntries++;
+ HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++]);
+ doWrite(logKey, new HLogEdit(kv), sync);
+ this.numEntries.incrementAndGet();
}
updateLock.notifyAll();
}
- if (this.numEntries > this.maxlogentries) {
+ if (this.numEntries.get() > this.maxlogentries) {
requestLogRoll();
}
}
-
+
public void sync() throws IOException {
lastLogFlushTime = System.currentTimeMillis();
this.writer.sync();
- unflushedEntries = 0;
+ this.unflushedEntries.set(0);
}
void optionalSync() {
if (!this.closed) {
+ long now = System.currentTimeMillis();
synchronized (updateLock) {
- if (((System.currentTimeMillis() - this.optionalFlushInterval) >
- this.lastLogFlushTime) && this.unflushedEntries > 0) {
+ if (((now - this.optionalFlushInterval) >
+ this.lastLogFlushTime) && this.unflushedEntries.get() > 0) {
try {
sync();
} catch (IOException e) {
@@ -519,7 +558,7 @@
throws IOException {
try {
this.writer.append(logKey, logEdit);
- if (sync || ++unflushedEntries >= flushlogentries) {
+ if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) {
sync();
}
} catch (IOException e) {
@@ -528,69 +567,17 @@
throw e;
}
}
-
- /** Append an entry without a row to the log.
- *
- * @param regionInfo
- * @param logEdit
- * @throws IOException
- */
- public void append(HRegionInfo regionInfo, HLogEdit logEdit) throws IOException {
- this.append(regionInfo, new byte[0], logEdit);
- }
-
- /** Append an entry to the log.
- *
- * @param regionInfo
- * @param row
- * @param logEdit
- * @throws IOException
- */
- public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit)
- throws IOException {
- if (closed) {
- throw new IOException("Cannot append; log is closed");
- }
- byte [] regionName = regionInfo.getRegionName();
- byte [] tableName = regionInfo.getTableDesc().getName();
- synchronized (updateLock) {
- long seqNum = obtainSeqNum();
- // The 'lastSeqWritten' map holds the sequence number of the oldest
- // write for each region. When the cache is flushed, the entry for the
- // region being flushed is removed if the sequence number of the flush
- // is greater than or equal to the value in lastSeqWritten.
- if (!this.lastSeqWritten.containsKey(regionName)) {
- this.lastSeqWritten.put(regionName, Long.valueOf(seqNum));
- }
-
- HLogKey logKey = new HLogKey(regionName, tableName, seqNum);
- boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion();
- doWrite(logKey, logEdit, sync);
- this.numEntries++;
- updateLock.notifyAll();
- }
-
- if (this.numEntries > this.maxlogentries) {
- if (listener != null) {
- listener.logRollRequested();
- }
- }
- }
/** @return How many items have been added to the log */
int getNumEntries() {
- return numEntries;
+ return numEntries.get();
}
/**
* Obtain a log sequence number.
*/
private long obtainSeqNum() {
- long value;
- synchronized (sequenceLock) {
- value = logSeqNum++;
- }
- return value;
+ return this.logSeqNum.incrementAndGet();
}
/** @return the number of log files in use */
@@ -598,18 +585,16 @@
return outputfiles.size();
}
- /**
+ /*
* Obtain a specified number of sequence numbers
*
* @param num number of sequence numbers to obtain
* @return array of sequence numbers
*/
- private long[] obtainSeqNum(int num) {
- long[] results = new long[num];
- synchronized (this.sequenceLock) {
- for (int i = 0; i < num; i++) {
- results[i] = this.logSeqNum++;
- }
+ private long [] obtainSeqNum(int num) {
+ long [] results = new long[num];
+ for (int i = 0; i < num; i++) {
+ results[i] = this.logSeqNum.incrementAndGet();
}
return results;
}
@@ -651,7 +636,7 @@
synchronized (updateLock) {
this.writer.append(new HLogKey(regionName, tableName, logSeqId),
completeCacheFlushLogEdit());
- this.numEntries++;
+ this.numEntries.incrementAndGet();
Long seq = this.lastSeqWritten.get(regionName);
if (seq != null && logSeqId >= seq.longValue()) {
this.lastSeqWritten.remove(regionName);
@@ -664,7 +649,6 @@
}
private HLogEdit completeCacheFlushLogEdit() {
- // TODO Profligacy!!! Fix all this creation.
return new HLogEdit(new KeyValue(METAROW, METACOLUMN,
System.currentTimeMillis(), HLogEdit.COMPLETE_CACHE_FLUSH));
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java Sat May 9 05:49:46 2009
@@ -37,14 +37,11 @@
* TODO: Remove. Just output KVs.
*/
public class HLogEdit implements Writable, HConstants {
- /** Value stored for a deleted item */
- public static byte [] DELETED_BYTES;
- /** Value written to HLog on a complete cache flush */
- public static byte [] COMPLETE_CACHE_FLUSH;
-
+ /** Value written to HLog on a complete cache flush. TODO: Remove. Not used.
+ */
+ static byte [] COMPLETE_CACHE_FLUSH;
static {
try {
- DELETED_BYTES = "HBASE::DELETEVAL".getBytes(UTF8_ENCODING);
COMPLETE_CACHE_FLUSH = "HBASE::CACHEFLUSH".getBytes(UTF8_ENCODING);
} catch (UnsupportedEncodingException e) {
assert(false);
@@ -183,23 +180,4 @@
operation = TransactionalOperation.valueOf(in.readUTF());
}
}
-
- /**
- * @param value
- * @return True if an entry and its content is {@link #DELETED_BYTES}.
- */
- public static boolean isDeleted(final byte [] value) {
- return isDeleted(value, 0, value.length);
- }
-
- /**
- * @param value
- * @return True if an entry and its content is {@link #DELETED_BYTES}.
- */
- public static boolean isDeleted(final byte [] value, final int offset,
- final int length) {
- return (value == null)? false:
- Bytes.BYTES_RAWCOMPARATOR.compare(DELETED_BYTES, 0, DELETED_BYTES.length,
- value, offset, length) == 0;
- }
}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat May 9 05:49:46 2009
@@ -1325,12 +1325,7 @@
checkColumn(column);
KeyValue kv = null;
if (op.isPut()) {
- byte [] val = op.getValue();
- if (HLogEdit.isDeleted(val)) {
- throw new IOException("Cannot insert value: " +
- Bytes.toString(val));
- }
- kv = new KeyValue(row, column, commitTime, val);
+ kv = new KeyValue(row, column, commitTime, op.getValue());
} else {
// Its a delete.
if (b.getTimestamp() == LATEST_TIMESTAMP) {
@@ -1420,12 +1415,7 @@
byte [] column = op.getColumn();
KeyValue kv = null;
if (op.isPut()) {
- byte [] val = op.getValue();
- if (HLogEdit.isDeleted(val)) {
- throw new IOException("Cannot insert value: " +
- Bytes.toString(val));
- }
- kv = new KeyValue(row, column, commitTime, val);
+ kv = new KeyValue(row, column, commitTime, op.getValue());
} else {
// Its a delete.
if (b.getTimestamp() == LATEST_TIMESTAMP) {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat May 9 05:49:46 2009
@@ -504,7 +504,12 @@
tries++;
} else {
LOG.error("Exceeded max retries: " + this.numRetries, e);
- checkFileSystem();
+ if (checkFileSystem()) {
+ // Filesystem is OK. Something is up w/ ZK or master. Sleep
+ // a little while if only to stop our logging many times a
+ // millisecond.
+ Thread.sleep(1000);
+ }
}
if (this.stopRequested.get()) {
LOG.info("Stop was requested, clearing the toDo " +
@@ -1708,7 +1713,7 @@
}
}
- public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
+ public int batchUpdates(final byte[] regionName, final BatchUpdate [] b)
throws IOException {
int i = 0;
checkOpen();
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat May 9 05:49:46 2009
@@ -306,7 +306,7 @@
skippedEdits++;
continue;
}
- // Check this edit is for me. Also, guard against writing the speical
+ // Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
KeyValue kv = val.getKeyValue();
if (val.isTransactionEntry() ||
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/filter/DisabledTestRegExpRowFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/filter/DisabledTestRegExpRowFilter.java?rev=773167&r1=773166&r2=773167&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/filter/DisabledTestRegExpRowFilter.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/filter/DisabledTestRegExpRowFilter.java Sat May 9 05:49:46 2009
@@ -188,10 +188,10 @@
// Try a row that has all expected columnKeys and a null-expected columnKey
// that maps to a null value.
// Testing row with columnKeys: a-e, e maps to null
- colvalues.put(new byte [] {LAST_CHAR},
- new Cell(HLogEdit.DELETED_BYTES, HConstants.LATEST_TIMESTAMP));
- assertFalse("Failed with last columnKey " + LAST_CHAR + " mapping to null.",
- filter.filterRow(colvalues));
+// colvalues.put(new byte [] {LAST_CHAR},
+// new Cell(HLogEdit.DELETED_BYTES, HConstants.LATEST_TIMESTAMP));
+// assertFalse("Failed with last columnKey " + LAST_CHAR + " mapping to null.",
+// filter.filterRow(colvalues));
}
private byte [] createRow(final char c) {