You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by to...@apache.org on 2011/02/01 07:20:22 UTC

svn commit: r1065918 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java

Author: todd
Date: Tue Feb  1 06:20:21 2011
New Revision: 1065918

URL: http://svn.apache.org/viewvc?rev=1065918&view=rev
Log:
HBASE-3483 Memstore lower limit should trigger asynchronous flushes

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1065918&r1=1065917&r2=1065918&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Feb  1 06:20:21 2011
@@ -69,6 +69,11 @@ Release 0.91.0 - Unreleased
    HBASE-3256  Coprocessors: Coprocessor host and observer for HMaster
    HBASE-3448  RegionSplitter, utility class to manually split tables
 
+Release 0.90.1 - Unreleased
+
+  BUG FIXES
+   HBASE-3483  Memstore lower limit should trigger asynchronous flushes
+
 
 Release 0.90.0 - January 19th, 2011
   INCOMPATIBLE CHANGES

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1065918&r1=1065917&r2=1065918&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue Feb  1 06:20:21 2011
@@ -28,10 +28,12 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
@@ -39,6 +41,8 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -56,12 +60,14 @@ class MemStoreFlusher extends Thread imp
   // a corresponding entry in the other.
   private final BlockingQueue<FlushQueueEntry> flushQueue =
     new DelayQueue<FlushQueueEntry>();
-  private final Map<HRegion, FlushQueueEntry> regionsInQueue =
-    new HashMap<HRegion, FlushQueueEntry>();
+  private final Map<HRegion, FlushRegionEntry> regionsInQueue =
+    new HashMap<HRegion, FlushRegionEntry>();
+  private AtomicBoolean wakeupPending = new AtomicBoolean();
 
   private final long threadWakeFrequency;
   private final HRegionServer server;
   private final ReentrantLock lock = new ReentrantLock();
+  private final Condition flushOccurred = lock.newCondition();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -133,17 +139,66 @@ class MemStoreFlusher extends Thread imp
     }
     return (long)(max * limit);
   }
+  
+  /**
+   * The memstore across all regions has exceeded the low water mark. Pick
+   * one region to flush and flush it synchronously (this is called from the
+   * flush thread). Falls back to the biggest region overall when every
+   * region has too many store files.
+   * @return true if successful
+   */
+  private boolean flushOneForGlobalPressure() {
+    SortedMap<Long, HRegion> regionsBySize =
+        server.getCopyOfOnlineRegionsSortedBySize();
+    // Find the biggest region that doesn't have too many storefiles
+    HRegion bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, true);
+    // Find the biggest region, total, even if it might have too many flushes.
+    HRegion bestAnyRegion = getBiggestMemstoreRegion(regionsBySize, false);
+    Preconditions.checkState(bestAnyRegion != null,
+        "Above memory mark but there are no regions!");
+
+    HRegion regionToFlush;
+    if (bestFlushableRegion == null) {
+      // Every region has too many store files, so none passed the filter;
+      // take the biggest one instead of dereferencing null below.
+      regionToFlush = bestAnyRegion;
+    } else if (bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
+      // Prefer a non-flushable region that is more than twice as big: lots of
+      // little flushes under pressure cause lots of compactions and make life worse.
+      LOG.info("Under global heap pressure: " +
+          "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
+          "store files, but is " +
+          StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
+          " vs best flushable region's " +
+          StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
+          ". Choosing the bigger.");
+      regionToFlush = bestAnyRegion;
+    } else {
+      regionToFlush = bestFlushableRegion;
+    }
+    Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
+    LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
+    return flushRegion(regionToFlush, true);
+  }
 
   @Override
   public void run() {
     while (!this.server.isStopped()) {
       FlushQueueEntry fqe = null;
       try {
+        wakeupPending.set(false); // allow someone to wake us up again
         fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (fqe == null) {
+        if (fqe == null || fqe instanceof WakeupFlushThread) {
+          if (isAboveLowWaterMark()) {
+            LOG.info("Flush thread woke up with memory above low water.");
+            flushOneForGlobalPressure();
+            // Enqueue another one of these tokens so we'll wake up again
+            wakeupFlushThread();            
+          }
           continue;
         }
-        if (!flushRegion(fqe)) {
+        FlushRegionEntry fre = (FlushRegionEntry)fqe;
+        if (!flushRegion(fre)) {
           break;
         }
       } catch (InterruptedException ex) {
@@ -151,9 +206,7 @@ class MemStoreFlusher extends Thread imp
       } catch (ConcurrentModificationException ex) {
         continue;
       } catch (Exception ex) {
-        LOG.error("Cache flush failed" +
-          (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
-          ex);
+        LOG.error("Cache flusher failed for entry " + fqe);
         if (!server.checkFileSystem()) {
           break;
         }
@@ -164,12 +217,46 @@ class MemStoreFlusher extends Thread imp
     LOG.info(getName() + " exiting");
   }
 
+  private void wakeupFlushThread() {
+    if (!wakeupPending.getAndSet(true)) { // flag was clear: no wakeup token queued yet
+      flushQueue.add(new WakeupFlushThread());
+    }
+  }
+
+  private HRegion getBiggestMemstoreRegion(
+      SortedMap<Long, HRegion> regionsBySize,
+      boolean checkStoreFileCount) {
+    // First acceptable region in the map's iteration order wins; null if none.
+    synchronized (regionsInQueue) {
+      for (HRegion candidate : regionsBySize.values()) {
+        if (!checkStoreFileCount || !isTooManyStoreFiles(candidate)) {
+          return candidate;
+        }
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Return true if the global memstore size is at or above globalMemStoreLimit
+   */
+  private boolean isAboveHighWaterMark() {
+    return server.getGlobalMemStoreSize() >= globalMemStoreLimit;
+  }
+  
+  /**
+   * Return true if the global memstore size is at or above globalMemStoreLimitLowMark
+   */
+  private boolean isAboveLowWaterMark() {
+    return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark;
+  }
+
   public void requestFlush(HRegion r) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
         // queue.  It'll come out near immediately.
-        FlushQueueEntry fqe = new FlushQueueEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
       }
@@ -196,7 +283,7 @@ class MemStoreFlusher extends Thread imp
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(final FlushQueueEntry fqe) {
+  private boolean flushRegion(final FlushRegionEntry fqe) {
     HRegion region = fqe.region;
     if (!fqe.region.getRegionInfo().isMetaRegion() &&
         isTooManyStoreFiles(region)) {
@@ -237,7 +324,7 @@ class MemStoreFlusher extends Thread imp
    */
   private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
     synchronized (this.regionsInQueue) {
-      FlushQueueEntry fqe = this.regionsInQueue.remove(region);
+      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
       if (fqe != null && emergencyFlush) {
         // Need to remove from region from delay queue.  When NOT an
         // emergencyFlush, then item was removed via a flushQueue.poll.
@@ -266,6 +353,7 @@ class MemStoreFlusher extends Thread imp
         return false;
       }
     } finally {
+      flushOccurred.signalAll();
       lock.unlock();
     }
     return true;
@@ -287,49 +375,38 @@ class MemStoreFlusher extends Thread imp
    * amount of memstore consumption.
    */
   public synchronized void reclaimMemStoreMemory() {
-    if (server.getGlobalMemStoreSize() >= globalMemStoreLimit) {
-      flushSomeRegions();
+    if (isAboveHighWaterMark()) {
+      lock.lock();
+      try {
+        while (isAboveHighWaterMark()) {
+          wakeupFlushThread();
+          flushOccurred.awaitUninterruptibly();
+        }
+      } finally {
+        lock.unlock();
+      }
+    } else if (isAboveLowWaterMark()) {
+      wakeupFlushThread();
     }
   }
 
-  /*
-   * Emergency!  Need to flush memory.
+  interface FlushQueueEntry extends Delayed {}
+  
+  /**
+   * Token to insert into the flush queue that ensures that the flusher does not sleep
    */
-  private synchronized void flushSomeRegions() {
-    // keep flushing until we hit the low water mark
-    long globalMemStoreSize = -1;
-    ArrayList<HRegion> regionsToCompact = new ArrayList<HRegion>();
-    for (SortedMap<Long, HRegion> m =
-        this.server.getCopyOfOnlineRegionsSortedBySize();
-      (globalMemStoreSize = server.getGlobalMemStoreSize()) >=
-        this.globalMemStoreLimitLowMark;) {
-      // flush the region with the biggest memstore
-      if (m.size() <= 0) {
-        LOG.info("No online regions to flush though we've been asked flush " +
-          "some; globalMemStoreSize=" +
-          StringUtils.humanReadableInt(globalMemStoreSize) +
-          ", globalMemStoreLimitLowMark=" +
-          StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-        break;
-      }
-      HRegion biggestMemStoreRegion = m.remove(m.firstKey());
-      LOG.info("Forced flushing of " +  biggestMemStoreRegion.toString() +
-        " because global memstore limit of " +
-        StringUtils.humanReadableInt(this.globalMemStoreLimit) +
-        " exceeded; currently " +
-        StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " +
-        StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-      if (!flushRegion(biggestMemStoreRegion, true)) {
-        LOG.warn("Flush failed");
-        break;
-      }
-      regionsToCompact.add(biggestMemStoreRegion);
+  static class WakeupFlushThread implements FlushQueueEntry {
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return 0;
     }
-    for (HRegion region : regionsToCompact) {
-      server.compactSplitThread.requestCompaction(region, getName());
+
+    @Override
+    public int compareTo(Delayed o) {
+      return -1;
     }
   }
-
+  
   /**
    * Datastructure used in the flush queue.  Holds region and retry count.
    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
@@ -338,13 +415,14 @@ class MemStoreFlusher extends Thread imp
    * milliseconds before readding to delay queue if you want it to stay there
    * a while.
    */
-  static class FlushQueueEntry implements Delayed {
+  static class FlushRegionEntry implements FlushQueueEntry {
     private final HRegion region;
+    
     private final long createTime;
     private long whenToExpire;
     private int requeueCount = 0;
 
-    FlushQueueEntry(final HRegion r) {
+    FlushRegionEntry(final HRegion r) {
       this.region = r;
       this.createTime = System.currentTimeMillis();
       this.whenToExpire = this.createTime;
@@ -372,7 +450,7 @@ class MemStoreFlusher extends Thread imp
      * to whatever you pass.
      * @return This.
      */
-    public FlushQueueEntry requeue(final long when) {
+    public FlushRegionEntry requeue(final long when) {
       this.whenToExpire = System.currentTimeMillis() + when;
       this.requeueCount++;
       return this;
@@ -389,5 +467,10 @@ class MemStoreFlusher extends Thread imp
       return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
     }
+    
+    @Override
+    public String toString() { // readable form of this entry: the region name in brackets
+      return "[flush region " + Bytes.toString(region.getRegionName()) + "]";
+    }
   }
 }