You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by to...@apache.org on 2011/02/01 07:20:22 UTC
svn commit: r1065918 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
Author: todd
Date: Tue Feb 1 06:20:21 2011
New Revision: 1065918
URL: http://svn.apache.org/viewvc?rev=1065918&view=rev
Log:
HBASE-3483 Memstore lower limit should trigger asynchronous flushes
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1065918&r1=1065917&r2=1065918&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Feb 1 06:20:21 2011
@@ -69,6 +69,11 @@ Release 0.91.0 - Unreleased
HBASE-3256 Coprocessors: Coprocessor host and observer for HMaster
HBASE-3448 RegionSplitter, utility class to manually split tables
+Release 0.90.1 - Unreleased
+
+ BUG FIXES
+ HBASE-3483 Memstore lower limit should trigger asynchronous flushes
+
Release 0.90.0 - January 19th, 2011
INCOMPATIBLE CHANGES
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1065918&r1=1065917&r2=1065918&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue Feb 1 06:20:21 2011
@@ -28,10 +28,12 @@ import org.apache.hadoop.hbase.RemoteExc
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Preconditions;
+
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
import java.util.ConcurrentModificationException;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
@@ -39,6 +41,8 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -56,12 +60,14 @@ class MemStoreFlusher extends Thread imp
// a corresponding entry in the other.
private final BlockingQueue<FlushQueueEntry> flushQueue =
new DelayQueue<FlushQueueEntry>();
- private final Map<HRegion, FlushQueueEntry> regionsInQueue =
- new HashMap<HRegion, FlushQueueEntry>();
+ private final Map<HRegion, FlushRegionEntry> regionsInQueue =
+ new HashMap<HRegion, FlushRegionEntry>();
+ private AtomicBoolean wakeupPending = new AtomicBoolean();
private final long threadWakeFrequency;
private final HRegionServer server;
private final ReentrantLock lock = new ReentrantLock();
+ private final Condition flushOccurred = lock.newCondition();
protected final long globalMemStoreLimit;
protected final long globalMemStoreLimitLowMark;
@@ -133,17 +139,66 @@ class MemStoreFlusher extends Thread imp
}
return (long)(max * limit);
}
+
+ /**
+  * The memstore across all regions has exceeded the low water mark. Pick
+  * one region to flush and flush it synchronously (this is called from the
+  * flush thread).
+  * <p>
+  * Two candidates are looked up: the biggest region that does not have too
+  * many store files, and the biggest region overall. The overall biggest is
+  * chosen when no region passes the store-file check, or when it is more
+  * than twice as big as the best flushable one; otherwise the best flushable
+  * region is chosen.
+  * @return true if successful
+  * @throws IllegalStateException if there are no regions to pick from, or
+  * if the chosen region's memstore is empty
+  */
+ private boolean flushOneForGlobalPressure() {
+   SortedMap<Long, HRegion> regionsBySize =
+       server.getCopyOfOnlineRegionsSortedBySize();
+   // Find the biggest region that doesn't have too many storefiles
+   HRegion bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, true);
+   // Find the biggest region, total, even if it might have too many flushes.
+   HRegion bestAnyRegion = getBiggestMemstoreRegion(regionsBySize, false);
+
+   Preconditions.checkState(bestAnyRegion != null,
+       "Above memory mark but there are no regions!");
+
+   HRegion regionToFlush;
+   if (bestFlushableRegion == null) {
+     // Every region was rejected by the store-file check, so there is no
+     // flushable candidate to compare against. Fall back to the biggest
+     // region rather than dereferencing null below.
+     LOG.info("Under global heap pressure: " +
+         "no region without too many store files was found. Choosing " +
+         bestAnyRegion.getRegionNameAsString() + " anyway.");
+     regionToFlush = bestAnyRegion;
+   } else if (bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
+     // Even if it's not supposed to be flushed, pick a region if it's more than twice
+     // as big as the best flushable one - otherwise when we're under pressure we make
+     // lots of little flushes and cause lots of compactions, etc, which just makes
+     // life worse!
+     LOG.info("Under global heap pressure: " +
+         "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
+         "store files, but is " +
+         StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
+         " vs best flushable region's " +
+         StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
+         ". Choosing the bigger.");
+     regionToFlush = bestAnyRegion;
+   } else {
+     regionToFlush = bestFlushableRegion;
+   }
+
+   Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
+
+   LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
+   // "true" marks this as an emergency flush (second parameter of flushRegion).
+   return flushRegion(regionToFlush, true);
+ }
@Override
public void run() {
while (!this.server.isStopped()) {
FlushQueueEntry fqe = null;
try {
+ wakeupPending.set(false); // allow someone to wake us up again
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
- if (fqe == null) {
+ if (fqe == null || fqe instanceof WakeupFlushThread) {
+ if (isAboveLowWaterMark()) {
+ LOG.info("Flush thread woke up with memory above low water.");
+ flushOneForGlobalPressure();
+ // Enqueue another one of these tokens so we'll wake up again
+ wakeupFlushThread();
+ }
continue;
}
- if (!flushRegion(fqe)) {
+ FlushRegionEntry fre = (FlushRegionEntry)fqe;
+ if (!flushRegion(fre)) {
break;
}
} catch (InterruptedException ex) {
@@ -151,9 +206,7 @@ class MemStoreFlusher extends Thread imp
} catch (ConcurrentModificationException ex) {
continue;
} catch (Exception ex) {
- LOG.error("Cache flush failed" +
- (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
- ex);
+ LOG.error("Cache flusher failed for entry " + fqe);
if (!server.checkFileSystem()) {
break;
}
@@ -164,12 +217,46 @@ class MemStoreFlusher extends Thread imp
LOG.info(getName() + " exiting");
}
+ /**
+  * Puts a WakeupFlushThread token on the flush queue unless a wakeup is
+  * already pending. The wakeupPending flag is flipped with a
+  * compare-and-set, so only the caller that changes it from false to true
+  * enqueues a token.
+  */
+ private void wakeupFlushThread() {
+   boolean alreadyPending = !wakeupPending.compareAndSet(false, true);
+   if (alreadyPending) {
+     return;
+   }
+   flushQueue.add(new WakeupFlushThread());
+ }
+
+ /**
+  * Returns the first region, in the iteration order of the given map's
+  * values, that is acceptable as a flush candidate.
+  * @param regionsBySize candidate regions (presumably ordered so that the
+  * biggest memstore comes first - confirm against
+  * getCopyOfOnlineRegionsSortedBySize)
+  * @param checkStoreFileCount when true, regions for which
+  * isTooManyStoreFiles returns true are passed over
+  * @return the first acceptable region, or null if there is none
+  */
+ private HRegion getBiggestMemstoreRegion(
+     SortedMap<Long, HRegion> regionsBySize,
+     boolean checkStoreFileCount) {
+   synchronized (regionsInQueue) {
+     for (HRegion candidate : regionsBySize.values()) {
+       // Only consult the store-file check when the caller asked for it.
+       boolean skip = checkStoreFileCount && isTooManyStoreFiles(candidate);
+       if (!skip) {
+         return candidate;
+       }
+     }
+     return null;
+   }
+ }
+
+ /**
+  * Tells whether global memstore usage has reached the high watermark.
+  * @return true if the server's global memstore size is at or above
+  * globalMemStoreLimit
+  */
+ private boolean isAboveHighWaterMark() {
+   long currentSize = server.getGlobalMemStoreSize();
+   return currentSize >= globalMemStoreLimit;
+ }
+
+ /**
+ * Return true if global memory usage is at or above the low watermark
+ * (globalMemStoreLimitLowMark)
+ */
+ private boolean isAboveLowWaterMark() {
+ return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark;
+ }
+
public void requestFlush(HRegion r) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
- FlushQueueEntry fqe = new FlushQueueEntry(r);
+ FlushRegionEntry fqe = new FlushRegionEntry(r);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
}
@@ -196,7 +283,7 @@ class MemStoreFlusher extends Thread imp
* false, there will be accompanying log messages explaining why the log was
* not flushed.
*/
- private boolean flushRegion(final FlushQueueEntry fqe) {
+ private boolean flushRegion(final FlushRegionEntry fqe) {
HRegion region = fqe.region;
if (!fqe.region.getRegionInfo().isMetaRegion() &&
isTooManyStoreFiles(region)) {
@@ -237,7 +324,7 @@ class MemStoreFlusher extends Thread imp
*/
private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
synchronized (this.regionsInQueue) {
- FlushQueueEntry fqe = this.regionsInQueue.remove(region);
+ FlushRegionEntry fqe = this.regionsInQueue.remove(region);
if (fqe != null && emergencyFlush) {
// Need to remove from region from delay queue. When NOT an
// emergencyFlush, then item was removed via a flushQueue.poll.
@@ -266,6 +353,7 @@ class MemStoreFlusher extends Thread imp
return false;
}
} finally {
+ flushOccurred.signalAll();
lock.unlock();
}
return true;
@@ -287,49 +375,38 @@ class MemStoreFlusher extends Thread imp
* amount of memstore consumption.
*/
public synchronized void reclaimMemStoreMemory() {
- if (server.getGlobalMemStoreSize() >= globalMemStoreLimit) {
- flushSomeRegions();
+ if (isAboveHighWaterMark()) {
+ lock.lock();
+ try {
+ while (isAboveHighWaterMark()) {
+ wakeupFlushThread();
+ flushOccurred.awaitUninterruptibly();
+ }
+ } finally {
+ lock.unlock();
+ }
+ } else if (isAboveLowWaterMark()) {
+ wakeupFlushThread();
}
}
- /*
- * Emergency! Need to flush memory.
+ interface FlushQueueEntry extends Delayed {}
+
+ /**
+ * Token to insert into the flush queue that ensures that the flusher does not sleep
*/
- private synchronized void flushSomeRegions() {
- // keep flushing until we hit the low water mark
- long globalMemStoreSize = -1;
- ArrayList<HRegion> regionsToCompact = new ArrayList<HRegion>();
- for (SortedMap<Long, HRegion> m =
- this.server.getCopyOfOnlineRegionsSortedBySize();
- (globalMemStoreSize = server.getGlobalMemStoreSize()) >=
- this.globalMemStoreLimitLowMark;) {
- // flush the region with the biggest memstore
- if (m.size() <= 0) {
- LOG.info("No online regions to flush though we've been asked flush " +
- "some; globalMemStoreSize=" +
- StringUtils.humanReadableInt(globalMemStoreSize) +
- ", globalMemStoreLimitLowMark=" +
- StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
- break;
- }
- HRegion biggestMemStoreRegion = m.remove(m.firstKey());
- LOG.info("Forced flushing of " + biggestMemStoreRegion.toString() +
- " because global memstore limit of " +
- StringUtils.humanReadableInt(this.globalMemStoreLimit) +
- " exceeded; currently " +
- StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " +
- StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
- if (!flushRegion(biggestMemStoreRegion, true)) {
- LOG.warn("Flush failed");
- break;
- }
- regionsToCompact.add(biggestMemStoreRegion);
+ static class WakeupFlushThread implements FlushQueueEntry {
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return 0;
}
- for (HRegion region : regionsToCompact) {
- server.compactSplitThread.requestCompaction(region, getName());
+
+ @Override
+ public int compareTo(Delayed o) {
+ return -1;
}
}
-
+
/**
* Datastructure used in the flush queue. Holds region and retry count.
* Keeps tabs on how old this object is. Implements {@link Delayed}. On
@@ -338,13 +415,14 @@ class MemStoreFlusher extends Thread imp
* milliseconds before readding to delay queue if you want it to stay there
* a while.
*/
- static class FlushQueueEntry implements Delayed {
+ static class FlushRegionEntry implements FlushQueueEntry {
private final HRegion region;
+
private final long createTime;
private long whenToExpire;
private int requeueCount = 0;
- FlushQueueEntry(final HRegion r) {
+ FlushRegionEntry(final HRegion r) {
this.region = r;
this.createTime = System.currentTimeMillis();
this.whenToExpire = this.createTime;
@@ -372,7 +450,7 @@ class MemStoreFlusher extends Thread imp
* to whatever you pass.
* @return This.
*/
- public FlushQueueEntry requeue(final long when) {
+ public FlushRegionEntry requeue(final long when) {
this.whenToExpire = System.currentTimeMillis() + when;
this.requeueCount++;
return this;
@@ -389,5 +467,10 @@ class MemStoreFlusher extends Thread imp
return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
other.getDelay(TimeUnit.MILLISECONDS)).intValue();
}
+
+ /** Describes this entry by the name of the region it would flush. */
+ @Override
+ public String toString() {
+   String regionName = Bytes.toString(region.getRegionName());
+   return "[flush region " + regionName + "]";
+ }
}
}