You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/06/26 05:03:29 UTC

[hbase] branch master updated: HBASE-24382 Flush partial stores of region filtered by seqId when arc… (#1737)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new c046120  HBASE-24382 Flush partial stores of region filtered by seqId when arc… (#1737)
c046120 is described below

commit c0461207eec122103067527d3dd5090741141a0c
Author: bsglz <18...@qq.com>
AuthorDate: Fri Jun 26 13:03:18 2020 +0800

    HBASE-24382 Flush partial stores of region filtered by seqId when arc… (#1737)
    
    * HBASE-24382 Flush partial stores of region filtered by seqId when archive wal due to too many wals
    
    * fix checkstyle and javadoc issue
    
    * fix javadoc issues
    
    * move the getting of stores to HRegion, since it should not be part of FlushPolicy, and comment fix
    
    * fix checkstyle issue
    
    * add some comment
    
    * remove the forceFlushAllStores since we can use families to determine how to select stores to flush
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: stack <st...@duboce.net>
---
 .../hbase/master/region/MasterRegionWALRoller.java |  7 +-
 .../hadoop/hbase/regionserver/FlushPolicy.java     |  1 -
 .../hadoop/hbase/regionserver/FlushRequester.java  | 20 +++--
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 48 +++++++++---
 .../hadoop/hbase/regionserver/HRegionServer.java   |  2 +-
 .../hadoop/hbase/regionserver/LogRoller.java       |  8 +-
 .../hadoop/hbase/regionserver/MemStoreFlusher.java | 41 +++++------
 .../hbase/regionserver/wal/AbstractFSWAL.java      | 30 +++++---
 .../regionserver/wal/SequenceIdAccounting.java     | 22 +++---
 .../apache/hadoop/hbase/wal/AbstractWALRoller.java | 16 ++--
 .../hadoop/hbase/wal/DisabledWALProvider.java      |  4 +-
 .../main/java/org/apache/hadoop/hbase/wal/WAL.java |  9 ++-
 .../regionserver/TestFailedAppendAndSync.java      |  7 +-
 .../hbase/regionserver/TestFlushRegionEntry.java   |  4 +-
 .../hbase/regionserver/TestHeapMemoryManager.java  | 85 ++++++++++++----------
 .../hbase/regionserver/TestSplitWalDataLoss.java   |  2 +-
 .../hbase/regionserver/wal/AbstractTestFSWAL.java  | 76 ++++++++++++++-----
 .../regionserver/wal/AbstractTestWALReplay.java    | 12 ++-
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  |  2 +-
 .../regionserver/wal/TestSequenceIdAccounting.java |  2 +-
 20 files changed, 251 insertions(+), 147 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java
index e18aa0c..ef3dd12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegionWALRoller.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.region;
 import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
 
 import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,8 +40,8 @@ import org.slf4j.LoggerFactory;
  * roller logic by our own.
  * <p/>
  * We can reuse most of the code for normal wal roller, the only difference is that there is only
- * one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the master
- * local region.
+ * one region, so in {@link #scheduleFlush(String, List)} method we can just schedule flush
+ * for the master local region.
  */
 @InterfaceAudience.Private
 public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
@@ -79,7 +80,7 @@ public final class MasterRegionWALRoller extends AbstractWALRoller<Abortable> {
   }
 
   @Override
-  protected void scheduleFlush(String encodedRegionName) {
+  protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
     MasterRegionFlusherAndCompactor flusher = this.flusherAndCompactor;
     if (flusher != null) {
       flusher.requestFlush();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
index fecbd2f..66bd095 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -45,5 +45,4 @@ public abstract class FlushPolicy extends Configured {
    * @return the stores need to be flushed.
    */
   public abstract Collection<HStore> selectStoresToFlush();
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index 4191fbf..92aed2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.List;
+
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -30,22 +32,28 @@ public interface FlushRequester {
    * Tell the listener the cache needs to be flushed.
    *
    * @param region the Region requesting the cache flush
-   * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
-   *          rolling.
    * @return true if our region is added into the queue, false otherwise
    */
-  boolean requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);
+  boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker);
+
+  /**
+   * Tell the listener the cache needs to be flushed.
+   *
+   * @param region the Region requesting the cache flush
+   * @param families stores of region to flush, if null then use flush policy
+   * @return true if our region is added into the queue, false otherwise
+   */
+  boolean requestFlush(HRegion region, List<byte[]> families,
+    FlushLifeCycleTracker tracker);
 
   /**
    * Tell the listener the cache needs to be flushed after a delay
    *
    * @param region the Region requesting the cache flush
    * @param delay after how much time should the flush happen
-   * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
-   *          rolling.
    * @return true if our region is added into the queue, false otherwise
    */
-  boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
+  boolean requestDelayedFlush(HRegion region, long delay);
 
   /**
    * Register a FlushRequestListener
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 20d78a7..dd5d6c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2353,7 +2353,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    *
    * <p>This method may block for some time, so it should not be called from a
    * time-sensitive thread.
-   * @param force whether we want to force a flush of all stores
+   * @param flushAllStores whether we want to force a flush of all stores
    * @return FlushResult indicating whether the flush was successful or not and if
    * the region needs compacting
    *
@@ -2361,8 +2361,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * because a snapshot was not properly persisted.
    */
   // TODO HBASE-18905. We might have to expose a requestFlush API for CPs
-  public FlushResult flush(boolean force) throws IOException {
-    return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
+  public FlushResult flush(boolean flushAllStores) throws IOException {
+    return flushcache(flushAllStores, false, FlushLifeCycleTracker.DUMMY);
   }
 
   public interface FlushResult {
@@ -2385,6 +2385,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     boolean isCompactionNeeded();
   }
 
+  public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
+    FlushLifeCycleTracker tracker) throws IOException {
+    List families = null;
+    if (flushAllStores) {
+      families = new ArrayList();
+      families.addAll(this.getTableDescriptor().getColumnFamilyNames());
+    }
+    return this.flushcache(families, writeFlushRequestWalMarker, tracker);
+  }
+
   /**
    * Flush the cache.
    *
@@ -2398,7 +2408,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    *
    * <p>This method may block for some time, so it should not be called from a
    * time-sensitive thread.
-   * @param forceFlushAllStores whether we want to flush all stores
+   * @param families stores of region to flush.
    * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
    * @param tracker used to track the life cycle of this flush
    * @return whether the flush is success and whether the region needs compacting
@@ -2408,8 +2418,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * because a Snapshot was not properly persisted. The region is put in closing mode, and the
    * caller MUST abort after this.
    */
-  public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
-      FlushLifeCycleTracker tracker) throws IOException {
+  public FlushResultImpl flushcache(List<byte[]> families,
+      boolean writeFlushRequestWalMarker, FlushLifeCycleTracker tracker) throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       String msg = "Skipping flush on " + this + " because closing";
@@ -2456,8 +2466,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       try {
-        Collection<HStore> specificStoresToFlush =
-            forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
+        // The reason that we do not always use flushPolicy is, when the flush is
+        // caused by logRoller, we should select stores which must be flushed
+        // rather than could be flushed.
+        Collection<HStore> specificStoresToFlush = null;
+        if (families != null) {
+          specificStoresToFlush = getSpecificStores(families);
+        } else {
+          specificStoresToFlush = flushPolicy.selectStoresToFlush();
+        }
         FlushResultImpl fs =
             internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);
 
@@ -2488,6 +2505,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
+   * Get the stores which match the specified families.
+   *
+   * @return the stores that need to be flushed.
+   */
+  private Collection<HStore> getSpecificStores(List<byte[]> families) {
+    Collection<HStore> specificStoresToFlush = new ArrayList<>();
+    for (byte[] family : families) {
+      specificStoresToFlush.add(stores.get(family));
+    }
+    return specificStoresToFlush;
+  }
+
+  /**
    * Should the store be flushed because it is old enough.
    * <p>
    * Every FlushPolicy should call this to determine whether a store is old enough to flush (except
@@ -8962,7 +8992,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     if (shouldFlush) {
       // Make request outside of synchronize block; HBASE-818.
-      this.rsServices.getFlushRequester().requestFlush(this, false, tracker);
+      this.rsServices.getFlushRequester().requestFlush(this, tracker);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5dde649..43fd908 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1840,7 +1840,7 @@ public class HRegionServer extends Thread implements
             //Throttle the flushes by putting a delay. If we don't throttle, and there
             //is a balanced write-load on the regions in a table, we might end up
             //overwhelming the filesystem with too many flushes at once.
-            if (requester.requestDelayedFlush(r, randomDelay, false)) {
+            if (requester.requestDelayedFlush(r, randomDelay)) {
               LOG.info("{} requesting flush of {} because {} after random delay {} ms",
                   getName(), r.getRegionInfo().getRegionNameAsString(),  whyFlush.toString(),
                   randomDelay);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index f5049c9..58ac82e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.List;
 import java.util.Map;
+
 import org.apache.hadoop.hbase.wal.AbstractWALRoller;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -45,7 +47,7 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
     super("LogRoller", services.getConfiguration(), services);
   }
 
-  protected void scheduleFlush(String encodedRegionName) {
+  protected void scheduleFlush(String encodedRegionName, List<byte[]> families) {
     RegionServerServices services = this.abortable;
     HRegion r = (HRegion) services.getRegion(encodedRegionName);
     if (r == null) {
@@ -58,8 +60,8 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
         encodedRegionName, r);
       return;
     }
-    // force flushing all stores to clean old logs
-    requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
+    // flush specified stores to clean old logs
+    requester.requestFlush(r, families, FlushLifeCycleTracker.DUMMY);
   }
 
   @VisibleForTesting
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index e191f04..c133a7a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -286,7 +286,7 @@ class MemStoreFlusher implements FlushRequester {
                 server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(), "", 1) +
             ", Region memstore size=" +
             TraditionalBinaryPrefix.long2String(regionToFlushSize, "", 1));
-        flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
+        flushedOne = flushRegion(regionToFlush, true, null, FlushLifeCycleTracker.DUMMY);
 
         if (!flushedOne) {
           LOG.info("Excluding unflushable region " + regionToFlush +
@@ -458,13 +458,18 @@ class MemStoreFlusher implements FlushRequester {
   }
 
   @Override
-  public boolean requestFlush(HRegion r, boolean forceFlushAllStores,
-                              FlushLifeCycleTracker tracker) {
+  public boolean requestFlush(HRegion r, FlushLifeCycleTracker tracker) {
+    return this.requestFlush(r, null, tracker);
+  }
+
+  @Override
+  public boolean requestFlush(HRegion r, List<byte[]> families,
+      FlushLifeCycleTracker tracker) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
         // queue. It'll come out near immediately.
-        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
         r.incrementFlushesQueuedCount();
@@ -477,12 +482,12 @@ class MemStoreFlusher implements FlushRequester {
   }
 
   @Override
-  public boolean requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
+  public boolean requestDelayedFlush(HRegion r, long delay) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has some delay
         FlushRegionEntry fqe =
-            new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
+            new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
         fqe.requeue(delay);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
@@ -581,7 +586,7 @@ class MemStoreFlusher implements FlushRequester {
         return true;
       }
     }
-    return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
+    return flushRegion(region, false, fqe.families, fqe.getTracker());
   }
 
   /**
@@ -591,13 +596,13 @@ class MemStoreFlusher implements FlushRequester {
    * needs to be removed from the flush queue. If false, when we were called
    * from the main flusher run loop and we got the entry to flush by calling
    * poll on the flush queue (which removed it).
-   * @param forceFlushAllStores whether we want to flush all store.
+   * @param families stores of region to flush.
    * @return true if the region was successfully flushed, false otherwise. If
    * false, there will be accompanying log messages explaining why the region was
    * not flushed.
    */
-  private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
-      FlushLifeCycleTracker tracker) {
+  private boolean flushRegion(HRegion region, boolean emergencyFlush,
+      List<byte[]> families, FlushLifeCycleTracker tracker) {
     synchronized (this.regionsInQueue) {
       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
       // Use the start time of the FlushRegionEntry if available
@@ -612,7 +617,7 @@ class MemStoreFlusher implements FlushRequester {
     lock.readLock().lock();
     try {
       notifyFlushRequest(region, emergencyFlush);
-      FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
+      FlushResult flushResult = region.flushcache(families, false, tracker);
       boolean shouldCompact = flushResult.isCompactionNeeded();
       // We just want to check the size
       boolean shouldSplit = region.checkSplit() != null;
@@ -845,15 +850,16 @@ class MemStoreFlusher implements FlushRequester {
     private long whenToExpire;
     private int requeueCount = 0;
 
-    private final boolean forceFlushAllStores;
+    private final List<byte[]> families;
 
     private final FlushLifeCycleTracker tracker;
 
-    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
+    FlushRegionEntry(final HRegion r, List<byte[]> families,
+        FlushLifeCycleTracker tracker) {
       this.region = r;
       this.createTime = EnvironmentEdgeManager.currentTime();
       this.whenToExpire = this.createTime;
-      this.forceFlushAllStores = forceFlushAllStores;
+      this.families = families;
       this.tracker = tracker;
     }
 
@@ -873,13 +879,6 @@ class MemStoreFlusher implements FlushRequester {
       return this.requeueCount;
     }
 
-    /**
-     * @return whether we need to flush all stores.
-     */
-    public boolean isForceFlushAllStores() {
-      return forceFlushAllStores;
-    }
-
     public FlushLifeCycleTracker getTracker() {
       return tracker;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 81e9fdb..919e3b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -545,7 +546,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   }
 
   @Override
-  public byte[][] rollWriter() throws FailedLogCloseException, IOException {
+  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
     return rollWriter(false);
   }
 
@@ -640,10 +641,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
    * check the first (oldest) WAL, and return those regions which should be flushed so that
    * it can be let-go/'archived'.
-   * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
+   * @return stores of regions (encodedRegionNames) to flush in order to archive oldest WAL file.
    */
-  byte[][] findRegionsToForceFlush() throws IOException {
-    byte[][] regions = null;
+  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
+    Map<byte[], List<byte[]>> regions = null;
     int logCount = getNumRolledLogFiles();
     if (logCount > this.maxLogs && logCount > 0) {
       Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
@@ -651,15 +652,20 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
     }
     if (regions != null) {
-      StringBuilder sb = new StringBuilder();
-      for (int i = 0; i < regions.length; i++) {
-        if (i > 0) {
-          sb.append(", ");
+      List<String> listForPrint = new ArrayList();
+      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
+        StringBuilder families = new StringBuilder();
+        for (int i = 0; i < r.getValue().size(); i++) {
+          if (i > 0) {
+            families.append(",");
+          }
+          families.append(Bytes.toString(r.getValue().get(i)));
         }
-        sb.append(Bytes.toStringBinary(regions[i]));
+        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
       }
       LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
-        "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
+        "; forcing (partial) flush of " + regions.size() + " region(s): " +
+        StringUtils.join(",", listForPrint));
     }
     return regions;
   }
@@ -821,7 +827,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   }
 
   @Override
-  public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
     rollWriterLock.lock();
     try {
       if (this.closed) {
@@ -831,7 +837,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       if (!force && this.writer != null && this.numEntries.get() <= 0) {
         return null;
       }
-      byte[][] regionsToFlush = null;
+      Map<byte[], List<byte[]>> regionsToFlush = null;
       try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
         Path oldPath = getOldPath();
         Path newPath = getNewPath();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index 7e667ce..986a10f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
@@ -440,10 +441,10 @@ class SequenceIdAccounting {
    * {@link #lowestUnflushedSequenceIds} has a sequence id less than that passed in
    * <code>sequenceids</code> then return it.
    * @param sequenceids Sequenceids keyed by encoded region name.
-   * @return regions found in this instance with sequence ids less than those passed in.
+   * @return stores of regions found in this instance with sequence ids less than those passed in.
    */
-  byte[][] findLower(Map<byte[], Long> sequenceids) {
-    List<byte[]> toFlush = null;
+  Map<byte[], List<byte[]>> findLower(Map<byte[], Long> sequenceids) {
+    Map<byte[], List<byte[]>> toFlush = null;
     // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
     synchronized (tieLock) {
       for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
@@ -451,16 +452,17 @@ class SequenceIdAccounting {
         if (m == null) {
           continue;
         }
-        // The lowest sequence id outstanding for this region.
-        long lowest = getLowestSequenceId(m);
-        if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) {
-          if (toFlush == null) {
-            toFlush = new ArrayList<>();
+        for (Map.Entry<ImmutableByteArray, Long> me : m.entrySet()) {
+          if (me.getValue() <= e.getValue()) {
+            if (toFlush == null) {
+              toFlush = new TreeMap(Bytes.BYTES_COMPARATOR);
+            }
+            toFlush.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
+              .add(Bytes.toBytes(me.getKey().toString()));
           }
-          toFlush.add(e.getKey());
         }
       }
     }
-    return toFlush == null ? null : toFlush.toArray(new byte[0][]);
+    return toFlush;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index d668cf1..d2b6717 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -23,6 +23,7 @@ import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -45,8 +46,8 @@ import org.slf4j.LoggerFactory;
  * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
  * there is something to do, rather than the Chore sleep time which is invariant.
  * <p/>
- * The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a
- * region server but we still want to roll its WAL.
+ * The {@link #scheduleFlush(String, List)} is abstract here,
+ * as sometimes we hold a region without a region server but we still want to roll its WAL.
  * <p/>
  * TODO: change to a pool of threads
  */
@@ -180,18 +181,18 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
           WAL wal = entry.getKey();
           // reset the flag in front to avoid missing roll request before we return from rollWriter.
           walNeedsRoll.put(wal, Boolean.FALSE);
-          byte[][] regionsToFlush = null;
+          Map<byte[], List<byte[]>> regionsToFlush = null;
           try {
             // Force the roll if the logroll.period is elapsed or if a roll was requested.
-            // The returned value is an array of actual region names.
+            // The returned value is a collection of actual region and family names.
             regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
           } catch (WALClosedException e) {
             LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
             iter.remove();
           }
           if (regionsToFlush != null) {
-            for (byte[] r : regionsToFlush) {
-              scheduleFlush(Bytes.toString(r));
+            for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
+              scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
             }
           }
           afterRoll(wal);
@@ -218,8 +219,9 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
 
   /**
    * @param encodedRegionName Encoded name of region to flush.
+   * @param families stores of region to flush.
    */
-  protected abstract void scheduleFlush(String encodedRegionName);
+  protected abstract void scheduleFlush(String encodedRegionName, List<byte[]> families);
 
   private boolean isWaiting() {
     Thread.State state = getState();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 98773c2..dbc08cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -115,7 +115,7 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public byte[][] rollWriter() {
+    public Map<byte[], List<byte[]>> rollWriter() {
       if (!listeners.isEmpty()) {
         for (WALActionsListener listener : listeners) {
           listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
@@ -139,7 +139,7 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public byte[][] rollWriter(boolean force) {
+    public Map<byte[], List<byte[]>> rollWriter(boolean force) {
       return rollWriter();
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d57784f..902ca6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.hbase.HConstants;
@@ -60,11 +61,11 @@ public interface WAL extends Closeable, WALFileLengthProvider {
    * The implementation is synchronized in order to make sure there's one rollWriter
    * running at any given time.
    *
-   * @return If lots of logs, flush the returned regions so next time through we
+   * @return If lots of logs, flush the stores of returned regions so next time through we
    *         can clean logs. Returns null if nothing to flush. Names are actual
    *         region names as returned by {@link RegionInfo#getEncodedName()}
    */
-  byte[][] rollWriter() throws FailedLogCloseException, IOException;
+  Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException;
 
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
@@ -76,11 +77,11 @@ public interface WAL extends Closeable, WALFileLengthProvider {
    * @param force
    *          If true, force creation of a new writer even if no entries have
    *          been written to the current writer
-   * @return If lots of logs, flush the returned regions so next time through we
+   * @return If lots of logs, flush the stores of returned regions so next time through we
    *         can clean logs. Returns null if nothing to flush. Names are actual
    *         region names as returned by {@link RegionInfo#getEncodedName()}
    */
-  byte[][] rollWriter(boolean force) throws IOException;
+  Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException;
 
   /**
    * Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index c7145e3..464afad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -113,8 +115,9 @@ public class TestFailedAppendAndSync {
     }
 
     @Override
-    public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
-      byte[][] regions = super.rollWriter(force);
+    public Map<byte[], List<byte[]>> rollWriter(boolean force)
+        throws FailedLogCloseException, IOException {
+      Map<byte[], List<byte[]>> regions = super.rollWriter(force);
       rolls.getAndIncrement();
       return regions;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
index e91ff12..d273501 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
@@ -66,8 +66,8 @@ public class TestFlushRegionEntry {
     HRegion r = mock(HRegion.class);
     doReturn(hri).when(r).getRegionInfo();
 
-    FlushRegionEntry entry = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
-    FlushRegionEntry other = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
+    FlushRegionEntry entry = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
+    FlushRegionEntry other = new FlushRegionEntry(r, null, FlushLifeCycleTracker.DUMMY);
 
     assertEquals(entry.hashCode(), other.hashCode());
     assertEquals(entry, other);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index eba138a..622a09f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.Iterator;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ChoreService;
@@ -139,11 +141,11 @@ public class TestHeapMemoryManager {
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
     heapMemoryManager.start(choreService);
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     // Allow the tuner to run once and do necessary memory up
     Thread.sleep(1500);
     // No changes should be made by tuner as we already have lot of empty space
@@ -182,10 +184,10 @@ public class TestHeapMemoryManager {
     // do some offheap flushes also. So there should be decrease in memstore but
     // not as that when we don't have offheap flushes
     memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     // Allow the tuner to run once and do necessary memory up
     waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
     assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@@ -230,10 +232,10 @@ public class TestHeapMemoryManager {
     // do some offheap flushes also. So there should be decrease in memstore but
     // not as that when we don't have offheap flushes
     memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     // Allow the tuner to run once and do necessary memory up
     waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
     assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@@ -246,10 +248,10 @@ public class TestHeapMemoryManager {
     // flushes are due to onheap overhead. This should once again call for increase in
     // memstore size but that increase should be to the safe size
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     // Allow the tuner to run once and do necessary memory up
     waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
     assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@@ -312,10 +314,10 @@ public class TestHeapMemoryManager {
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
     heapMemoryManager.start(choreService);
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     // Allow the tuner to run once and do necessary memory up
     waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
     assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@@ -326,8 +328,8 @@ public class TestHeapMemoryManager {
     oldBlockCacheSize = blockCache.maxSize;
     // Do some more flushes before the next run of HeapMemoryTuner
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     // Allow the tuner to run once and do necessary memory up
     waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
     assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@@ -361,10 +363,10 @@ public class TestHeapMemoryManager {
     heapMemoryManager.start(choreService);
     // this should not change anything with onheap memstore
     memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     // Allow the tuner to run once and do necessary memory up
     Thread.sleep(1500);
     // No changes should be made by tuner as we already have lot of empty space
@@ -448,9 +450,9 @@ public class TestHeapMemoryManager {
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
     heapMemoryManager.start(choreService);
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     blockCache.evictBlock(null);
     // Allow the tuner to run once and do necessary memory up
     Thread.sleep(1500);
@@ -459,9 +461,9 @@ public class TestHeapMemoryManager {
     assertEquals(oldBlockCacheSize, blockCache.maxSize);
     // Do some more flushes before the next run of HeapMemoryTuner
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     // Allow the tuner to run once and do necessary memory up
     waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
     assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@@ -494,9 +496,9 @@ public class TestHeapMemoryManager {
     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
     heapMemoryManager.start(choreService);
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     blockCache.evictBlock(null);
     blockCache.evictBlock(null);
     // Allow the tuner to run once and do necessary memory up
@@ -506,7 +508,7 @@ public class TestHeapMemoryManager {
     assertEquals(oldBlockCacheSize, blockCache.maxSize);
     // Flushes that block updates
     memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
+    memStoreFlusher.requestFlush(null, FlushLifeCycleTracker.DUMMY);
     blockCache.evictBlock(null);
     blockCache.evictBlock(null);
     blockCache.evictBlock(null);
@@ -752,14 +754,19 @@ public class TestHeapMemoryManager {
     }
 
     @Override
-    public boolean requestFlush(HRegion region, boolean forceFlushAllStores,
-        FlushLifeCycleTracker tracker) {
+    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
       this.listener.flushRequested(flushType, region);
       return true;
     }
 
     @Override
-    public boolean requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) {
+    public boolean requestFlush(HRegion region, List<byte[]> families,
+        FlushLifeCycleTracker tracker) {
+      return true;
+    }
+
+    @Override
+    public boolean requestDelayedFlush(HRegion region, long delay) {
       return true;
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
index 0de5cb0..9cd0ec2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java
@@ -141,7 +141,7 @@ public class TestSplitWalDataLoss {
     long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
     LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
     assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
-    rs.getMemStoreFlusher().requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY);
+    rs.getMemStoreFlusher().requestFlush(spiedRegion, FlushLifeCycleTracker.DUMMY);
     synchronized (flushed) {
       while (!flushed.booleanValue()) {
         flushed.wait();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 75dcad1..7c491dc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -168,9 +170,9 @@ public abstract class AbstractTestFSWAL {
   }
 
   protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
-      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
+      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes, String cf)
       throws IOException {
-    final byte[] row = Bytes.toBytes("row");
+    final byte[] row = Bytes.toBytes(cf);
     for (int i = 0; i < times; i++) {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
@@ -252,8 +254,8 @@ public abstract class AbstractTestFSWAL {
    * regions which should be flushed in order to archive the oldest wal file.
    * <p>
    * This method tests this behavior by inserting edits and rolling the wal enough times to reach
-   * the max number of logs threshold. It checks whether we get the "right regions" for flush on
-   * rolling the wal.
+   * the max number of logs threshold. It checks whether we get the "right regions and stores" for
+   * flush on rolling the wal.
    * @throws Exception
    */
   @Test
@@ -263,12 +265,23 @@ public abstract class AbstractTestFSWAL {
     conf1.setInt("hbase.regionserver.maxlogs", 1);
     AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
       HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
+    String cf1 = "cf1";
+    String cf2 = "cf2";
+    String cf3 = "cf3";
     TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
     TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf1)).build();
     RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
     RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
+
+    List<ColumnFamilyDescriptor> cfs = new ArrayList();
+    cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
+    cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
+    TableDescriptor t3 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t3"))
+      .setColumnFamilies(cfs).build();
+    RegionInfo hri3 = RegionInfoBuilder.newBuilder(t3.getTableName()).build();
+
     // add edits and roll the wal
     MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -279,26 +292,30 @@ public abstract class AbstractTestFSWAL {
     for (byte[] fam : t2.getColumnFamilyNames()) {
       scopes2.put(fam, 0);
     }
+    NavigableMap<byte[], Integer> scopes3 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : t3.getColumnFamilyNames()) {
+      scopes3.put(fam, 0);
+    }
     try {
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
       wal.rollWriter();
       // add some more edits and roll the wal. This would reach the log number threshold
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
       wal.rollWriter();
       // with above rollWriter call, the max logs limit is reached.
       assertTrue(wal.getNumRolledLogFiles() == 2);
 
       // get the regions to flush; since there is only one region in the oldest wal, it should
       // return only one region.
-      byte[][] regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(1, regionsToFlush.length);
-      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+      Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
+      assertEquals(1, regionsToFlush.size());
+      assertEquals(hri1.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
       // insert edits in second region
-      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
+      addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
       // get the regions to flush, it should still read region1.
       regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(1, regionsToFlush.length);
-      assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
+      assertEquals(1, regionsToFlush.size());
+      assertEquals(hri1.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
       // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
       // remain.
       flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
@@ -311,29 +328,50 @@ public abstract class AbstractTestFSWAL {
       // no wal should remain now.
       assertEquals(0, wal.getNumRolledLogFiles());
       // add edits both to region 1 and region 2, and roll.
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
-      addEdits(wal, hri2, t2, 2, mvcc, scopes2);
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
+      addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
       wal.rollWriter();
       // add edits and roll the writer, to reach the max logs limit.
       assertEquals(1, wal.getNumRolledLogFiles());
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
       wal.rollWriter();
       // it should return two regions to flush, as the oldest wal file has entries
       // for both regions.
       regionsToFlush = wal.findRegionsToForceFlush();
-      assertEquals(2, regionsToFlush.length);
+      assertEquals(2, regionsToFlush.size());
       // flush both regions
       flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
       flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
       wal.rollWriter(true);
       assertEquals(0, wal.getNumRolledLogFiles());
       // Add an edit to region1, and roll the wal.
-      addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+      addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
       // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
       wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
       wal.rollWriter();
       wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
       assertEquals(1, wal.getNumRolledLogFiles());
+
+      // clear test data
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
+      wal.rollWriter(true);
+      // add edits for three families
+      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
+      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
+      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
+      wal.rollWriter();
+      addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
+      wal.rollWriter();
+      assertEquals(2, wal.getNumRolledLogFiles());
+      // flush one family before archiving the oldest wal
+      Set<byte[]> flushedFamilyNames = new HashSet<>();
+      flushedFamilyNames.add(Bytes.toBytes(cf1));
+      flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
+      regionsToFlush = wal.findRegionsToForceFlush();
+      // then only two families need to be flushed when archiving the oldest wal
+      assertEquals(1, regionsToFlush.size());
+      assertEquals(hri3.getEncodedNameAsBytes(), (byte[])regionsToFlush.keySet().toArray()[0]);
+      assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
     } finally {
       if (wal != null) {
         wal.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 59f1ee3..e3fc8e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -1116,9 +1116,9 @@ public abstract class AbstractTestWALReplay {
     private HRegion r;
 
     @Override
-    public boolean requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) {
+    public boolean requestFlush(HRegion region, FlushLifeCycleTracker tracker) {
       try {
-        r.flush(force);
+        r.flush(false);
         return true;
       } catch (IOException e) {
         throw new RuntimeException("Exception flushing", e);
@@ -1126,7 +1126,13 @@ public abstract class AbstractTestWALReplay {
     }
 
     @Override
-    public boolean requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
+    public boolean requestFlush(HRegion region, List<byte[]> families,
+        FlushLifeCycleTracker tracker) {
+      return true;
+    }
+
+    @Override
+    public boolean requestDelayedFlush(HRegion region, long when) {
       return true;
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 87bec9a..d1a5a0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -124,7 +124,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
       RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
       MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
       for (int i = 0; i < 10; i++) {
-        addEdits(log, hri, htd, 1, mvcc, scopes);
+        addEdits(log, hri, htd, 1, mvcc, scopes, "row");
       }
     } finally {
       log.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
index ff20ab5..22b24ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
@@ -137,7 +137,7 @@ public class TestSequenceIdAccounting {
     sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
     assertTrue(sida.findLower(m) == null);
     m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME));
-    assertTrue(sida.findLower(m).length == 1);
+    assertTrue(sida.findLower(m).size() == 1);
     m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1);
     assertTrue(sida.findLower(m) == null);
   }