You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2009/10/28 04:58:03 UTC

svn commit: r830434 - in /hadoop/hbase/trunk: CHANGES.txt src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Author: jdcryans
Date: Wed Oct 28 03:58:03 2009
New Revision: 830434

URL: http://svn.apache.org/viewvc?rev=830434&view=rev
Log:
HBASE-1936  HLog group commit

Removed:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=830434&r1=830433&r2=830434&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Wed Oct 28 03:58:03 2009
@@ -142,6 +142,7 @@
    HBASE-1918  Don't do DNS resolving in .META. scanner for each row
    HBASE-1756  Refactor HLog (changing package first)
    HBASE-1926  Remove unused xmlenc jar from trunk
+   HBASE-1936  HLog group commit
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=830434&r1=830433&r2=830434&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Oct 28 03:58:03 2009
@@ -91,8 +91,6 @@
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.LogFlusher;
-import org.apache.hadoop.hbase.regionserver.LogRoller;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
@@ -207,7 +205,6 @@
   // eclipse warning when accessed by inner classes
   protected volatile HLog hlog;
   LogRoller hlogRoller;
-  LogFlusher hlogFlusher;
   
   // limit compactions while starting up
   CompactionLimitThread compactionLimitThread;
@@ -329,10 +326,6 @@
     // Log rolling thread
     this.hlogRoller = new LogRoller(this);
     
-    // Log flushing thread
-    this.hlogFlusher =
-      new LogFlusher(this.threadWakeFrequency, this.stopRequested);
-    
     // Background thread to check for major compactions; needed if region
     // has not gotten updates in a while.  Make it run at a lesser frequency.
     int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
@@ -518,7 +511,6 @@
                   try {
                     serverInfo.setStartCode(System.currentTimeMillis());
                     hlog = setupHLog();
-                    this.hlogFlusher.setHLog(hlog);
                   } catch (IOException e) {
                     this.abortRequested = true;
                     this.stopRequested.set(true);
@@ -616,7 +608,6 @@
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
     cacheFlusher.interruptIfNecessary();
-    hlogFlusher.interrupt();
     compactSplitThread.interruptIfNecessary();
     hlogRoller.interruptIfNecessary();
     this.majorCompactionChecker.interrupt();
@@ -746,7 +737,6 @@
 
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
       this.hlog = setupHLog();
-      this.hlogFlusher.setHLog(hlog);
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
       startServiceThreads();
@@ -1131,8 +1121,6 @@
     };
     Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
         handler);
-    Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
-        handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
       handler);
     Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=830434&r1=830433&r2=830434&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Oct 28 03:58:03 2009
@@ -24,7 +24,6 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -34,14 +33,16 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,15 +60,12 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.SequenceFile.Reader;
@@ -126,11 +124,10 @@
   private final long blocksize;
   private final int flushlogentries;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
-  private volatile long lastLogFlushTime;
-  private final boolean append;
-  private final Method syncfs;
   private final short replicationLevel;
-  private final static Object [] NO_ARGS = new Object []{};
+
+  // used to indirectly tell syncFs to force the sync
+  private final AtomicBoolean forceSync = new AtomicBoolean(false);
 
   /*
    * Current log file.
@@ -183,6 +180,11 @@
    */
   private final int maxLogs;
 
+  /**
+   * Thread that handles group commit
+   */
+  private final LogSyncer logSyncerThread;
+
   static byte [] COMPLETE_CACHE_FLUSH;
   static {
     try {
@@ -224,7 +226,6 @@
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
-    this.lastLogFlushTime = System.currentTimeMillis();
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -237,21 +238,9 @@
       ", flushlogentries=" + this.flushlogentries +
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
     rollWriter();
-    // Test if syncfs is available.
-    this.append = isAppend(conf);
-    Method m = null;
-    if (this.append) {
-      try {
-        m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
-        LOG.debug("Using syncFs--hadoop-4379");
-      } catch (SecurityException e) {
-        throw new IOException("Failed test for syncfs", e);
-      } catch (NoSuchMethodException e) {
-        // This can happen
-        LOG.info("syncFs--hadoop-4379 not available" );
-      }
-    }
-    this.syncfs = m;
+    logSyncerThread = new LogSyncer(this.flushlogentries);
+    Threads.setDaemonThreadRunning(logSyncerThread,
+        Thread.currentThread().getName() + ".logSyncer");
   }
 
   /**
@@ -591,6 +580,14 @@
    * @throws IOException
    */
   public void close() throws IOException {
+    try {
+      logSyncerThread.interrupt();
+      // Make sure we synced everything
+      logSyncerThread.join();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for syncer thread to die", e);
+    }
+
     cacheFlushLock.lock();
     try {
       synchronized (updateLock) {
@@ -718,39 +715,118 @@
     }
   }
 
+  /**
+   * This thread is responsible to call syncFs and buffer up the writers while
+   * it happens.
+   */
+   class LogSyncer extends Thread {
+    // Using fairness to make sure locks are given in order
+    private final ReentrantLock lock = new ReentrantLock(true);
+
+    // Condition used to wait until we have something to sync
+    private final Condition queueEmpty = lock.newCondition();
+
+    // Condition used to signal that the sync is done
+    private final Condition syncDone = lock.newCondition();
+    // Longest time, in ms, run() waits for a request before calling hflush()
+    private final int optionalFlushInterval;
+
+    LogSyncer(int optionalFlushInterval) {
+      this.optionalFlushInterval = optionalFlushInterval;
+    }
+
+    public void run() {
+      // Acquire outside the try: finally must only unlock a lock we hold
+      lock.lock();
+      try {
+        while(!closed) {
+          // Wait until something has to be synced or do it if we waited enough
+          // time (useful if something appends but does not sync).
+          queueEmpty.await(this.optionalFlushInterval, TimeUnit.MILLISECONDS);
+
+          // We got the signal, let's syncFS. We currently own the lock so new
+          // writes are waiting to acquire it in addToSyncQueue while the ones
+          // we sync are waiting on await()
+          hflush();
+
+          // Release all the clients waiting on the sync. Notice that we still
+          // own the lock until we get back to await at which point all the
+          // other threads waiting will first acquire and release locks
+          syncDone.signalAll();
+        }
+      } catch (IOException e) {
+        LOG.error("Error while syncing, requesting close of hlog ", e);
+        requestLogRoll();
+      } catch (InterruptedException e) {
+        LOG.debug(getName() + " interrupted while waiting for sync requests",e );
+      } finally {
+        // Nobody signals syncDone once we are gone, free the parked writers
+        syncDone.signalAll();
+        lock.unlock();
+        LOG.info(getName() + " exiting");
+      }
+    }
+
+    /**
+     * This method first signals the thread that there's a sync needed
+     * and then waits for it to happen before returning.
+     */
+    public void addToSyncQueue() {
+      // Don't bother if somehow our append was already synced
+      if (unflushedEntries.get() == 0) {
+        return;
+      }
+      lock.lock();
+      try {
+        // Wake the thread
+        queueEmpty.signal();
+        // Wait for it to syncFs
+        syncDone.await();
+      } catch (InterruptedException e) {
+        LOG.debug(getName() + " was interrupted while waiting for sync", e);
+        // Keep the interrupt visible to the caller instead of swallowing it
+        Thread.currentThread().interrupt();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
   public void sync() throws IOException {
     sync(false);
   }
 
   /**
+   * Hands the sync to the LogSyncer so it is group committed with the
+   * syncs requested by other threads.
+   * @param force For catalog regions, force the sync to happen
+   * @throws IOException
+   */
+  public void sync(boolean force) throws IOException {
+    // Only ever raise the flag here; hflush() lowers it once it has synced
+    if (force) forceSync.set(true);
+    logSyncerThread.addToSyncQueue();
+  }
+
+  /**
    * Multiple threads will call sync() at the same time, only the winner
    * will actually flush if there is any race or build up.
    *
-   * @param force sync regardless (for meta updates) if there is data
    * @throws IOException
    */
-  public void sync(boolean force) throws IOException {
+  protected void hflush() throws IOException {
     synchronized (this.updateLock) {
       if (this.closed)
         return;
 
-      if (this.unflushedEntries.get() == 0)
-        return; // win
-
-      if (force || this.unflushedEntries.get() > this.flushlogentries) {
+      if (this.forceSync.get() ||
+          this.unflushedEntries.get() > this.flushlogentries) {
         try {
-          lastLogFlushTime = System.currentTimeMillis();
-          if (this.append && syncfs != null) {
-            try {
-              this.syncfs.invoke(this.writer, NO_ARGS);
-            } catch (Exception e) {
-              throw new IOException("Reflection", e);
-            }
-          } else {
-            this.writer.sync();
-            if (this.writer_out != null)
-              this.writer_out.sync();
+          this.writer.sync();
+          if (this.writer_out != null) {
+            this.writer_out.sync();
           }
+          this.forceSync.compareAndSet(true, false);
           this.unflushedEntries.set(0);
         } catch (IOException e) {
           LOG.fatal("Could not append. Requesting close of hlog", e);
@@ -986,7 +1062,6 @@
     int concurrentLogReads =
       conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
     // Is append supported?
-    boolean append = isAppend(conf);
     try {
       int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) / 
           concurrentLogReads)).intValue();
@@ -1005,7 +1080,6 @@
             LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
               ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
           }
-          recoverLog(fs, logfiles[i].getPath(), append);
           SequenceFile.Reader in = null;
           int count = 0;
           try {
@@ -1160,24 +1234,6 @@
   }
 
   /**
-   * @param conf
-   * @return True if append enabled and we have the syncFs in our path.
-   */
-  private static boolean isAppend(final HBaseConfiguration conf) {
-      boolean append = conf.getBoolean("dfs.support.append", false);
-      if (append) {
-        try {
-          SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
-          append = true;
-        } catch (SecurityException e) {
-        } catch (NoSuchMethodException e) {
-          append = false;
-        }
-      }
-      return append;
-    }
-
-  /**
    * Utility class that lets us keep track of the edit with it's key
    * Only used when splitting logs
    */
@@ -1224,40 +1280,6 @@
     return getHLogDirectoryName(HServerInfo.getServerName(info));
   }
 
-  /*
-   * Recover log.
-   * If append has been set, try and open log in append mode.
-   * Doing this, we get a hold of the file that crashed writer
-   * was writing to.  Once we have it, close it.  This will
-   * allow subsequent reader to see up to last sync.
-   * @param fs
-   * @param p
-   * @param append
-   */
-  private static void recoverLog(final FileSystem fs, final Path p,
-      final boolean append) {
-    if (!append) {
-      return;
-    }
-    // Trying recovery
-    boolean recovered = false;
-    while (!recovered) {
-      try {
-        FSDataOutputStream out = fs.append(p);
-        out.close();
-        recovered = true;
-      } catch (IOException e) {
-        LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ex) {
-          // ignore it and try again
-        }
-      }
-    }
-    LOG.info("Past out lease recovery");
-  }
-  
   /**
    * Construct the HLog directory name
    *