You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:13:08 UTC

svn commit: r1181490 - /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Author: nspiegelberg
Date: Tue Oct 11 02:13:07 2011
New Revision: 1181490

URL: http://svn.apache.org/viewvc?rev=1181490&view=rev
Log:
TXID-based Syncs

Summary:
Trying to improve put latency for prod cluster by skipping
redundant sync requests.

Test Plan:
- mvn test -Dtest=Test*Log*

DiffCamp Revision: 215411
Reviewed By: kannan
Reviewers: dhruba, kannan
Commenters: dhruba
CC: nspiegelberg, dhruba, kannan
Revert Plan:
OK

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1181490&r1=1181489&r2=1181490&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 11 02:13:07 2011
@@ -143,8 +143,9 @@ public class HLog implements Syncable {
   private final long blocksize;
   private final int flushlogentries;
   private final String prefix;
-  private final AtomicInteger unflushedEntries = new AtomicInteger(0);
+  private final AtomicLong unflushedEntries = new AtomicLong(0);
   private final Path oldLogDir;
+  private volatile long syncTillHere = 0;
   private final List<LogActionsListener> actionListeners =
       Collections.synchronizedList(new ArrayList<LogActionsListener>());
 
@@ -158,7 +159,7 @@ public class HLog implements Syncable {
   final static Object [] NO_ARGS = new Object []{};
 
   // used to indirectly tell syncFs to force the sync
-  private boolean forceSync = false;
+  private volatile boolean forceSync = false;
 
   public interface Reader {
     void init(FileSystem fs, Path path, Configuration c) throws IOException;
@@ -860,6 +861,7 @@ public class HLog implements Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
+    long txid = 0;
     long start = System.currentTimeMillis();
     synchronized (this.updateLock) {
       long seqNum = obtainSeqNum();
@@ -874,14 +876,14 @@ public class HLog implements Syncable {
       this.numEntries.incrementAndGet();
 
       // Only count 1 row as an unflushed entry.
-      this.unflushedEntries.incrementAndGet();
+      txid = this.unflushedEntries.incrementAndGet();
     }
     writeTime += System.currentTimeMillis() - start;
     writeOps++;
 
     // sync txn to file system
     start = System.currentTimeMillis();
-    this.sync(info.isMetaRegion());
+    this.sync(info.isMetaRegion(), txid);
     gsyncTime += System.currentTimeMillis() - start;
     gsyncOps++;
 
@@ -904,6 +906,7 @@ public class HLog implements Syncable {
 
     private final long optionalFlushInterval;
 
+    // this variable is protected by this.lock
     private boolean syncerShuttingDown = false;
 
     LogSyncer(long optionalFlushInterval) {
@@ -953,31 +956,38 @@ public class HLog implements Syncable {
      * This method first signals the thread that there's a sync needed
      * and then waits for it to happen before returning.
      */
-    public void addToSyncQueue(boolean force) {
+    public void addToSyncQueue(boolean force, long txid) {
 
-      // Don't bother if somehow our append was already hflushed
-      if (unflushedEntries.get() == 0) {
-        return;
-      }
-      lock.lock();
-      try {
-        if (syncerShuttingDown) {
-          LOG.warn(getName() + " was shut down while waiting for sync");
+      while (true) {
+        // Don't bother if somehow our append was already hflushed
+        // Check this without even acquiring the lock, in the hope
+        // that our edits are already synced.
+        if (!force && syncTillHere >= txid) {
           return;
         }
-        if(force) {
+        if (force) {
           forceSync = true;
+          force = false;
         }
-        // Wake the thread
-        queueEmpty.signal();
+        lock.lock();
+        try {
+          if (syncerShuttingDown) {
+            LOG.warn(getName() + " was shut down while waiting for sync");
+            return;
+          }
+          // Wake the thread
+          queueEmpty.signal();
 
-        // Wait for it to hflush
-        syncDone.await();
-      } catch (InterruptedException e) {
-        LOG.debug(getName() + " was interrupted while waiting for sync", e);
-      }
-      finally {
-        lock.unlock();
+          // Wait for it to hflush
+          // This check is redone here because it is within the lock
+          if (syncTillHere < txid) {
+            syncDone.await();
+          }
+        } catch (InterruptedException e) {
+          LOG.debug(getName() + " was interrupted while waiting for sync", e);
+        } finally {
+          lock.unlock();
+        }
       }
     }
   }
@@ -992,7 +1002,17 @@ public class HLog implements Syncable {
    * @param force For catalog regions, force the sync to happen
    */
   public void sync(boolean force) {
-    logSyncerThread.addToSyncQueue(force);
+    sync(force, this.unflushedEntries.get());
+  }
+
+  /**
+   * This method calls the LogSyncer only if its transaction is not yet flushed.
+   *
+   * @param txid
+   *          The transaction id that this call is interested in.
+   */
+  public void sync(boolean force, long txid) {
+    logSyncerThread.addToSyncQueue(force, txid);
   }
 
   public void hflush() throws IOException {
@@ -1002,14 +1022,15 @@ public class HLog implements Syncable {
       }
       boolean logRollRequested = false;
       if (this.forceSync ||
-          this.unflushedEntries.get() >= this.flushlogentries) {
+          this.unflushedEntries.get() - this.syncTillHere >= this.flushlogentries) {
         try {
           long now = System.currentTimeMillis();
+          long doneUpto = this.unflushedEntries.get();
           this.writer.sync();
+          this.syncTillHere = doneUpto;
           syncTime += System.currentTimeMillis() - now;
           syncOps++;
           this.forceSync = false;
-          this.unflushedEntries.set(0);
 
           // if the number of replicas in HDFS has fallen below the initial
           // value, then roll logs.