You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:13:08 UTC
svn commit: r1181490 -
/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Author: nspiegelberg
Date: Tue Oct 11 02:13:07 2011
New Revision: 1181490
URL: http://svn.apache.org/viewvc?rev=1181490&view=rev
Log:
TXID-based Sync's
Summary:
Trying to improve put latency for prod cluster by skipping
redundant sync requests.
Test Plan:
- mvn test -Dtest=Test*Log*
DiffCamp Revision: 215411
Reviewed By: kannan
Reviewers: dhruba, kannan
Commenters: dhruba
CC: nspiegelberg, dhruba, kannan
Revert Plan:
OK
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1181490&r1=1181489&r2=1181490&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 11 02:13:07 2011
@@ -143,8 +143,9 @@ public class HLog implements Syncable {
private final long blocksize;
private final int flushlogentries;
private final String prefix;
- private final AtomicInteger unflushedEntries = new AtomicInteger(0);
+ private final AtomicLong unflushedEntries = new AtomicLong(0);
private final Path oldLogDir;
+ private volatile long syncTillHere = 0;
private final List<LogActionsListener> actionListeners =
Collections.synchronizedList(new ArrayList<LogActionsListener>());
@@ -158,7 +159,7 @@ public class HLog implements Syncable {
final static Object [] NO_ARGS = new Object []{};
// used to indirectly tell syncFs to force the sync
- private boolean forceSync = false;
+ private volatile boolean forceSync = false;
public interface Reader {
void init(FileSystem fs, Path path, Configuration c) throws IOException;
@@ -860,6 +861,7 @@ public class HLog implements Syncable {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
+ long txid = 0;
long start = System.currentTimeMillis();
synchronized (this.updateLock) {
long seqNum = obtainSeqNum();
@@ -874,14 +876,14 @@ public class HLog implements Syncable {
this.numEntries.incrementAndGet();
// Only count 1 row as an unflushed entry.
- this.unflushedEntries.incrementAndGet();
+ txid = this.unflushedEntries.incrementAndGet();
}
writeTime += System.currentTimeMillis() - start;
writeOps++;
// sync txn to file system
start = System.currentTimeMillis();
- this.sync(info.isMetaRegion());
+ this.sync(info.isMetaRegion(), txid);
gsyncTime += System.currentTimeMillis() - start;
gsyncOps++;
@@ -904,6 +906,7 @@ public class HLog implements Syncable {
private final long optionalFlushInterval;
+ // this variable is protected by this.lock
private boolean syncerShuttingDown = false;
LogSyncer(long optionalFlushInterval) {
@@ -953,31 +956,38 @@ public class HLog implements Syncable {
* This method first signals the thread that there's a sync needed
* and then waits for it to happen before returning.
*/
- public void addToSyncQueue(boolean force) {
+ public void addToSyncQueue(boolean force, long txid) {
- // Don't bother if somehow our append was already hflushed
- if (unflushedEntries.get() == 0) {
- return;
- }
- lock.lock();
- try {
- if (syncerShuttingDown) {
- LOG.warn(getName() + " was shut down while waiting for sync");
+ while (true) {
+ // Don't bother if somehow our append was already hflushed
+ // Check this without even acquiring the lock, in the hope
+ // that our edits is already synced.
+ if (!force && syncTillHere >= txid) {
return;
}
- if(force) {
+ if (force) {
forceSync = true;
+ force = false;
}
- // Wake the thread
- queueEmpty.signal();
+ lock.lock();
+ try {
+ if (syncerShuttingDown) {
+ LOG.warn(getName() + " was shut down while waiting for sync");
+ return;
+ }
+ // Wake the thread
+ queueEmpty.signal();
- // Wait for it to hflush
- syncDone.await();
- } catch (InterruptedException e) {
- LOG.debug(getName() + " was interrupted while waiting for sync", e);
- }
- finally {
- lock.unlock();
+ // Wait for it to hflush
+ // This check is redone here because it is within the lock
+ if (syncTillHere < txid) {
+ syncDone.await();
+ }
+ } catch (InterruptedException e) {
+ LOG.debug(getName() + " was interrupted while waiting for sync", e);
+ } finally {
+ lock.unlock();
+ }
}
}
}
@@ -992,7 +1002,17 @@ public class HLog implements Syncable {
* @param force For catalog regions, force the sync to happen
*/
public void sync(boolean force) {
- logSyncerThread.addToSyncQueue(force);
+ sync(force, this.unflushedEntries.get());
+ }
+
+ /**
+ * This method calls the LogSyncer only if its transaction is not yet flushed.
+ *
+ * @param txid
+ * The transaction id that this call is interested in.
+ */
+ public void sync(boolean force, long txid) {
+ logSyncerThread.addToSyncQueue(force, txid);
}
public void hflush() throws IOException {
@@ -1002,14 +1022,15 @@ public class HLog implements Syncable {
}
boolean logRollRequested = false;
if (this.forceSync ||
- this.unflushedEntries.get() >= this.flushlogentries) {
+ this.unflushedEntries.get() - this.syncTillHere >= this.flushlogentries) {
try {
long now = System.currentTimeMillis();
+ long doneUpto = this.unflushedEntries.get();
this.writer.sync();
+ this.syncTillHere = doneUpto;
syncTime += System.currentTimeMillis() - now;
syncOps++;
this.forceSync = false;
- this.unflushedEntries.set(0);
// if the number of replicas in HDFS has fallen below the initial
// value, then roll logs.