You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2012/11/05 19:44:19 UTC

svn commit: r1405917 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Author: liyin
Date: Mon Nov  5 18:44:18 2012
New Revision: 1405917

URL: http://svn.apache.org/viewvc?rev=1405917&view=rev
Log:
[HBASE-6968] Improve HLog group commits throughput

Author: hkuang

Summary:
This is an initial diff that tries to allow log entries to be appended while other log entries are being synced to HDFS. It introduces a double-list buffer in which:
1. currentList buffers the log entries being appended;
2. syncList holds the log entries being synced to HDFS.

With this, each put RPC only adds its log entry to currentList under appendLock, while log entry serialization, write, and sync are done as a group under updateLock by the sync thread.

I still need to run benchmarks, and I plan to add a metric for group commit size.

Test Plan: existing unit tests

Reviewers: kannan, kranganathan, liyintang

Reviewed By: liyintang

CC: hbase-eng@, sdong, aaiyer, liyintang

Differential Revision: https://phabricator.fb.com/D558495

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1405917&r1=1405916&r2=1405917&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Mon Nov  5 18:44:18 2012
@@ -238,6 +238,9 @@ public class HLog implements Syncable {
   // during an update
   private final Object updateLock = new Object();
 
+  // Lock to guarantee the ordering of log entries in HLOG
+  private final Object appendLock = new Object();
+  
   private final boolean enabled;
 
   /*
@@ -329,6 +332,59 @@ public class HLog implements Syncable {
     return gsyncTime.get();
   }
 
+  /**
+   * Double list buffer for WAL that allows entries to be
+   * appended while sync is in progress
+   * 
+   * currentList buffers newly appended entries;
+   * syncList holds entries being synced to persistent storage.
+   */
+  private class DoubleListBuffer {
+    private LinkedList<Entry> currentList = new LinkedList<Entry>();
+    private LinkedList<Entry> syncList = new LinkedList<Entry>();
+    
+    /**
+     * Append a log entry into the buffer 
+     * @param entry log entry
+     */
+    synchronized private void appendToBuffer(Entry entry) {
+      currentList.add(entry);
+    }
+    
+    /**
+     * Sync buffered log entries into persistent storage
+     * 
+     * @return number of log entries synced
+     */
+    private int sync() throws IOException {
+      synchronized (this) {
+        if (currentList.isEmpty()) { // no thing to sync
+          return 0;
+        }
+
+        // otherwise swap the buffer in preparation for sync
+        assert syncList.isEmpty();
+        LinkedList<Entry> tmp = syncList;
+        syncList = currentList;
+        currentList = tmp;
+      }
+      
+      // append entries to writer
+      int syncedEntries = syncList.size();
+      while (!syncList.isEmpty()) {
+        Entry entry = syncList.remove();
+        append(entry);
+      }
+      
+      // sync the data
+      long now = System.currentTimeMillis();
+      writer.sync();
+      syncTime.inc(System.currentTimeMillis() - now);
+      return syncedEntries;
+    }
+  }
+  
+  private DoubleListBuffer logBuffer = new DoubleListBuffer();
 
   /**
    * HLog creating with a null actions listener.
@@ -908,6 +964,43 @@ public class HLog implements Syncable {
     return new HLogKey(regionName, tableName, seqnum, now);
   }
 
+
+  /** Append an entry to the log.
+   *
+   * @param regionInfo
+   * @param logEdit
+   * @param logKey
+   * @throws IOException
+   */
+  public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
+  throws IOException {
+    if (logSyncerThread.syncerShuttingDown) {
+      // can't acquire lock for the duration of append()
+      // so this is just a best-effort check
+      throw new IOException("Cannot append; logSyncer shutting down");
+    }
+    byte [] regionName = regionInfo.getRegionName();
+    synchronized (this.appendLock) {
+      if (this.closed) {
+        throw new IOException("Cannot append; log is closed");
+      }
+      long seqNum = obtainSeqNum();
+      logKey.setLogSeqNum(seqNum);
+      // The 'lastSeqWritten' map holds the sequence number of the oldest
+      // write for each region (i.e. the first edit added to the particular
+      // memstore). When the cache is flushed, the entry for the
+      // region being flushed is removed if the sequence number of the flush
+      // is greater than or equal to the value in lastSeqWritten.
+      this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
+      doWrite(regionInfo, logKey, logEdit);
+      this.unflushedEntries.incrementAndGet();
+      this.numEntries.incrementAndGet();
+    }
+
+    // sync txn to file system
+    this.sync(regionInfo.isMetaRegion());
+  }
+
   /**
    * Append a set of edits to the log. Log edits are keyed by regionName,
    * rowname, and log-sequence-id.
@@ -948,8 +1041,7 @@ public class HLog implements Syncable {
     
     long start = System.currentTimeMillis();
     byte[] regionName = info.getRegionName();
-    
-    synchronized (this.updateLock) {
+    synchronized (this.appendLock) {
       if (this.closed) {
         throw new IOException("Cannot append; log is closed");
       }
@@ -1147,7 +1239,7 @@ public class HLog implements Syncable {
     logSyncerThread.addToSyncQueue(force, txid);
   }
 
-  public void hflush() {
+  private void hflush() {
     synchronized (this.updateLock) {
       if (this.closed) {
         return;
@@ -1156,15 +1248,7 @@ public class HLog implements Syncable {
       if (this.forceSync ||
           this.unflushedEntries.get() - this.syncTillHere >= this.flushlogentries) {
         try {
-          long now = System.currentTimeMillis();
-          long doneUpto = this.unflushedEntries.get();
-          this.writer.sync();
-          // A better name for syncTillHere variable would have been
-          // syncedAtLeastTillHere. Between the time unflushedEntries is
-          // snapshotted and writer.sync() is called, append can append more
-          // entries to the log.
-          this.syncTillHere = doneUpto;
-          syncTime.inc(System.currentTimeMillis() - now);
+          this.syncTillHere += this.logBuffer.sync();
 
           // if the number of replicas in HDFS has fallen below the initial
           // value, then roll logs.
@@ -1220,12 +1304,7 @@ public class HLog implements Syncable {
   boolean canGetCurReplicas() {
     return this.getNumCurrentReplicas != null;
   }
-
-  public void hsync() {
-    // Not yet implemented up in hdfs so just call hflush.
-    hflush();
-  }
-
+  
   private void requestLogRoll() {
     if (this.listener != null) {
       this.listener.logRollRequested();
@@ -1234,8 +1313,30 @@ public class HLog implements Syncable {
 
   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
   throws IOException {
+    this.logBuffer.appendToBuffer(new Entry(logKey, logEdit));
+  }
+  
+  /**
+   * Append a log entry into the writer
+   * @param entry
+   * @throws IOException
+   */
+  private void append(Entry entry) throws IOException {
     try {
-      this.writer.append(new HLog.Entry(logKey, logEdit));
+      long now = System.currentTimeMillis();
+      this.writer.append(entry);
+      long took = System.currentTimeMillis() - now;
+      long len = 0;
+      for(KeyValue kv : entry.edit.getKeyValues()) {
+        len += kv.getLength();
+      }
+      writeSize.inc(len);
+      if (took > 1000) {
+        LOG.warn(String.format(
+          "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
+          Thread.currentThread().getName(), took, this.numEntries.get(),
+          StringUtils.humanReadableInt(len)));
+      }
     } catch (IOException e) {
       LOG.fatal("Could not append. Requesting close of hlog", e);
       requestLogRoll();