You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/11 20:18:27 UTC

svn commit: r1586735 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/regionserver/

Author: liyin
Date: Fri Apr 11 18:18:26 2014
New Revision: 1586735

URL: http://svn.apache.org/r1586735
Log:
[master] Fix bugs in MemStoreFlusher

Author: daviddeng

Summary:
Use `ScheduledThreadPoolExecutor`
Change some logic
Add `HRegionIf` and `HRegionServerIf` interfaces.
Add testcase

Test Plan:
`TestMemStoreFlusher`
`TestRegionServerOnlineConfigChange`
`TestPerColumnFamilyFlush.testFlushingWhenLogRolling`

Reviewers: liyintang, gauravm, aaiyer, khadkevich, manukranthk

Reviewed By: manukranthk

CC: hbase-eng@, fan, elliott

Differential Revision: https://phabricator.fb.com/D1264374

Task ID: 4114747

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Apr 11 18:18:26 2014
@@ -1092,6 +1092,18 @@ public final class HConstants {
 
   public static final String SWIFT_CLIENT_SOCKS_PROXY_HOST_AND_PORT =
       "hbase.client.swift.socks.proxy.hostAndPort";
+  /**
+   * Key of the number of store-files for blocking flush
+   */
+  public static final String HSTORE_BLOCKING_STORE_FILES_KEY =
+  "hbase.hstore.blockingStoreFiles";
+
+  /**
+   * Key to the maximum wait time in milliseconds for blocking flush
+   */
+  public static final String HSTORE_BLOCKING_WAIT_TIME_KEY =
+      "hbase.hstore.blockingWaitTime";
+  public static final long DEFAULT_HSTORE_BLOCKING_WAIT_TIME = 90000;
   private HConstants() {
     // Can't be instantiated with this constructor.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java Fri Apr 11 18:18:26 2014
@@ -37,5 +37,5 @@ public interface FlushRequester {
    *                              would be flushed.
    *
    */
-  void request(HRegion region, boolean selectiveFlushRequest);
+  void request(HRegionIf region, boolean selectiveFlushRequest);
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Apr 11 18:18:26 2014
@@ -146,7 +146,7 @@ import com.google.common.collect.Lists;
  * regionName is a unique identifier for this HRegion. (startKey, endKey]
  * defines the keyspace for this HRegion.
  */
-public class HRegion implements HeapSize, ConfigurationObserver {
+public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
@@ -718,10 +718,8 @@ public class HRegion implements HeapSize
     }
   }
 
-  /**
-   * @return True if this region has references.
-   */
-  boolean hasReferences() {
+  @Override
+  public boolean hasReferences() {
     for (Store store : this.stores.values()) {
       for (StoreFile sf : store.getStorefiles()) {
         // Found a reference, return.
@@ -789,6 +787,7 @@ public class HRegion implements HeapSize
   }
 
   /** @return a HRegionInfo object for this region */
+  @Override
   public HRegionInfo getRegionInfo() {
     return this.regionInfo;
   }
@@ -1052,7 +1051,7 @@ public class HRegion implements HeapSize
     return this.lastStoreFlushTimeMap.get(store);
   }
 
-  /** @return how info about the last flushes <time, size> */
+  @Override
   public List<Pair<Long,Long>> getRecentFlushInfo() {
     // only MemStoreFlusher thread should be calling this, so read lock is okay
     this.splitsAndClosesLock.readLock().lock();
@@ -1344,32 +1343,11 @@ public class HRegion implements HeapSize
    * @throws IOException
    */
   public boolean flushcache() throws IOException {
-    return flushcache(false);
+    return flushMemstoreShapshot(false);
   }
 
-  /**
-   * Flush the cache.
-   *
-   * When this method is called the cache will be flushed unless:
-   * <ol>
-   *   <li>the cache is empty</li>
-   *   <li>the region is closed.</li>
-   *   <li>a flush is already in progress</li>
-   *   <li>writes are disabled</li>
-   * </ol>
-   *
-   * <p>This method may block for some time, so it should not be called from a
-   * time-sensitive thread.
-   *
-   * @param selectiveFlushRequest If true, selectively flush column families
-   *                              which dominate the memstore size, provided it
-   *                              is enabled in the configuration.
-   *
-   * @return true if cache was flushed
-   *
-   * @throws IOException general io exceptions
-   */
-  public boolean flushcache(boolean selectiveFlushRequest) throws IOException {
+  @Override
+  public boolean flushMemstoreShapshot(boolean selectiveFlushRequest) throws IOException {
     // If a selective flush was requested, but the per-column family switch is
     // off, we cannot do a selective flush.
     if (selectiveFlushRequest && !perColumnFamilyFlushEnabled) {
@@ -2337,6 +2315,7 @@ public class HRegion implements HeapSize
    * @throws IOException
    * @return true if the new put was execute, false otherwise
    */
+  @SuppressWarnings("deprecation")
   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
       byte [] expectedValue, Writable w, Integer lockId, boolean writeToWAL)
   throws IOException{
@@ -2914,6 +2893,9 @@ public class HRegion implements HeapSize
     return this.stores.get(column);
   }
 
+  /**
+   * @return a map from column family to Store.
+   */
   public Map<byte[], Store> getStores() {
     return this.stores;
   }
@@ -3675,6 +3657,7 @@ public class HRegion implements HeapSize
    * @return result
    * @throws IOException read exceptions
    */
+  @SuppressWarnings("deprecation")
   public Result get(final Get get, final Integer lockid) throws IOException {
     // Verify families are all valid
     if (get.hasFamilies()) {
@@ -4157,4 +4140,13 @@ public class HRegion implements HeapSize
       s.updateConfiguration();
     }
   }
+
+  @Override
+  public int maxStoreFilesCount() {
+    int res = 0;
+    for (Store hstore : this.stores.values()) {
+      res = Math.max(res, hstore.getStorefilesCount());
+    }
+    return res;
+  }
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java?rev=1586735&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionIf.java Fri Apr 11 18:18:26 2014
@@ -0,0 +1,77 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The interface of a storage instance of a whole region.
+ *
+ * The only online server is using HRegion as the implementation.
+ */
+public interface HRegionIf {
+  /**
+   * @return the HRegionInfo of this region
+   */
+  public HRegionInfo getRegionInfo();
+
+  /**
+   * Flushes the cache.
+   *
+   * When this method is called the cache will be flushed unless:
+   * <ol>
+   * <li>the cache is empty</li>
+   * <li>the region is closed.</li>
+   * <li>a flush is already in progress</li>
+   * <li>writes are disabled</li>
+   * </ol>
+   *
+   * <p>
+   * This method may block for some time, so it should not be called from a
+   * time-sensitive thread.
+   *
+   * @param selectiveFlushRequest If true, selectively flush column families
+   *          which dominate the memstore size, provided it
+   *          is enabled in the configuration.
+   *
+   * @return true if cache was flushed
+   */
+  public boolean flushMemstoreShapshot(boolean selectiveFlushRequest)
+      throws IOException;
+
+  /**
+   * @return info about the most recent flushes, as (time, size) pairs
+   */
+  public List<Pair<Long, Long>> getRecentFlushInfo();
+
+  /**
+   * @return True if this region has references.
+   */
+  public boolean hasReferences();
+
+  /**
+   * @return the maximum number of files among all stores.
+   */
+  public int maxStoreFilesCount();
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Apr 11 18:18:26 2014
@@ -98,7 +98,6 @@ import org.apache.hadoop.hbase.UnknownSc
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.MultiAction;
 import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
@@ -120,7 +119,6 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool;
 import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram;
 import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
-import org.apache.hadoop.hbase.io.hfile.histogram.HistogramUtils;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
@@ -176,7 +174,7 @@ import com.google.common.base.Preconditi
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
-public class HRegionServer implements HRegionInterface,
+public class HRegionServer implements HRegionInterface, HRegionServerIf,
     HBaseRPCErrorHandler, Runnable, Watcher, ConfigurationObserver {
   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
   private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
@@ -1322,6 +1320,7 @@ public class HRegionServer implements HR
     }
   }
 
+  @Override
   public AtomicLong getGlobalMemstoreSize() {
     return globalMemstoreSize;
   }
@@ -1459,12 +1458,7 @@ public class HRegionServer implements HR
     return stop;
   }
 
-  /**
-   * Checks to see if the file system is still accessible.
-   * If not, sets abortRequested and stopRequested
-   *
-   * @return false if file system is not available
-   */
+  @Override
   public void checkFileSystem() {
     long curtime = EnvironmentEdgeManager.currentTimeMillis();
     synchronized (lastCheckFSAt){
@@ -1808,9 +1802,7 @@ public class HRegionServer implements HR
     }
   }
 
-  /**
-   * @return Region server metrics instance.
-   */
+  @Override
   public RegionServerMetrics getMetrics() {
     return this.metrics;
   }
@@ -3468,10 +3460,7 @@ public class HRegionServer implements HR
         Bytes.mapKey(hr.getRegionInfo().getRegionName()), hr);
   }
 
-  /**
-   * @return A new Map of online regions sorted by region size with the first
-   * entry being the biggest.
-   */
+  @Override
   public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
     // we'll sort the regions in reverse
     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
@@ -4004,7 +3993,7 @@ public class HRegionServer implements HR
     return null;
   }
 
-  /** @return what the regionserver thread name should be */
+  @Override
   public String getRSThreadName() {
     return "RS-" + serverInfo.getServerName();
   }
@@ -4118,10 +4107,20 @@ public class HRegionServer implements HR
 
   @Override
   public HRegionLocation getLocation(byte[] table, byte[] row, boolean reload)
-    throws IOException {
+      throws IOException {
     if (reload) {
       return regionServerConnection.relocateRegion(new StringBytes(table), row);
     }
     return regionServerConnection.locateRegion(new StringBytes(table), row);
   }
+
+  @Override
+  public boolean requestSplit(HRegionIf r) {
+    return this.compactSplitThread.requestSplit((HRegion) r);
+  }
+
+  @Override
+  public void requestCompaction(HRegionIf r, String why) {
+    this.compactSplitThread.requestCompaction((HRegion) r, why);
+  }
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java?rev=1586735&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServerIf.java Fri Apr 11 18:18:26 2014
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
+
+/**
+ * The interface representing a region server.
+ * A region server makes a set of HRegions available to clients.
+ *
+ * The only online server is using HRegionServer as the implementation.
+ */
+public interface HRegionServerIf {
+  /**
+   * @return what the regionserver thread name should be
+   */
+  public String getRSThreadName();
+
+  /**
+   * Checks to see if the file system is still accessible.
+   * If not, sets abortRequested and stopRequested
+   *
+   * Declared void: no availability status is returned to the caller.
+   */
+  public void checkFileSystem();
+
+  /**
+   * Requests the region server to make a split on a specific region-store.
+   */
+  public boolean requestSplit(HRegionIf r);
+
+  /**
+   * Requests the region server to make a compaction on a specific region-store.
+   *
+   * @param r the region-store.
+   * @param why Why compaction requested -- used in debug messages
+   */
+  public void requestCompaction(HRegionIf r, String why);
+
+  /**
+   * @return Region server metrics instance.
+   */
+  public RegionServerMetrics getMetrics();
+
+  /**
+   * @return the size of global mem-store in bytes as an AtomicLong.
+   */
+  public AtomicLong getGlobalMemstoreSize();
+
+  /**
+   * @return A new SortedMap of online regions sorted by region size with the
+   *         first entry being the biggest.
+   */
+  public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize();
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Fri Apr 11 18:18:26 2014
@@ -19,30 +19,29 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.DaemonThreadFactory;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.util.StringUtils;
-
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.Future;
-import java.util.concurrent.Delayed;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Thread that flushes cache on request
@@ -55,14 +54,10 @@ import java.util.concurrent.locks.Reentr
  */
 class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
-  // These two data members go together.  Any entry in the one must have
-  // a corresponding entry in the other.
-  private final Map<HRegion, FlushQueueEntry> regionsInQueue =
-    new HashMap<HRegion, FlushQueueEntry>();
-
-  private final boolean perColumnFamilyFlushEnabled;
-  private final HRegionServer server;
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Map<HRegionIf, Pair<FlushQueueEntry, Future<Boolean>>>
+      regionsInQueue = new HashMap<>();
+
+  private final HRegionServerIf server;
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -73,20 +68,18 @@ class MemStoreFlusher implements FlushRe
     "hbase.regionserver.global.memstore.upperLimit";
   private static final String LOWER_KEY =
     "hbase.regionserver.global.memstore.lowerLimit";
-  private long blockingStoreFilesNumber;
+
+  private int blockingStoreFilesNumber;
   private long blockingWaitTime;
 
   private int handlerCount;
-  private final ThreadPoolExecutor flushes;
-  Map<FlushQueueEntry, Future> futures = new HashMap<FlushQueueEntry, Future>();
+  private final ScheduledThreadPoolExecutor threadPool;
 
   /**
    * @param conf
    * @param server
    */
-  public MemStoreFlusher(final Configuration conf,
-      final HRegionServer server) {
-    super();
+  public MemStoreFlusher(final Configuration conf, HRegionServerIf server) {
     this.server = server;
     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
     this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
@@ -99,17 +92,15 @@ class MemStoreFlusher implements FlushRe
     }
     this.globalMemStoreLimitLowMark = lower;
     this.blockingStoreFilesNumber =
-      conf.getInt("hbase.hstore.blockingStoreFiles", -1);
+        conf.getInt(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, -1);
     if (this.blockingStoreFilesNumber == -1) {
       this.blockingStoreFilesNumber = 1 +
         conf.getInt("hbase.hstore.compactionThreshold", 3);
     }
-    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
-      90000);
+    this.blockingWaitTime =
+        conf.getLong(HConstants.HSTORE_BLOCKING_WAIT_TIME_KEY,
+            HConstants.DEFAULT_HSTORE_BLOCKING_WAIT_TIME);
 
-    this.perColumnFamilyFlushEnabled = conf.getBoolean(
-            HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
-            HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
     // number of "memstore flusher" threads per region server
     this.handlerCount = conf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS);
 
@@ -118,9 +109,9 @@ class MemStoreFlusher implements FlushRe
       ", globalMemStoreLimitLowMark=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
       ", maxHeap=" + StringUtils.humanReadableInt(max));
-    this.flushes = new ThreadPoolExecutor(handlerCount, handlerCount,
-        60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+    this.threadPool = new ScheduledThreadPoolExecutor(handlerCount,
         new DaemonThreadFactory("flush-thread-"));
+    this.threadPool.setMaximumPoolSize(handlerCount);
   }
 
   /**
@@ -147,58 +138,70 @@ class MemStoreFlusher implements FlushRe
     return (long)(max * limit);
   }
 
-  public void request(HRegion r, boolean isSelective) {
+  @Override
+  public void request(HRegionIf r, boolean isSelective) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
         // queue.  It'll come out near immediately.
         FlushQueueEntry fqe = new FlushQueueEntry(r, isSelective);
-        this.regionsInQueue.put(r, fqe);
-        executeFlushQueueEntry(fqe);
+        regionsInQueue.put(r, Pair.newPair(fqe, (Future<Boolean>) null));
+        executeFlushQueueEntry(fqe, 0);
+      } else {
+        LOG.info("Flush for " + r + " already scheduled.");
       }
     }
   }
 
-  protected void executeFlushQueueEntry(final FlushQueueEntry fqe) {
-    Runnable runnable = new Runnable() {
+  /**
+   * Must be called while holding the lock on regionsInQueue.
+   */
+  protected void executeFlushQueueEntry(final FlushQueueEntry fqe, long msDelay) {
+    Callable<Boolean> callable = new Callable<Boolean>() {
       @Override
-      public void run() {
-        try {
-          String name = String.format("%s.cacheFlusher.%d", MemStoreFlusher.this.server.getRSThreadName(),
-                  MemStoreFlusher.this.flushes.getCorePoolSize() + 1);
-          if (!flushRegion(fqe, name)) {
-            LOG.warn("Failed to flush " + fqe.region);
-          }
-        } catch (Exception ex) {
-          LOG.error("Cache flush failed" +
-                   (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
-                   ex
-          );
-          server.checkFileSystem();
+      public Boolean call() throws Exception {
+        String name =
+            String.format("%s.cacheFlusher.%d",
+                MemStoreFlusher.this.server.getRSThreadName(),
+                Thread.currentThread().getId());
+        if (!flushRegion(fqe, name)) {
+          LOG.warn("Failed to flush " + fqe.region);
+          return false;
         }
+        return true;
       }
     };
-    futures.put(fqe, this.flushes.submit(runnable));
+
+    LOG.debug("Schedule a flush request " + fqe + " with delay " + msDelay
+        + "ms");
+
+    Future<Boolean> future =
+        this.threadPool.schedule(callable, msDelay, TimeUnit.MILLISECONDS);
+    Pair<FlushQueueEntry, Future<Boolean>> pair =
+        regionsInQueue.get(fqe.region);
+    if (pair != null) {
+      pair.setSecond(future);
+    }
   }
 
   /**
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    flushes.shutdown();
+    threadPool.shutdown();
   }
 
 
   boolean isAlive() {
-    return !flushes.isShutdown();
+    return !threadPool.isShutdown();
   }
 
   void join() {
     boolean done = false;
     while (!done) {
       try {
-        done = flushes.awaitTermination(60, TimeUnit.SECONDS);
         LOG.debug("Waiting for flush thread to finish...");
+        done = threadPool.awaitTermination(60, TimeUnit.SECONDS);
       } catch (InterruptedException ie) {
         LOG.error("Interrupted waiting for flush thread to finish...");
       }
@@ -209,25 +212,26 @@ class MemStoreFlusher implements FlushRe
    * A flushRegion that checks store file count.  If too many, puts the flush
    * on delay queue to retry later.
    * @param fqe
-   * @return true if the region was successfully flushed, false otherwise. If 
+   * @return true if the region was successfully flushed, false otherwise. If
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
   private boolean flushRegion(final FlushQueueEntry fqe, String why) {
-    HRegion region = fqe.region;
+    HRegionIf region = fqe.region;
     if (!fqe.region.getRegionInfo().isMetaRegion() &&
-        isTooManyStoreFiles(region)) {
+        region.maxStoreFilesCount() > this.blockingStoreFilesNumber) {
       if (fqe.isMaximumWait(this.blockingWaitTime)) {
-        LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
-          "ms on a compaction to clean up 'too many store files'; waited " +
-          "long enough... proceeding with flush of " +
-          region.getRegionNameAsString());
+        LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime)
+            + "ms on a compaction to clean up 'too many store files'; waited "
+            + "long enough... proceeding with flush of "
+            + region.getRegionInfo().getRegionNameAsString());
       } else {
         // If this is first time we've been put off, then emit a log message.
         if (fqe.getRequeueCount() <= 0) {
           // Note: We don't impose blockingStoreFiles constraint on meta regions
-          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
-            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
+          LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString()
+              + " has too many store files; delaying flush up to "
+              + this.blockingWaitTime + "ms");
         }
 
         /* If a split has been requested, we avoid scheduling a compaction
@@ -237,27 +241,32 @@ class MemStoreFlusher implements FlushRe
          * references to parent regions are removed, and we can split this
          * region further.
          */
-        if (!this.server.compactSplitThread.requestSplit(region)
-            || region.hasReferences()) {
-          this.server.compactSplitThread.requestCompaction(region, why);
+        if (!this.server.requestSplit(region) || region.hasReferences()) {
+          this.server.requestCompaction(region, why);
         }
-        // Put back on the queue.  Have it come back out of the queue
-        // after a delay of this.blockingWaitTime / 100 ms.
-        executeFlushQueueEntry(fqe.requeue(this.blockingWaitTime / 100));
-        // Tell a lie, it's not flushed but it's ok
+
+        synchronized (this.regionsInQueue) {
+          // Put back on the queue. Have it come back out of the queue
+          // after a delay of this.blockingWaitTime / 100 ms.
+          executeFlushQueueEntry(fqe.requeue(), this.blockingWaitTime / 100);
+        }
+        // Tell a lie, it's not flushed but it's OK
         return true;
       }
     }
-    return flushRegion(region, why, false, fqe.isSelectiveFlushRequest());
+    try {
+      return flushRegionNow(region, why, fqe.selective());
+    } finally {
+      // the task is executed, remove from regionsInQueue
+      synchronized (this.regionsInQueue) {
+        this.regionsInQueue.remove(region);
+      }
+    }
   }
 
   /**
    * Flush a region.
    * @param region Region to flush.
-   * @param emergencyFlush Set if we are being force flushed. If true the region
-   * needs to be removed from the flush queue. If false, when we were called
-   * from the main flusher run loop and we got the entry to flush by calling
-   * poll on the flush queue (which removed it).
    * @param selectiveFlushRequest Do we want to selectively flush only the
    * column families that dominate the memstore size?
    *
@@ -265,52 +274,25 @@ class MemStoreFlusher implements FlushRe
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(final HRegion region, String why,
-      final boolean emergencyFlush, boolean selectiveFlushRequest) {
-
-    synchronized (this.regionsInQueue) {
-      FlushQueueEntry fqe = this.regionsInQueue.remove(region);
-      if (fqe != null && emergencyFlush) {
-        Future future = futures.get(fqe);
-        if (future != null) {
-          try {
-            future.get();
-            if (region.flushcache(selectiveFlushRequest)) {
-              server.compactSplitThread.requestCompaction(region, why);
-            }
-            server.getMetrics().addFlush(region.getRecentFlushInfo());
-          } catch (IOException ex) {
-            LOG.warn("Cache flush failed" +
-                            (region != null ? (" for region " +
-                                    Bytes.toString(region.getRegionName())) : ""),
-                    RemoteExceptionHandler.checkIOException(ex)
-            );
-            server.checkFileSystem();
-            return false;
-          } catch (InterruptedException e) {
-            LOG.warn("Flush failed" +
-                    (region != null ? (" for region " +
-                            Bytes.toString(region.getRegionName())) : ""));
-          } catch (ExecutionException e) {
-            LOG.warn("Flush failed" +
-                    (region != null ? (" for region " +
-                            Bytes.toString(region.getRegionName())) : ""));
-          } finally {
-            lock.readLock().unlock();
-          }
-        }
+  private boolean flushRegionNow(HRegionIf region, String why,
+      boolean selectiveFlushRequest) {
+    try {
+      boolean res = region.flushMemstoreShapshot(selectiveFlushRequest);
+      if (res) {
+        server.requestCompaction(region, why);
       }
+      server.getMetrics().addFlush(region.getRecentFlushInfo());
+      return res;
+    } catch (IOException ex) {
+      LOG.warn(
+          "Cache flush failed"
+          + (region != null
+            ? (" for region " + region.getRegionInfo().getRegionNameAsString())
+            : ""),
+          RemoteExceptionHandler.checkIOException(ex));
+      server.checkFileSystem();
+      return false;
     }
-    return true;
-  }
-
-  private boolean isTooManyStoreFiles(HRegion region) {
-    for (Store hstore: region.stores.values()) {
-      if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
-        return true;
-      }
-    }
-    return false;
   }
 
   /**
@@ -325,6 +307,39 @@ class MemStoreFlusher implements FlushRe
     }
   }
 
+  /**
+   * Makes an emergency flush.
+   *
+   * If a flush request is found in the queue, this method will wait for that
+   * flush to be finished. Otherwise a flush will be performed in the current
+   * thread by calling {@link #flushRegionNow(HRegionIf, String, boolean)}.
+   */
+  private boolean doEmergencyFlush(HRegionIf region, String why,
+      boolean selectiveFlushRequest) {
+    Pair<FlushQueueEntry, Future<Boolean>> pair;
+    synchronized (regionsInQueue) {
+      pair = regionsInQueue.get(region);
+    }
+    if (pair != null) {
+      // A flush request is already queued; wait for it to finish.
+      try {
+        return pair.getSecond().get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.info("Interrupted waiting for flushing of " + region, e);
+        return false;
+      } catch (ExecutionException e) {
+        // This should not actually happen; all exceptions should have been
+        // caught inside the Callable.
+        LOG.info("ExecutionException caught for flushing of " + region, e);
+        return false;
+      }
+    }
+
+    // Perform a flush in current thread
+    return flushRegionNow(region, why, selectiveFlushRequest);
+  }
+
   /*
    * Emergency!  Need to flush memory.
    */
@@ -332,14 +347,17 @@ class MemStoreFlusher implements FlushRe
     if (this.server.getGlobalMemstoreSize().get() < globalMemStoreLimit) {
       return; // double check the global memstore size inside of the synchronized block.
     }
-    
+
     // keep flushing until we hit the low water mark
     long globalMemStoreSize = -1;
     ArrayList<HRegion> regionsToCompact = new ArrayList<HRegion>();
-    for (SortedMap<Long, HRegion> m =
+    SortedMap<Long, HRegion> m =
         this.server.getCopyOfOnlineRegionsSortedBySize();
-      (globalMemStoreSize = this.server.getGlobalMemstoreSize().get()) >=
-        this.globalMemStoreLimitLowMark;) {
+    while (true) {
+      globalMemStoreSize = this.server.getGlobalMemstoreSize().get();
+      if (globalMemStoreSize < this.globalMemStoreLimitLowMark) {
+        break;
+      }
       // flush the region with the biggest memstore
       if (m.size() <= 0) {
         LOG.info("No online regions to flush though we've been asked flush " +
@@ -356,52 +374,50 @@ class MemStoreFlusher implements FlushRe
         " exceeded; currently " +
         StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " +
         StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-      if (!flushRegion(biggestMemStoreRegion, "emergencyFlush", true, false)) {
+      if (!doEmergencyFlush(biggestMemStoreRegion, "emergencyFlush", false)) {
         LOG.warn("Flush failed");
         break;
       }
       regionsToCompact.add(biggestMemStoreRegion);
     }
     for (HRegion region : regionsToCompact) {
-      server.compactSplitThread.requestCompaction(region, "emergencyFlush");
+      server.requestCompaction(region, "emergencyFlush");
     }
   }
 
   /**
-   * Datastructure used in the flush queue.  Holds region and retry count.
-   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
+   * Data structure used in the flush queue. Holds region and retry count.
+   * Keeps tabs on how old this object is. Implements {@link Delayed}. On
    * construction, the delay is zero. When added to a delay queue, we'll come
-   * out near immediately.  Call {@link #requeue(long)} passing delay in
-   * milliseconds before readding to delay queue if you want it to stay there
+   * out near immediately. Call {@link #requeue()} and pass a delay in
+   * milliseconds when re-adding to the queue if you want it to stay there
    * a while.
    */
-  static class FlushQueueEntry implements Delayed {
-    private final HRegion region;
+  static class FlushQueueEntry {
+    private final HRegionIf region;
     private final long createTime;
-    private long whenToExpire;
     private int requeueCount = 0;
-    private boolean selectiveFlushRequest;
+    private boolean selective;
 
     /**
      * @param r The region to flush
-     * @param selectiveFlushRequest Do we want to flush only the column
+     * @param selective Do we want to flush only the column
      *                              families that dominate the memstore size,
      *                              i.e., do a selective flush? If we are
      *                              doing log rolling, then we should not do a
      *                              selective flush.
      */
-    FlushQueueEntry(final HRegion r, boolean selectiveFlushRequest) {
+    FlushQueueEntry(final HRegionIf r, boolean selective) {
       this.region = r;
       this.createTime = System.currentTimeMillis();
-      this.whenToExpire = this.createTime;
-      this.selectiveFlushRequest = selectiveFlushRequest;
+      this.selective = selective;
     }
 
     /**
      * @return Is this a request for a selective flush?
      */
-    public boolean isSelectiveFlushRequest() {
-      return selectiveFlushRequest;
+    public boolean selective() {
+      return selective;
     }
 
     /**
@@ -419,29 +435,21 @@ class MemStoreFlusher implements FlushRe
     public int getRequeueCount() {
       return this.requeueCount;
     }
- 
+
     /**
-     * @param when When to expire, when to come up out of the queue.
-     * Specify in milliseconds.  This method adds System.currentTimeMillis()
-     * to whatever you pass.
-     * @return This.
+     * Increases the requeue count.
+     *
+     * @return this.
      */
-    public FlushQueueEntry requeue(final long when) {
-      this.whenToExpire = System.currentTimeMillis() + when;
+    public FlushQueueEntry requeue() {
       this.requeueCount++;
       return this;
     }
 
     @Override
-    public long getDelay(TimeUnit unit) {
-      return unit.convert(this.whenToExpire - System.currentTimeMillis(),
-          TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public int compareTo(Delayed other) {
-      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
-        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
+    public String toString() {
+      return "{regin: " + region + ", created: " + createTime + ", requeue: "
+          + requeueCount + ", selective: " + selective + "}";
     }
   }
 
@@ -451,12 +459,11 @@ class MemStoreFlusher implements FlushRe
     // number of "memstore flusher" threads per region server
     int handlerCount = newConf.getInt(HConstants.FLUSH_THREADS, HConstants.DEFAULT_FLUSH_THREADS);
     if(this.handlerCount != handlerCount){
-      LOG.info("Changing the value of " + HConstants.FLUSH_THREADS +
-              " from " + this.handlerCount + " to " +
-              handlerCount);
+      LOG.info("Changing the value of " + HConstants.FLUSH_THREADS + " from "
+          + this.handlerCount + " to " + handlerCount);
     }
-    this.flushes.setMaximumPoolSize(handlerCount);
-    this.flushes.setCorePoolSize(handlerCount);
+    this.threadPool.setMaximumPoolSize(handlerCount);
+    this.threadPool.setCorePoolSize(handlerCount);
     this.handlerCount = handlerCount;
   }
 
@@ -467,7 +474,35 @@ class MemStoreFlusher implements FlushRe
    * @return
    */
   protected int getFlushThreadNum() {
-    return this.flushes.getCorePoolSize();
+    return this.threadPool.getCorePoolSize();
   }
 
+  /**
+   * Waits for all current requests to be done.
+   * Used only in testcases.
+   */
+  void waitAllRequestDone() throws ExecutionException, InterruptedException {
+    while (true) {
+      // Fetch futures.
+      List<Future<Boolean>> futures = new ArrayList<>();
+      synchronized (this.regionsInQueue) {
+        for (Pair<FlushQueueEntry, Future<Boolean>> pair :
+            regionsInQueue.values()) {
+          futures.add(pair.getSecond());
+        }
+      }
+
+      if (futures.size() == 0) {
+        // No more requests, quit
+        return;
+      }
+
+      // Wait for futures
+      for (Future<Boolean> future : futures) {
+        future.get();
+      }
+      // This is a loop because new requests may be generated while the
+      // current requests are executing.
+    }
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java?rev=1586735&r1=1586734&r2=1586735&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java Fri Apr 11 18:18:26 2014
@@ -164,7 +164,7 @@ public class TestPerColumnFamilyFlush ex
                 (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
 
     // Flush!
-    region.flushcache(true);
+    region.flushMemstoreShapshot(true);
 
     // Will use these to check if anything changed.
     long oldCF2MemstoreSize = cf2MemstoreSize;
@@ -205,7 +205,7 @@ public class TestPerColumnFamilyFlush ex
     oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
 
     // Flush again
-    region.flushcache(true);
+    region.flushMemstoreShapshot(true);
 
     // Recalculate everything
     cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
@@ -228,7 +228,7 @@ public class TestPerColumnFamilyFlush ex
     // In that case, we should flush all the CFs.
 
     // Clearing the existing memstores.
-    region.flushcache(false);
+    region.flushMemstoreShapshot(false);
 
     // The memstore limit is 200*1024 and the column family flush threshold is
     // around 50*1024. We try to just hit the memstore limit with each CF's
@@ -241,7 +241,7 @@ public class TestPerColumnFamilyFlush ex
       region.put(createPut(5, i));
     }
 
-    region.flushcache(true);
+    region.flushMemstoreShapshot(true);
     // Since we won't find any CF above the threshold, and hence no specific
     // store to flush, we should flush all the memstores.
     Assert.assertEquals(0, region.getMemstoreSize().get());
@@ -254,7 +254,7 @@ public class TestPerColumnFamilyFlush ex
      *           FM1   FM2   FM3
      */
     // flush all
-    region.flushcache(false);
+    region.flushMemstoreShapshot(false);
     region.put(createPut(2, 1));
     region.put(createPut(3, 1));
 
@@ -279,7 +279,7 @@ public class TestPerColumnFamilyFlush ex
      *           FM1   FM2   FM3
      */
     // flush, should only flush family 1
-    region.flushcache(true);
+    region.flushMemstoreShapshot(true);
 
     final long sizeFamily1_2 = region.getStore(FAMILY1).getMemStoreSize();
     final long sizeFamily2_2 = region.getStore(FAMILY2).getMemStoreSize();
@@ -295,7 +295,7 @@ public class TestPerColumnFamilyFlush ex
      *           FM1   FM2   FM3
      */
     // flush all
-    region.flushcache(false);
+    region.flushMemstoreShapshot(false);
 
     region.put(createPut(2, 1));
     region.put(createPut(3, 1));
@@ -318,7 +318,7 @@ public class TestPerColumnFamilyFlush ex
      *           FM1   FM2   FM3
      */
     // flush, should flush all
-    region.flushcache(true);
+    region.flushMemstoreShapshot(true);
 
     final long sizeFamily1_4 = region.getStore(FAMILY1).getMemStoreSize();
     final long sizeFamily2_4 = region.getStore(FAMILY2).getMemStoreSize();
@@ -417,7 +417,7 @@ public class TestPerColumnFamilyFlush ex
             (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
 
     // Flush!
-    region.flushcache(true);
+    region.flushMemstoreShapshot(true);
 
     cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
     cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
@@ -501,7 +501,7 @@ public class TestPerColumnFamilyFlush ex
       desiredRegion != null);
 
     // Flush the region selectively.
-    desiredRegion.flushcache(true);
+    desiredRegion.flushMemstoreShapshot(true);
 
     long totalMemstoreSize;
     long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java?rev=1586735&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java Fri Apr 11 18:18:26 2014
@@ -0,0 +1,176 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.StringBytes;
+import org.junit.Test;
+
+/**
+ * Testcases for MemStoreFlusher
+ */
+public class TestMemStoreFlusher {
+  private static class HRegionServerMock implements HRegionServerIf {
+    RegionServerMetrics metrics;
+    public HRegionServerMock(Configuration conf) {
+      metrics = new RegionServerMetrics(conf);
+    }
+
+    AtomicLong globalMemstoreSize = new AtomicLong(0);
+    @Override
+    public String getRSThreadName() {
+      return "RSThread";
+    }
+
+    @Override
+    public void checkFileSystem() {
+    }
+
+    @Override
+    public boolean requestSplit(HRegionIf r) {
+      return false;
+    }
+
+    @Override
+    public RegionServerMetrics getMetrics() {
+      return metrics;
+    }
+
+    @Override
+    public AtomicLong getGlobalMemstoreSize() {
+      return globalMemstoreSize;
+    }
+
+    @Override
+    public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
+      return null;
+    }
+
+    @Override
+    public void requestCompaction(HRegionIf r, String why) {
+    }
+
+  }
+
+  private static class HRegionMock implements HRegionIf {
+    private final HRegionInfo info;
+    final AtomicInteger flushedCount = new AtomicInteger();
+    /**
+     * The number of times maxStoreFilesCount() has been called.
+     */
+    final AtomicInteger checkingCount = new AtomicInteger();
+    final int maxStoreFilesCount;
+
+    public HRegionMock(StringBytes tableName, int maxStoreFilesCount) {
+      info = new HRegionInfo(new HTableDescriptor(tableName.getBytes()),
+          null, null);
+      this.maxStoreFilesCount = maxStoreFilesCount;
+    }
+
+    @Override
+    public boolean flushMemstoreShapshot(boolean selectiveFlushRequest)
+        throws IOException {
+      flushedCount.addAndGet(1);
+      return true;
+    }
+
+    @Override
+    public List<Pair<Long, Long>> getRecentFlushInfo() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+      return info;
+    }
+
+    @Override
+    public boolean hasReferences() {
+      return false;
+    }
+
+    @Override
+    public int maxStoreFilesCount() {
+      checkingCount.addAndGet(1);
+      return maxStoreFilesCount;
+    }
+  }
+
+  @Test
+  public void testFlush() throws Exception {
+    final Configuration conf = HBaseConfiguration.create();
+    conf.set(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, "1");
+
+    HRegionServerMock server = new HRegionServerMock(conf);
+    MemStoreFlusher flusher = new MemStoreFlusher(conf, server);
+
+    HRegionMock region = new HRegionMock(new StringBytes("testFlush"), 1);
+
+    // Make a flush request
+    flusher.request(region, false);
+
+    flusher.waitAllRequestDone();
+    Assert.assertEquals("Number of flushing", 1, region.flushedCount.get());
+  }
+
+  /**
+   * In MemStoreFlusher, if there are too many store files, a delay should be
+   * performed while waiting for some files to be merged. This case ensures
+   * that the delay is performed.
+   */
+  @Test
+  public void testDelay() throws Exception {
+    final long blockingWaitTime = 2000;
+
+    final Configuration conf = HBaseConfiguration.create();
+    conf.set(HConstants.HSTORE_BLOCKING_STORE_FILES_KEY, "1");
+    conf.set(HConstants.HSTORE_BLOCKING_WAIT_TIME_KEY, "" + blockingWaitTime);
+
+    HRegionServerMock server = new HRegionServerMock(conf);
+    MemStoreFlusher flusher = new MemStoreFlusher(conf, server);
+
+    // 3 is larger than 1
+    HRegionMock region = new HRegionMock(new StringBytes("testDelay"), 3);
+
+    // Make a flush request
+    flusher.request(region, false);
+
+    flusher.waitAllRequestDone();
+
+    Assert.assertEquals("Number of flushing", 1, region.flushedCount.get());
+    Assert.assertTrue("Number of checking should <= " + 100 + ", but got "
+        + region.checkingCount.get(), region.checkingCount.get() <= 100);
+  }
+}