You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/12/16 20:50:56 UTC

[1/2] hbase git commit: HBASE-10201 Port 'Make flush decisions per column family' to trunk

Repository: hbase
Updated Branches:
  refs/heads/master a411227b0 -> c7fad665f


http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index ced3383..d2ba69d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -31,10 +33,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -50,7 +54,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -64,15 +67,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -80,6 +75,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.htrace.NullScope;
@@ -88,6 +90,7 @@ import org.htrace.Trace;
 import org.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.ExceptionHandler;
@@ -333,33 +336,35 @@ public class FSHLog implements WAL {
   // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
   // done above in failedSequence, highest sequence, etc.
   /**
-   * This lock ties all operations on oldestFlushingRegionSequenceIds and
-   * oldestFlushedRegionSequenceIds Maps with the exception of append's putIfAbsent call into
-   * oldestUnflushedSeqNums. We use these Maps to find out the low bound regions sequence id, or
-   * to find regions  with old sequence ids to force flush; we are interested in old stuff not the
-   * new additions (TODO: IS THIS SAFE?  CHECK!).
+   * This lock ties all operations on lowestFlushingStoreSequenceIds and
+   * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into
+   * oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions
+   * sequence id, or to find regions with old sequence ids to force flush; we are interested in
+   * old stuff not the new additions (TODO: IS THIS SAFE?  CHECK!).
    */
   private final Object regionSequenceIdLock = new Object();
 
   /**
-   * Map of encoded region names to their OLDEST -- i.e. their first, the longest-lived --
-   * sequence id in memstore. Note that this sequence id is the region sequence id.  This is not
-   * related to the id we use above for {@link #highestSyncedSequence} and
-   * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer.
+   * Map of encoded region names and family names to their OLDEST -- i.e. their first,
+   * the longest-lived -- sequence id in memstore. Note that this sequence id is the region
+   * sequence id.  This is not related to the id we use above for {@link #highestSyncedSequence}
+   * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor
+   * ring buffer.
    */
-  private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedRegionSequenceIds =
-    new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+  private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
+    = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
+      Bytes.BYTES_COMPARATOR);
 
   /**
-   * Map of encoded region names to their lowest or OLDEST sequence/edit id in memstore currently
-   * being flushed out to hfiles. Entries are moved here from
-   * {@link #oldestUnflushedRegionSequenceIds} while the lock {@link #regionSequenceIdLock} is held
+   * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in
+   * memstore currently being flushed out to hfiles. Entries are moved here from
+   * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held
    * (so movement between the Maps is atomic). This is not related to the id we use above for
    * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from
    * the disruptor ring buffer, an internal detail.
    */
-  private final Map<byte[], Long> lowestFlushingRegionSequenceIds =
-    new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+  private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
+    new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
 
  /**
   * Map of region encoded names to the latest region sequence id.  Updated on each append of
@@ -734,6 +739,28 @@ public class FSHLog implements WAL {
     return DefaultWALProvider.createWriter(conf, fs, path, false);
   }
 
+  private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
+    long result = HConstants.NO_SEQNUM;
+    for (Long seqNum: seqIdMap.values()) {
+      if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
+        result = seqNum.longValue();
+      }
+    }
+    return result;
+  }
+
+  private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
+      Map<byte[], T> mapToCopy) {
+    Map<byte[], Long> copied = Maps.newHashMap();
+    for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
+      long lowestSeqId = getLowestSeqId(entry.getValue());
+      if (lowestSeqId != HConstants.NO_SEQNUM) {
+        copied.put(entry.getKey(), lowestSeqId);
+      }
+    }
+    return copied;
+  }
+
   /**
    * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits
    * have been flushed to hfiles.
@@ -746,22 +773,23 @@ public class FSHLog implements WAL {
    * @throws IOException
    */
   private void cleanOldLogs() throws IOException {
-    Map<byte[], Long> oldestFlushingSeqNumsLocal = null;
-    Map<byte[], Long> oldestUnflushedSeqNumsLocal = null;
+    Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
+    Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
     List<Path> logsToArchive = new ArrayList<Path>();
     // make a local copy so as to avoid locking when we iterate over these maps.
     synchronized (regionSequenceIdLock) {
-      oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.lowestFlushingRegionSequenceIds);
-      oldestUnflushedSeqNumsLocal =
-        new HashMap<byte[], Long>(this.oldestUnflushedRegionSequenceIds);
+      lowestFlushingRegionSequenceIdsLocal =
+          copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
+      oldestUnflushedRegionSequenceIdsLocal =
+          copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
     }
     for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
       // iterate over the log file.
       Path log = e.getKey();
       Map<byte[], Long> sequenceNums = e.getValue();
       // iterate over the map for this log file, and tell whether it should be archive or not.
-      if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal,
-          oldestUnflushedSeqNumsLocal)) {
+      if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
+          oldestUnflushedRegionSequenceIdsLocal)) {
         logsToArchive.add(log);
         LOG.debug("WAL file ready for archiving " + log);
       }
@@ -815,10 +843,11 @@ public class FSHLog implements WAL {
     List<byte[]> regionsToFlush = null;
     // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
     synchronized (regionSequenceIdLock) {
-      for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) {
-        Long unFlushedVal = this.oldestUnflushedRegionSequenceIds.get(e.getKey());
-        if (unFlushedVal != null && unFlushedVal <= e.getValue()) {
-          if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>();
+      for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
+        long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
+        if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
+          if (regionsToFlush == null)
+            regionsToFlush = new ArrayList<byte[]>();
           regionsToFlush.add(e.getKey());
         }
       }
@@ -1584,36 +1613,53 @@ public class FSHLog implements WAL {
     // +1 for current use log
     return getNumRolledLogFiles() + 1;
   }
-  
+
   // public only until class moves to o.a.h.h.wal
   /** @return the size of log files in use */
   public long getLogFileSize() {
     return this.totalLogSize.get();
   }
-  
+
   @Override
-  public boolean startCacheFlush(final byte[] encodedRegionName) {
-    Long oldRegionSeqNum = null;
+  public boolean startCacheFlush(final byte[] encodedRegionName,
+      Set<byte[]> flushedFamilyNames) {
+    Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
     if (!closeBarrier.beginOp()) {
       LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
         " - because the server is closing.");
       return false;
     }
     synchronized (regionSequenceIdLock) {
-      oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.remove(encodedRegionName);
-      if (oldRegionSeqNum != null) {
-        Long oldValue =
-          this.lowestFlushingRegionSequenceIds.put(encodedRegionName, oldRegionSeqNum);
-        assert oldValue ==
-          null : "Flushing map not cleaned up for " + Bytes.toString(encodedRegionName);
+      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+          oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+      if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+        for (byte[] familyName: flushedFamilyNames) {
+          Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
+          if (seqId != null) {
+            oldStoreSeqNum.put(familyName, seqId);
+          }
+        }
+        if (!oldStoreSeqNum.isEmpty()) {
+          Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
+              encodedRegionName, oldStoreSeqNum);
+          assert oldValue == null: "Flushing map not cleaned up for "
+              + Bytes.toString(encodedRegionName);
+        }
+        if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
+          // Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever
+          // even if the region is already moved to other server.
+          // Do not worry about data races: we hold the region's write lock when calling
+          // startCacheFlush, so no one can add a value to the map we removed.
+          oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
+        }
       }
     }
-    if (oldRegionSeqNum == null) {
-      // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
-      //       the region is already flushing (which would make this call invalid), or there
-      //       were no appends after last flush, so why are we starting flush? Maybe we should
-      //       assert not null, and switch to "long" everywhere. Less rigorous, but safer,
-      //       alternative is telling the caller to stop. For now preserve old logic.
+    if (oldStoreSeqNum.isEmpty()) {
+      // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
+      // the region is already flushing (which would make this call invalid), or there
+      // were no appends after last flush, so why are we starting flush? Maybe we should
+      // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
+      // For now preserve old logic.
       LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
         + Bytes.toString(encodedRegionName) + "]");
     }
@@ -1623,30 +1669,59 @@ public class FSHLog implements WAL {
   @Override
   public void completeCacheFlush(final byte [] encodedRegionName) {
     synchronized (regionSequenceIdLock) {
-      this.lowestFlushingRegionSequenceIds.remove(encodedRegionName);
+      this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
     }
     closeBarrier.endOp();
   }
 
+  /** Returns the region's per-family map, atomically registering a new one if absent. */
+  private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
+      byte[] encodedRegionName) {
+    ConcurrentMap<byte[], Long> existing = oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+    if (existing != null) {
+      return existing;
+    }
+    ConcurrentMap<byte[], Long> created =
+        new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    // putIfAbsent (not put) so a map registered by a concurrent caller is never replaced.
+    ConcurrentMap<byte[], Long> raced =
+        oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName, created);
+    return raced == null ? created : raced;
+  }
+
   @Override
   public void abortCacheFlush(byte[] encodedRegionName) {
-    Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
+    Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
+    Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
     synchronized (regionSequenceIdLock) {
-      seqNumBeforeFlushStarts = this.lowestFlushingRegionSequenceIds.remove(encodedRegionName);
-      if (seqNumBeforeFlushStarts != null) {
-        currentSeqNum =
-          this.oldestUnflushedRegionSequenceIds.put(encodedRegionName, seqNumBeforeFlushStarts);
+      storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
+        encodedRegionName);
+      if (storeSeqNumsBeforeFlushStarts != null) {
+        ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+            getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
+        for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
+            .entrySet()) {
+          currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
+            oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
+              familyNameAndSeqId.getValue()));
+        }
       }
     }
     closeBarrier.endOp();
-    if ((currentSeqNum != null)
-        && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
-      String errorStr = "Region " + Bytes.toString(encodedRegionName) +
-          "acquired edits out of order current memstore seq=" + currentSeqNum
-          + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
-      LOG.error(errorStr);
-      assert false : errorStr;
-      Runtime.getRuntime().halt(1);
+    if (storeSeqNumsBeforeFlushStarts != null) {
+      for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
+        Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
+        if (currentSeqNum != null
+            && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
+          String errorStr =
+              "Region " + Bytes.toString(encodedRegionName) + " family "
+                  + Bytes.toString(familyNameAndSeqId.getKey())
+                  + " acquired edits out of order current memstore seq=" + currentSeqNum
+                  + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
+          LOG.error(errorStr);
+          Runtime.getRuntime().halt(1);
+        }
+      }
     }
   }
 
@@ -1677,8 +1752,23 @@ public class FSHLog implements WAL {
 
   @Override
   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
-    Long result = oldestUnflushedRegionSequenceIds.get(encodedRegionName);
-    return result == null ? HConstants.NO_SEQNUM : result.longValue();
+    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+        this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+    return oldestUnflushedStoreSequenceIdsOfRegion != null ?
+        getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
+  }
+
+  @Override
+  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
+      byte[] familyName) {
+    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+        this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+    if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+      Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName);
+      return result != null ? result.longValue() : HConstants.NO_SEQNUM;
+    } else {
+      return HConstants.NO_SEQNUM;
+    }
   }
 
   /**
@@ -1914,6 +2004,15 @@ public class FSHLog implements WAL {
       }
     }
 
+    private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
+        Set<byte[]> familyNameSet, Long lRegionSequenceId) {
+      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+          getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
+      for (byte[] familyName : familyNameSet) {
+        oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
+      }
+    }
+
     /**
      * Append to the WAL.  Does all CP and WAL listener calls.
      * @param entry
@@ -1961,9 +2060,10 @@ public class FSHLog implements WAL {
         Long lRegionSequenceId = Long.valueOf(regionSequenceId);
         highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
         if (entry.isInMemstore()) {
-          oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
+          updateOldestUnflushedSequenceIds(encodedRegionName,
+              entry.getFamilyNames(), lRegionSequenceId);
         }
-        
+
         coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
         // Update metrics.
         postAppend(entry, EnvironmentEdgeManager.currentTime() - start);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index d9942b3..147a13d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -19,13 +19,21 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+
+import com.google.common.collect.Sets;
 
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -96,7 +104,7 @@ class FSWALEntry extends Entry {
    */
   long stampRegionSequenceId() throws IOException {
     long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
-    if (!this.getEdit().isReplay() && memstoreCells != null && !memstoreCells.isEmpty()) {
+    if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
       for (Cell cell : this.memstoreCells) {
         CellUtil.setSequenceId(cell, regionSequenceId);
       }
@@ -105,4 +113,21 @@ class FSWALEntry extends Entry {
     key.setLogSeqNum(regionSequenceId);
     return regionSequenceId;
   }
+
+  /**
+   * @return the family names which are affected by this edit.
+   */
+  Set<byte[]> getFamilyNames() {
+    ArrayList<Cell> cells = this.getEdit().getCells();
+    if (CollectionUtils.isEmpty(cells)) {
+      return Collections.<byte[]>emptySet();
+    }
+    Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
+    for (Cell cell : cells) {
+      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+        familySet.add(CellUtil.cloneFamily(cell));
+      }
+    }
+    return familySet;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index e0fc35c..f571166 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -182,7 +183,7 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public boolean startCacheFlush(final byte[] encodedRegionName) {
+    public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
       return !(closed.get());
     }
 
@@ -205,6 +206,11 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
+    public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
+      return HConstants.NO_SEQNUM;
+    }
+
+    @Override
     public String toString() {
       return "WAL disabled.";
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 23f8c9f..5a2b08d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.wal;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -152,7 +153,7 @@ public interface WAL {
    * @return true if the flush can proceed, false in case wal is closing (ususally, when server is
    * closing) and flush couldn't be started.
    */
-  boolean startCacheFlush(final byte[] encodedRegionName);
+  boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
 
   /**
    * Complete the cache flush.
@@ -182,6 +183,14 @@ public interface WAL {
   long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
 
   /**
+   * Gets the earliest sequence number in the memstore for this particular region and store.
+   * @param encodedRegionName The region to get the number for.
+   * @param familyName The family to get the number for.
+   * @return The number if present, HConstants.NO_SEQNUM if absent.
+   */
+  long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName);
+
+  /**
    * Human readable identifying information about the state of this WAL.
    * Implementors are encouraged to include information appropriate for debugging.
    * Consumers are advised not to rely on the details of the returned String; it does

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index a9493c7..777ecea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -268,7 +268,7 @@ public class TestIOFencing {
       compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
       LOG.info("Blocking compactions");
       compactingRegion.stopCompactions();
-      long lastFlushTime = compactingRegion.getLastFlushTime();
+      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
       // Load some rows
       TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
 
@@ -284,7 +284,7 @@ public class TestIOFencing {
 
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = System.currentTimeMillis();
-      while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
+      while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
           compactingRegion.countStoreFiles() <= 1) {
         LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString());
         Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
index ace24b1..676885b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
@@ -34,8 +34,8 @@ public class TestFlushRegionEntry {
 
   @Test
   public void test() {
-    FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class));
-    FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class));
+    FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class), true);
+    FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class), true);
 
     assertEquals(entry.hashCode(), other.hashCode());
     assertEquals(entry, other);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index 91de97c..c1eeea0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -113,11 +113,11 @@ public class TestHeapMemoryManager {
     long oldBlockCacheSize = blockCache.maxSize;
     heapMemoryManager.start();
     memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null);
-    memStoreFlusher.requestFlush(null);
-    memStoreFlusher.requestFlush(null);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
     memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
-    memStoreFlusher.requestFlush(null);
+    memStoreFlusher.requestFlush(null, false);
     Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
     assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
         memStoreFlusher.memstoreSize);
@@ -127,8 +127,8 @@ public class TestHeapMemoryManager {
     oldBlockCacheSize = blockCache.maxSize;
     // Do some more flushes before the next run of HeapMemoryTuner
     memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null);
-    memStoreFlusher.requestFlush(null);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
     Thread.sleep(1500);
     assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
         memStoreFlusher.memstoreSize);
@@ -408,12 +408,12 @@ public class TestHeapMemoryManager {
     }
 
     @Override
-    public void requestFlush(HRegion region) {
+    public void requestFlush(HRegion region, boolean forceFlushAllStores) {
       this.listener.flushRequested(flushType, region);
     }
 
     @Override
-    public void requestDelayedFlush(HRegion region, long delay) {
+    public void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
new file mode 100644
index 0000000..43a9575
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.FlushAllStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.hash.Hashing;
+
+/**
+ * This test verifies the correctness of the Per Column Family flushing strategy
+ */
+@Category(MediumTests.class)
+public class TestPerColumnFamilyFlush {
+  private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class);
+
+  HRegion region = null;
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
+
+  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
+
+  public static final byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
+      Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
+
+  public static final byte[] FAMILY1 = families[0];
+
+  public static final byte[] FAMILY2 = families[1];
+
+  public static final byte[] FAMILY3 = families[2];
+
+  // Creates the region under test in DIR/<callingMethod> with every entry of 'families'.
+  private void initHRegion(String callingMethod, Configuration conf) throws IOException {
+    HTableDescriptor descriptor = new HTableDescriptor(TABLENAME);
+    for (int idx = 0; idx < families.length; idx++) {
+      descriptor.addFamily(new HColumnDescriptor(families[idx]));
+    }
+    HRegionInfo regionInfo = new HRegionInfo(TABLENAME, null, null, false);
+    region = HRegion.createHRegion(regionInfo, new Path(DIR, callingMethod), conf, descriptor);
+  }
+
+  // Builds the Put for edit #putNum of the 1-based family #familyNum: row "row<f>-<n>",
+  // qualifier "q<f>" and value "val<f>-<n>".
+  private Put createPut(int familyNum, int putNum) {
+    String suffix = familyNum + "-" + putNum;
+    Put put = new Put(Bytes.toBytes("row" + suffix));
+    byte[] qualifier = Bytes.toBytes("q" + familyNum);
+    put.add(families[familyNum - 1], qualifier, Bytes.toBytes("val" + suffix));
+    return put;
+  }
+
+  // A helper function to create the Get for the row that createPut(familyNum, putNum) writes.
+  private Get createGet(int familyNum, int putNum) {
+    String rowKey = "row" + familyNum + "-" + putNum;
+    return new Get(Bytes.toBytes(rowKey));
+  }
+
+  // Reads back the row written by createPut(familyNum, putNum) and checks its single cell.
+  void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
+    Result result = table.get(createGet(familyNum, putNum));
+    byte[] cf = families[familyNum - 1];
+    byte[] qualifier = Bytes.toBytes("q" + familyNum);
+    byte[] expected = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    String missing = "Missing Put#" + putNum + " for CF# " + familyNum;
+    assertNotNull(missing, result.getFamilyMap(cf));
+    assertNotNull(missing, result.getFamilyMap(cf).get(qualifier));
+    assertTrue("Incorrect value for Put#" + putNum + " for CF# " + familyNum,
+      Arrays.equals(result.getFamilyMap(cf).get(qualifier), expected));
+  }
+
+  // With FlushLargeStoresPolicy, flushcache(false) must flush only the stores whose memstore
+  // has grown large, and must fall back to flushing every store when none of them has.
+  @Test
+  public void testSelectiveFlushWhenEnabled() throws IOException {
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100 * 1024);
+    // Initialize the HRegion
+    initHRegion("testSelectiveFlushWhenEnabled", conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the smallest LSNs for edits w.r.t. each CF.
+    long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Get the overall smallest LSN in the region's memstores.
+    long smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // The overall smallest LSN in the region's memstores should be the same as
+    // the LSN of the smallest edit in CF1
+    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+    // Some other sanity checks.
+    assertTrue(smallestSeqCF1 < smallestSeqCF2);
+    assertTrue(smallestSeqCF2 < smallestSeqCF3);
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
+        + cf2MemstoreSize + cf3MemstoreSize);
+
+    // Flush!
+    region.flushcache(false);
+
+    // Will use these to check if anything changed.
+    long oldCF2MemstoreSize = cf2MemstoreSize;
+    long oldCF3MemstoreSize = cf3MemstoreSize;
+
+    // Recalculate everything
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // We should have cleared out only CF1, since we chose the flush thresholds
+    // and number of puts accordingly.
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    // Nothing should have happened to CF2, ...
+    assertEquals(oldCF2MemstoreSize, cf2MemstoreSize);
+    // ... or CF3
+    assertEquals(oldCF3MemstoreSize, cf3MemstoreSize);
+    // Now the smallest LSN in the region should be the same as the smallest
+    // LSN in the memstore of CF2.
+    assertEquals(smallestSeqCF2, smallestSeqInRegionCurrentMemstore);
+    // Of course, this should hold too.
+    assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
+        + cf3MemstoreSize);
+
+    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
+    for (int i = 1200; i < 2400; i++) {
+      region.put(createPut(2, i));
+
+      // Add only 100 puts for CF3
+      if (i - 1200 < 100) {
+        region.put(createPut(3, i));
+      }
+    }
+
+    // How much does the CF3 memstore occupy? Will be used later.
+    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Flush again
+    region.flushcache(false);
+
+    // Recalculate everything
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // CF1 and CF2, both should be absent.
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
+    // CF3 shouldn't have been touched.
+    assertEquals(oldCF3MemstoreSize, cf3MemstoreSize);
+    assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+    assertEquals(smallestSeqCF3, smallestSeqInRegionCurrentMemstore);
+
+    // What happens when we hit the memstore limit, but we are not able to find
+    // any Column Family above the threshold?
+    // In that case, we should flush all the CFs.
+
+    // Clearing the existing memstores.
+    region.flushcache(true);
+
+    // The memstore limit is 200*1024 and the column family flush lower bound is
+    // 100*1024. We try to just hit the memstore limit with each CF's
+    // memstore being below the CF flush threshold.
+    for (int i = 1; i <= 300; i++) {
+      region.put(createPut(1, i));
+      region.put(createPut(2, i));
+      region.put(createPut(3, i));
+      region.put(createPut(4, i));
+      region.put(createPut(5, i));
+    }
+
+    region.flushcache(false);
+    // Since we won't find any CF above the threshold, and hence no specific
+    // store to flush, we should flush all the memstores.
+    assertEquals(0, region.getMemstoreSize().get());
+  }
+
+  @Test
+  public void testSelectiveFlushWhenNotEnabled() throws IOException {
+    // Set up the configuration: same flush size, but with FlushAllStoresPolicy as the policy.
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
+
+    // Initialize the HRegion
+    initHRegion("testSelectiveFlushWhenNotEnabled", conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Sanity checks: every CF must hold some data before the flush.
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
+        + cf2MemstoreSize + cf3MemstoreSize);
+
+    // Flush with the same flushcache(false) call that testSelectiveFlushWhenEnabled uses.
+    region.flushcache(false);
+
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    long smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // Everything should have been cleared, including CF2 and CF3 despite their small size.
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+    assertEquals(0, totalMemstoreSize);
+    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
+  }
+
+  // Finds the first online region of the table and its hosting server; null if there is none.
+  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (JVMClusterUtil.RegionServerThread rst : rsts) {
+      HRegionServer hrs = rst.getRegionServer();
+      for (HRegion region : hrs.getOnlineRegions(tableName)) {
+        return Pair.newPair(region, hrs);
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testLogReplay() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
+    // Carefully chosen limits so that the memstore just flushes when we're done
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+      FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000);
+    final int numRegionServers = 4;
+    TEST_UTIL.startMiniCluster(numRegionServers);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    HTable table = TEST_UTIL.createTable(TABLENAME, families);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte[] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
+    // These will all be interleaved in the log.
+    for (int i = 1; i <= 80; i++) {
+      table.put(createPut(1, i));
+      if (i <= 10) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+    }
+    table.flushCommits();
+    Thread.sleep(1000);
+
+    Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
+    HRegion desiredRegion = desiredRegionAndServer.getFirst();
+    assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
+
+    // Flush the region selectively.
+    desiredRegion.flushcache(false);
+
+    long totalMemstoreSize;
+    long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
+    totalMemstoreSize = desiredRegion.getMemstoreSize().get();
+
+    // Find the sizes of the memstores of each CF.
+    cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
+
+    // CF1 Should have been flushed
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    // CF2 and CF3 shouldn't have been flushed.
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+    assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
+        + cf3MemstoreSize);
+
+    // Wait for the RS report to go across to the master, so that the master
+    // is aware of which sequence ids have been flushed, before we kill the RS.
+    // If in production, the RS dies before the report goes across, we will
+    // safely replay all the edits.
+    Thread.sleep(2000);
+
+    // Abort the region server where we have the region hosted.
+    HRegionServer rs = desiredRegionAndServer.getSecond();
+    rs.abort("testing");
+
+    // The aborted region server's regions will be eventually assigned to some
+    // other region server, and the get RPC call (inside verifyEdit()) will
+    // retry for some time till the regions come back up.
+
+    // Verify that all the edits are safe.
+    for (int i = 1; i <= 80; i++) {
+      verifyEdit(1, i, table);
+      if (i <= 10) {
+        verifyEdit(2, i, table);
+        verifyEdit(3, i, table);
+      }
+    }
+
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  // Re-runs testLogReplay() with distributed log replay enabled. In that mode the log
+  // splitters ask the master for the last flushed sequence id of a region, so this verifies
+  // that book-keeping. NOTE(review): the flag is set on the shared TEST_UTIL configuration
+  // and is never reset afterwards.
+  @Test
+  public void testLogReplayWithDistributedReplay() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    testLogReplay();
+  }
+
+  /**
+   * When a log roll is about to happen, we do a flush of the regions who will be affected by the
+   * log roll. These flushes cannot be selective flushes, otherwise we cannot roll the logs. This
+   * test ensures that we do a full-flush in that scenario.
+   * @throws Exception
+   */
+  @Test
+  public void testFlushingWhenLogRolling() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100000);
+
+    // Also, let us try real hard to get a log roll to happen: keep the log roll period to 2s.
+    conf.setLong("hbase.regionserver.logroll.period", 2000);
+    // Keep the block size small so that we fill up the log files very fast.
+    conf.setLong("hbase.regionserver.hlog.blocksize", 6144);
+    int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+
+    final int numRegionServers = 4;
+    TEST_UTIL.startMiniCluster(numRegionServers);
+    try {
+      TEST_UTIL.getHBaseAdmin().createNamespace(
+        NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+      HTable table = TEST_UTIL.createTable(TABLENAME, families);
+      HTableDescriptor htd = table.getTableDescriptor();
+
+      for (byte[] family : families) {
+        if (!htd.hasFamily(family)) {
+          htd.addFamily(new HColumnDescriptor(family));
+        }
+      }
+
+      Pair<HRegion, HRegionServer> hosting = getRegionWithName(TABLENAME);
+      assertNotNull("Could not find a region which hosts the new region.", hosting);
+      HRegion desired = hosting.getFirst();
+
+      // Add some edits. Most will be for CF1, some for CF2 and CF3.
+      for (int i = 1; i <= 10000; i++) {
+        table.put(createPut(1, i));
+        if (i <= 200) {
+          table.put(createPut(2, i));
+          table.put(createPut(3, i));
+        }
+        table.flushCommits();
+        // Keep adding until we exceed the number of log files, so that we are
+        // able to trigger the cleaning of old log files.
+        int currentNumLogFiles = ((FSHLog) (desired.getWAL())).getNumLogFiles();
+        if (currentNumLogFiles > maxLogs) {
+          LOG.info("The number of log files is now: " + currentNumLogFiles
+              + ". Expect a log roll and memstore flush.");
+          break;
+        }
+      }
+      table.close();
+      // Wait for some time till the flush caused by log rolling happens.
+      Thread.sleep(4000);
+
+      // We have artificially created the conditions for a log roll. When a log roll
+      // happens, we should flush all the column families. Testing that case here.
+
+      // Individual families should have been flushed.
+      assertEquals(DefaultMemStore.DEEP_OVERHEAD, desired.getStore(FAMILY1).getMemStoreSize());
+      assertEquals(DefaultMemStore.DEEP_OVERHEAD, desired.getStore(FAMILY2).getMemStoreSize());
+      assertEquals(DefaultMemStore.DEEP_OVERHEAD, desired.getStore(FAMILY3).getMemStoreSize());
+
+      // And of course, the total memstore should also be clean.
+      assertEquals(0, desired.getMemstoreSize().get());
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private void doPut(HTableInterface table) throws IOException {
+    // cf1 4B per row, cf2 40B per row and cf3 400B per row
+    byte[] qf = Bytes.toBytes("qf");
+    Random rand = new Random();
+    byte[] value1 = new byte[100];
+    byte[] value2 = new byte[200];
+    byte[] value3 = new byte[400];
+    for (int i = 0; i < 10000; i++) {
+      Put put = new Put(Bytes.toBytes("row-" + i));
+      rand.setSeed(i);
+      rand.nextBytes(value1);
+      rand.nextBytes(value2);
+      rand.nextBytes(value3);
+      put.add(FAMILY1, qf, value1);
+      put.add(FAMILY2, qf, value2);
+      put.add(FAMILY3, qf, value3);
+      table.put(put);
+    }
+  }
+
+  // Under the same write load, small stores should have fewer store files when
+  // per-column-family flush is enabled.
+  @Test
+  public void testCompareStoreFileCount() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+      400 * 1024);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      ConstantSizeRegionSplitPolicy.class.getName());
+
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+    htd.setCompactionEnabled(false);
+    htd.addFamily(new HColumnDescriptor(FAMILY1));
+    htd.addFamily(new HColumnDescriptor(FAMILY2));
+    htd.addFamily(new HColumnDescriptor(FAMILY3));
+
+    LOG.info("==============Test with selective flush disabled===============");
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    TEST_UTIL.getHBaseAdmin().createTable(htd);
+    // Run the write load through its own connection, closed before the stores are inspected.
+    HConnection conn = HConnectionManager.createConnection(conf);
+    HTableInterface table = conn.getTable(TABLENAME);
+    doPut(table);
+    table.close();
+    conn.close();
+
+    HRegion region = getRegionWithName(TABLENAME).getFirst();
+    int cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
+    int cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
+    int cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
+    TEST_UTIL.shutdownMiniCluster();
+
+    LOG.info("==============Test with selective flush enabled===============");
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+      FlushLargeStoresPolicy.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    TEST_UTIL.getHBaseAdmin().createTable(htd);
+    conn = HConnectionManager.createConnection(conf);
+    table = conn.getTable(TABLENAME);
+    doPut(table);
+    table.close();
+    conn.close();
+
+    region = getRegionWithName(TABLENAME).getFirst();
+    int cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
+    int cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
+    int cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
+    TEST_UTIL.shutdownMiniCluster();
+
+    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
+        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
+        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
+    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
+        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
+        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
+    // The small CFs (FAMILY1 and FAMILY2) must have fewer store files with selective flush.
+    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
+    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
+  }
+
+  public static void main(String[] args) throws Exception {
+    int numRegions = Integer.parseInt(args[0]);
+    long numRows = Long.parseLong(args[1]);
+    // Usage: <numRegions> <numRows>. Describe the table: three families, 10GB max file size.
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+    htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
+    htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+    htd.addFamily(new HColumnDescriptor(FAMILY1));
+    htd.addFamily(new HColumnDescriptor(FAMILY2));
+    htd.addFamily(new HColumnDescriptor(FAMILY3));
+    // Drop any existing table, then create it anew (with numRegions regions if numRegions >= 3).
+    Configuration conf = HBaseConfiguration.create();
+    HConnection conn = HConnectionManager.createConnection(conf);
+    HBaseAdmin admin = new HBaseAdmin(conn);
+    if (admin.tableExists(TABLENAME)) {
+      admin.disableTable(TABLENAME);
+      admin.deleteTable(TABLENAME);
+    }
+    if (numRegions >= 3) {
+      byte[] startKey = new byte[16];
+      byte[] endKey = new byte[16];
+      Arrays.fill(endKey, (byte) 0xFF);
+      admin.createTable(htd, startKey, endKey, numRegions);
+    } else {
+      admin.createTable(htd);
+    }
+    admin.close();
+    // Load numRows rows with random 16B, 256B and 4096B values for the three families.
+    HTableInterface table = conn.getTable(TABLENAME);
+    byte[] qf = Bytes.toBytes("qf");
+    Random rand = new Random();
+    byte[] value1 = new byte[16];
+    byte[] value2 = new byte[256];
+    byte[] value3 = new byte[4096];
+    for (long i = 0; i < numRows; i++) {
+      Put put = new Put(Hashing.md5().hashLong(i).asBytes());
+      rand.setSeed(i);
+      rand.nextBytes(value1);
+      rand.nextBytes(value2);
+      rand.nextBytes(value3);
+      put.add(FAMILY1, qf, value1);
+      put.add(FAMILY2, qf, value2);
+      put.add(FAMILY3, qf, value3);
+      table.put(put);
+      if (i % 10000 == 0) {
+        LOG.info(i + " rows put");
+      }
+    }
+    table.close();
+    conn.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 6182cca..970b0f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -43,8 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -158,18 +159,15 @@ public class TestFSHLog {
     }
   }
 
-  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
-                        int times, AtomicLong sequenceId) throws IOException {
-    HTableDescriptor htd = new HTableDescriptor();
-    htd.addFamily(new HColumnDescriptor("row"));
-
-    final byte [] row = Bytes.toBytes("row");
+  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
+      AtomicLong sequenceId) throws IOException {
+    final byte[] row = Bytes.toBytes("row");
     for (int i = 0; i < times; i++) {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
       cols.add(new KeyValue(row, row, row, timestamp, row));
-      log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
-          sequenceId, true, null);
+      log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
+        cols, sequenceId, true, null);
     }
     log.sync();
   }
@@ -179,8 +177,8 @@ public class TestFSHLog {
    * @param wal
    * @param regionEncodedName
    */
-  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
-    wal.startCacheFlush(regionEncodedName);
+  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
     wal.completeCacheFlush(regionEncodedName);
   }
 
@@ -254,10 +252,14 @@ public class TestFSHLog {
     conf1.setInt("hbase.regionserver.maxlogs", 1);
     FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
         HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
-    TableName t1 = TableName.valueOf("t1");
-    TableName t2 = TableName.valueOf("t2");
-    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-    HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HTableDescriptor t1 =
+        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+    HTableDescriptor t2 =
+        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
+    HRegionInfo hri1 =
+        new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HRegionInfo hri2 =
+        new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
     // variables to mock region sequenceIds
     final AtomicLong sequenceId1 = new AtomicLong(1);
     final AtomicLong sequenceId2 = new AtomicLong(1);
@@ -284,12 +286,12 @@ public class TestFSHLog {
       assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
       // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
       // remain.
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
       wal.rollWriter();
       // only one wal should remain now (that is for the second region).
       assertEquals(1, wal.getNumRolledLogFiles());
       // flush the second region
-      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
       wal.rollWriter(true);
       // no wal should remain now.
       assertEquals(0, wal.getNumRolledLogFiles());
@@ -306,14 +308,14 @@ public class TestFSHLog {
       regionsToFlush = wal.findRegionsToForceFlush();
       assertEquals(2, regionsToFlush.length);
       // flush both regions
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
-      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
       wal.rollWriter(true);
       assertEquals(0, wal.getNumRolledLogFiles());
       // Add an edit to region1, and roll the wal.
       addEdits(wal, hri1, t1, 2, sequenceId1);
       // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
-      wal.startCacheFlush(hri1.getEncodedNameAsBytes());
+      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
       wal.rollWriter();
       wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
       assertEquals(1, wal.getNumRolledLogFiles());

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index f3f2ebe..6cdfe3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -27,7 +27,10 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -787,13 +790,15 @@ public class TestWALReplay {
 
     // Add 1k to each family.
     final int countPerFamily = 1000;
+    Set<byte[]> familyNames = new HashSet<byte[]>();
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
           ee, wal, htd, sequenceId);
+      familyNames.add(hcd.getName());
     }
 
     // Add a cache flush, shouldn't have any effect
-    wal.startCacheFlush(regionName);
+    wal.startCacheFlush(regionName, familyNames);
     wal.completeCacheFlush(regionName);
 
     // Add an edit to another family, should be skipped.
@@ -833,11 +838,11 @@ public class TestWALReplay {
           final HRegion region =
               new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
             @Override
-            protected FlushResult internalFlushcache(
-                final WAL wal, final long myseqid, MonitoredTask status)
+            protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+                Collection<Store> storesToFlush, MonitoredTask status)
             throws IOException {
               LOG.info("InternalFlushCache Invoked");
-              FlushResult fs = super.internalFlushcache(wal, myseqid,
+              FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
                   Mockito.mock(MonitoredTask.class));
               flushcount.incrementAndGet();
               return fs;
@@ -959,16 +964,16 @@ public class TestWALReplay {
     private HRegion r;
 
     @Override
-    public void requestFlush(HRegion region) {
+    public void requestFlush(HRegion region, boolean forceFlushAllStores) {
       try {
-        r.flushcache();
+        r.flushcache(forceFlushAllStores);
       } catch (IOException e) {
         throw new RuntimeException("Exception flushing", e);
       }
     }
 
     @Override
-    public void requestDelayedFlush(HRegion region, long when) {
+    public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
       // TODO Auto-generated method stub
 
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
index e008b60..df8ceaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -147,18 +147,15 @@ public class TestDefaultWALProvider {
   }
 
 
-  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd,
                         int times, AtomicLong sequenceId) throws IOException {
-    HTableDescriptor htd = new HTableDescriptor();
-    htd.addFamily(new HColumnDescriptor("row"));
-
-    final byte [] row = Bytes.toBytes("row");
+    final byte[] row = Bytes.toBytes("row");
     for (int i = 0; i < times; i++) {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
       cols.add(new KeyValue(row, row, row, timestamp, row));
-      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
-          sequenceId, true, null);
+      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
+        cols, sequenceId, true, null);
     }
     log.sync();
   }
@@ -175,8 +172,8 @@ public class TestDefaultWALProvider {
    * @param wal
    * @param regionEncodedName
    */
-  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
-    wal.startCacheFlush(regionEncodedName);
+  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
     wal.completeCacheFlush(regionEncodedName);
   }
 
@@ -185,45 +182,47 @@ public class TestDefaultWALProvider {
   @Test
   public void testLogCleaning() throws Exception {
     LOG.info("testLogCleaning");
-    final TableName tableName =
-        TableName.valueOf("testLogCleaning");
-    final TableName tableName2 =
-        TableName.valueOf("testLogCleaning2");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("testLogCleaning")).addFamily(new HColumnDescriptor(
+            "row"));
+    final HTableDescriptor htd2 =
+        new HTableDescriptor(TableName.valueOf("testLogCleaning2"))
+            .addFamily(new HColumnDescriptor("row"));
     final Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
     final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
     final AtomicLong sequenceId = new AtomicLong(1);
     try {
-      HRegionInfo hri = new HRegionInfo(tableName,
+      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HRegionInfo hri2 = new HRegionInfo(tableName2,
+      HRegionInfo hri2 = new HRegionInfo(htd2.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
       // we want to mix edits from regions, so pick our own identifier.
       final WAL log = wals.getWAL(UNSPECIFIED_REGION);
 
       // Add a single edit and make sure that rolling won't remove the file
       // Before HBASE-3198 it used to delete it
-      addEdits(log, hri, tableName, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
       log.rollWriter();
       assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // See if there's anything wrong with more than 1 edit
-      addEdits(log, hri, tableName, 2, sequenceId);
+      addEdits(log, hri, htd, 2, sequenceId);
       log.rollWriter();
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // Now mix edits from 2 regions, still no flushing
-      addEdits(log, hri, tableName, 1, sequenceId);
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      addEdits(log, hri, tableName, 1, sequenceId);
-      addEdits(log, hri2, tableName2, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
+      addEdits(log, hri2, htd2, 1, sequenceId);
       log.rollWriter();
       assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // Flush the first region, we expect to see the first two files getting
       // archived. We need to append something or writer won't be rolled.
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -231,8 +230,8 @@ public class TestDefaultWALProvider {
       // Flush the second region, which removes all the remaining output files
       // since the oldest was completely flushed and the two others only contain
       // flush information
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
       log.completeCacheFlush(hri2.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -255,21 +254,25 @@ public class TestDefaultWALProvider {
    * <p>
    * @throws IOException
    */
-  @Test 
+  @Test
   public void testWALArchiving() throws IOException {
     LOG.debug("testWALArchiving");
-    TableName table1 = TableName.valueOf("t1");
-    TableName table2 = TableName.valueOf("t2");
+    HTableDescriptor table1 =
+        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+    HTableDescriptor table2 =
+        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
     final Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
     final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
     try {
       final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
-      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
-          HConstants.EMPTY_END_ROW);
-      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
-          HConstants.EMPTY_END_ROW);
+      HRegionInfo hri1 =
+          new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW,
+              HConstants.EMPTY_END_ROW);
+      HRegionInfo hri2 =
+          new HRegionInfo(table2.getTableName(), HConstants.EMPTY_START_ROW,
+              HConstants.EMPTY_END_ROW);
       // ensure that we don't split the regions.
       hri1.setSplit(false);
       hri2.setSplit(false);
@@ -288,7 +291,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // add a waledit to table1, and flush the region.
       addEdits(wal, hri1, table1, 3, sequenceId1);
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
       // roll log; all old logs should be archived.
       wal.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
@@ -302,7 +305,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // add edits for table2, and flush hri1.
       addEdits(wal, hri2, table2, 2, sequenceId2);
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys());
       // the log : region-sequenceId map is
       // log1: region2 (unflushed)
       // log2: region1 (flushed)
@@ -312,7 +315,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // flush region2, and all logs should be archived.
       addEdits(wal, hri2, table2, 2, sequenceId2);
-      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
       wal.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 47b001a..bbe4018 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -480,8 +480,9 @@ public class TestWALFactory {
   @Test
   public void testEditAdd() throws IOException {
     final int COL_COUNT = 10;
-    final TableName tableName =
-        TableName.valueOf("tablename");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+            "column"));
     final byte [] row = Bytes.toBytes("row");
     WAL.Reader reader = null;
     try {
@@ -496,16 +497,15 @@ public class TestWALFactory {
             Bytes.toBytes(Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      HRegionInfo info = new HRegionInfo(tableName,
+      HRegionInfo info = new HRegionInfo(htd.getTableName(),
         row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
-      HTableDescriptor htd = new HTableDescriptor();
-      htd.addFamily(new HColumnDescriptor("column"));
       final WAL log = wals.getWAL(info.getEncodedNameAsBytes());
 
-      final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), cols, sequenceId, true, null);
+      final long txid = log.append(htd, info,
+        new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+        cols, sequenceId, true, null);
       log.sync(txid);
-      log.startCacheFlush(info.getEncodedNameAsBytes());
+      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(info.getEncodedNameAsBytes());
       log.shutdown();
       Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -519,7 +519,7 @@ public class TestWALFactory {
         WALKey key = entry.getKey();
         WALEdit val = entry.getEdit();
         assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
-        assertTrue(tableName.equals(key.getTablename()));
+        assertTrue(htd.getTableName().equals(key.getTablename()));
         Cell cell = val.getCells().get(0);
         assertTrue(Bytes.equals(row, cell.getRow()));
         assertEquals((byte)(i + '0'), cell.getValue()[0]);
@@ -538,8 +538,9 @@ public class TestWALFactory {
   @Test
   public void testAppend() throws IOException {
     final int COL_COUNT = 10;
-    final TableName tableName =
-        TableName.valueOf("tablename");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+            "column"));
     final byte [] row = Bytes.toBytes("row");
     WAL.Reader reader = null;
     final AtomicLong sequenceId = new AtomicLong(1);
@@ -553,15 +554,14 @@ public class TestWALFactory {
           Bytes.toBytes(Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      HRegionInfo hri = new HRegionInfo(tableName,
+      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HTableDescriptor htd = new HTableDescriptor();
-      htd.addFamily(new HColumnDescriptor("column"));
       final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
-      final long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), cols, sequenceId, true, null);
+      final long txid = log.append(htd, hri,
+        new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+        cols, sequenceId, true, null);
       log.sync(txid);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.shutdown();
       Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -573,7 +573,7 @@ public class TestWALFactory {
       for (Cell val : entry.getEdit().getCells()) {
         assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
           entry.getKey().getEncodedRegionName()));
-        assertTrue(tableName.equals(entry.getKey().getTablename()));
+        assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
         assertTrue(Bytes.equals(row, val.getRow()));
         assertEquals((byte)(idx + '0'), val.getValue()[0]);
         System.out.println(entry.getKey() + " " + val);


[2/2] hbase git commit: HBASE-10201 Port 'Make flush decisions per column family' to trunk

Posted by st...@apache.org.
HBASE-10201 Port 'Make flush decisions per column family' to trunk

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c7fad665
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c7fad665
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c7fad665

Branch: refs/heads/master
Commit: c7fad665f34fd3c17999d5cc60b04d3faff6a7f5
Parents: a411227
Author: zhangduo <zh...@wandoujia.com>
Authored: Sat Dec 13 12:49:38 2014 +0800
Committer: stack <st...@apache.org>
Committed: Tue Dec 16 11:49:17 2014 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |  24 +
 .../src/main/resources/hbase-default.xml        |  19 +-
 .../regionserver/FlushAllStoresPolicy.java      |  35 +
 .../regionserver/FlushLargeStoresPolicy.java    | 108 ++++
 .../hadoop/hbase/regionserver/FlushPolicy.java  |  49 ++
 .../hbase/regionserver/FlushPolicyFactory.java  |  76 +++
 .../hbase/regionserver/FlushRequester.java      |  15 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 314 ++++++---
 .../hbase/regionserver/HRegionServer.java       |   4 +-
 .../hadoop/hbase/regionserver/LogRoller.java    |   3 +-
 .../hbase/regionserver/MemStoreFlusher.java     |  75 ++-
 .../hbase/regionserver/RSRpcServices.java       |  10 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 242 +++++--
 .../hbase/regionserver/wal/FSWALEntry.java      |  29 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java   |   8 +-
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  11 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java  |   4 +-
 .../regionserver/TestFlushRegionEntry.java      |   4 +-
 .../regionserver/TestHeapMemoryManager.java     |  16 +-
 .../regionserver/TestPerColumnFamilyFlush.java  | 644 +++++++++++++++++++
 .../hbase/regionserver/wal/TestFSHLog.java      |  42 +-
 .../hbase/regionserver/wal/TestWALReplay.java   |  19 +-
 .../hbase/wal/TestDefaultWALProvider.java       |  73 ++-
 .../apache/hadoop/hbase/wal/TestWALFactory.java |  36 +-
 24 files changed, 1545 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 0ae0538..3f1e070 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -130,6 +130,8 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
   private static final Bytes MEMSTORE_FLUSHSIZE_KEY =
       new Bytes(Bytes.toBytes(MEMSTORE_FLUSHSIZE));
 
+  public static final String FLUSH_POLICY = "FLUSH_POLICY";
+
   /**
    * <em>INTERNAL</em> Used by rest interface to access this metadata
    * attribute which denotes if the table is a -ROOT- region or not
@@ -766,6 +768,28 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
   }
 
   /**
+   * This sets the class associated with the flush policy which determines the stores that need
+   * to be flushed when flushing a region. The class used by default is defined in
+   * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+   * @param clazz the class name
+   */
+  public HTableDescriptor setFlushPolicyClassName(String clazz) {
+    setValue(FLUSH_POLICY, clazz);
+    return this;
+  }
+
+  /**
+   * This gets the class associated with the flush policy which determines the stores that need
+   * to be flushed when flushing a region. The class used by default is defined in
+   * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+   * @return the class name of the flush policy for this table. If this returns null, the default
+   *         flush policy is used.
+   */
+  public String getFlushPolicyClassName() {
+    return getValue(FLUSH_POLICY);
+  }
+
+  /**
    * Adds a column family.
    * For the updating purpose please use {@link #modifyFamily(HColumnDescriptor)} instead.
    * @param family HColumnDescriptor of family to add.

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 9d76dad..34deaec 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -187,7 +187,7 @@ possible configurations would overwhelm and obscure the important.
       A value of 0 means a single queue shared between all the handlers.
       A value of 1 means that each handler has its own queue.</description>
   </property>
-<property>
+  <property>
     <name>hbase.ipc.server.callqueue.read.ratio</name>
     <value>0</value>
     <description>Split the call queues into read and write queues.
@@ -329,8 +329,8 @@ possible configurations would overwhelm and obscure the important.
     <value>org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy</value>
     <description>
       A split policy determines when a region should be split. The various other split policies that
-      are available currently are ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy, 
-      DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.  
+      are available currently are ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy,
+      DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.
     </description>
   </property>
 
@@ -588,6 +588,19 @@ possible configurations would overwhelm and obscure the important.
     every hbase.server.thread.wakefrequency.</description>
   </property>
   <property>
+    <name>hbase.hregion.percolumnfamilyflush.size.lower.bound</name>
+    <value>16777216</value>
+    <description>
+    If FlushLargeStoresPolicy is used, then every time that we hit the
+    total memstore limit, we find out all the column families whose memstores
+    exceed this value, and only flush them, while retaining the others whose
+    memstores are lower than this limit. If none of the families have their
+    memstore size more than this, all the memstores will be flushed
+    (just as usual). This value should be less than half of the total memstore
+    threshold (hbase.hregion.memstore.flush.size).
+    </description>
+  </property>
+  <property>
     <name>hbase.hregion.preclose.flush.size</name>
     <value>5242880</value>
     <description>

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
new file mode 100644
index 0000000..0058104
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that always flushes all stores for a given region.
+ */
+@InterfaceAudience.Private
+public class FlushAllStoresPolicy extends FlushPolicy {
+
+  @Override
+  public Collection<Store> selectStoresToFlush() {
+    return region.stores.values();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
new file mode 100644
index 0000000..7e0e54c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that only flushes stores larger than a given threshold. If no store is
+ * large enough, then all stores will be flushed.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class FlushLargeStoresPolicy extends FlushPolicy {
+
+  private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class);
+
+  public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
+      "hbase.hregion.percolumnfamilyflush.size.lower.bound";
+
+  private static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND = 1024 * 1024 * 16L;
+
+  private long flushSizeLowerBound;
+
+  @Override
+  protected void configureForRegion(HRegion region) {
+    super.configureForRegion(region);
+    long flushSizeLowerBound;
+    String flushedSizeLowerBoundString =
+        region.getTableDesc().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+    if (flushedSizeLowerBoundString == null) {
+      flushSizeLowerBound =
+          getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+            DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
+            + " is not specified, use global config(" + flushSizeLowerBound + ") instead");
+      }
+    } else {
+      try {
+        flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
+      } catch (NumberFormatException nfe) {
+        flushSizeLowerBound =
+            getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+              DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+        LOG.warn("Number format exception when parsing "
+            + HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + " for table "
+            + region.getTableDesc().getTableName() + ":" + flushedSizeLowerBoundString + ". " + nfe
+            + ", use global config(" + flushSizeLowerBound + ") instead");
+
+      }
+    }
+    this.flushSizeLowerBound = flushSizeLowerBound;
+  }
+
+  private boolean shouldFlush(Store store) {
+    if (store.getMemStoreSize() > this.flushSizeLowerBound) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
+            + " will be flushed because of memstoreSize(" + store.getMemStoreSize()
+            + ") is larger than lower bound(" + this.flushSizeLowerBound + ")");
+      }
+      return true;
+    }
+    return region.shouldFlushStore(store);
+  }
+
+  @Override
+  public Collection<Store> selectStoresToFlush() {
+    Collection<Store> stores = region.stores.values();
+    Set<Store> specificStoresToFlush = new HashSet<Store>();
+    for (Store store : stores) {
+      if (shouldFlush(store)) {
+        specificStoresToFlush.add(store);
+      }
+    }
+    // Didn't find any CFs which were above the threshold for selection.
+    if (specificStoresToFlush.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Since none of the CFs were above the size, flushing all.");
+      }
+      return stores;
+    } else {
+      return specificStoresToFlush;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
new file mode 100644
index 0000000..d581fee
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A flush policy determines the stores that need to be flushed when flushing a region.
+ */
+@InterfaceAudience.Private
+public abstract class FlushPolicy extends Configured {
+
+  /**
+   * The region governed by this flush policy; assigned by {@link #configureForRegion(HRegion)}.
+   */
+  protected HRegion region;
+
+  /**
+   * Upon construction, this method will be called with the region to be governed. It will be called
+   * once and only once.
+   */
+  protected void configureForRegion(HRegion region) {
+    this.region = region;
+  }
+
+  /**
+   * @return the stores that need to be flushed.
+   */
+  public abstract Collection<Store> selectStoresToFlush();
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
new file mode 100644
index 0000000..e80b696
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Factory that builds the {@link FlushPolicy} for a region from a conf and HTableDescriptor.
+ * <p>
+ * {@link FlushLargeStoresPolicy} is the default flush policy; for 0.98 the default flush
+ * policy is {@link FlushAllStoresPolicy}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class FlushPolicyFactory {
+
+  private static final Log LOG = LogFactory.getLog(FlushPolicyFactory.class);
+
+  public static final String HBASE_FLUSH_POLICY_KEY = "hbase.regionserver.flush.policy";
+
+  private static final Class<? extends FlushPolicy> DEFAULT_FLUSH_POLICY_CLASS =
+      FlushLargeStoresPolicy.class;
+
+  /**
+   * Instantiate the flush policy selected for the region's table and bind it to the region.
+   */
+  public static FlushPolicy create(HRegion region, Configuration conf) throws IOException {
+    FlushPolicy flushPolicy =
+        ReflectionUtils.newInstance(getFlushPolicyClass(region.getTableDesc(), conf), conf);
+    flushPolicy.configureForRegion(region);
+    return flushPolicy;
+  }
+
+  /**
+   * Resolve the flush policy class: the table descriptor setting wins, then the value of
+   * {@link #HBASE_FLUSH_POLICY_KEY}; the default class is used if the name cannot be loaded.
+   */
+  public static Class<? extends FlushPolicy> getFlushPolicyClass(HTableDescriptor htd,
+      Configuration conf) throws IOException {
+    String policyClassName = htd.getFlushPolicyClassName();
+    if (policyClassName == null) {
+      policyClassName = conf.get(HBASE_FLUSH_POLICY_KEY, DEFAULT_FLUSH_POLICY_CLASS.getName());
+    }
+    try {
+      return Class.forName(policyClassName).asSubclass(FlushPolicy.class);
+    } catch (Exception e) {
+      // Best effort: any failure to resolve the class falls back to the default policy.
+      LOG.warn("Unable to load configured flush policy '" + policyClassName + "' for table '"
+          + htd.getTableName() + "', load default flush policy "
+          + DEFAULT_FLUSH_POLICY_CLASS.getName() + " instead", e);
+      return DEFAULT_FLUSH_POLICY_CLASS;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index e1c3144..7517454 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -30,26 +30,31 @@ public interface FlushRequester {
    * Tell the listener the cache needs to be flushed.
    *
    * @param region the HRegion requesting the cache flush
+   * @param forceFlushAllStores whether we want to flush all stores, e.g., when the request comes
+   *          from log rolling.
    */
-  void requestFlush(HRegion region);
+  void requestFlush(HRegion region, boolean forceFlushAllStores);
+
   /**
    * Tell the listener the cache needs to be flushed after a delay
    *
    * @param region the HRegion requesting the cache flush
    * @param delay after how much time should the flush happen
+   * @param forceFlushAllStores whether we want to flush all stores, e.g., when the request comes
+   *          from log rolling.
    */
-  void requestDelayedFlush(HRegion region, long delay);
+  void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
 
   /**
    * Register a FlushRequestListener
-   * 
+   *
    * @param listener
    */
   void registerFlushRequestListener(final FlushRequestListener listener);
 
   /**
    * Unregister the given FlushRequestListener
-   * 
+   *
    * @param listener
    * @return true when passed listener is unregistered successfully.
    */
@@ -57,7 +62,7 @@ public interface FlushRequester {
 
   /**
    * Sets the global memstore limit to a new size.
-   * 
+   *
    * @param globalMemStoreSize
    */
   public void setGlobalMemstoreLimit(long globalMemStoreSize);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 428e857..6cf2ce3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -62,7 +64,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -134,14 +136,9 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.Write
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
-import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -157,6 +154,11 @@ import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.StringUtils;
 
@@ -230,10 +232,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   final AtomicBoolean closing = new AtomicBoolean(false);
 
   /**
-   * The sequence id of the last flush on this region.  Used doing some rough calculations on
+   * The max sequence id of flushed data on this region.  Used doing some rough calculations on
    * whether time to flush or not.
    */
-  protected volatile long lastFlushSeqId = -1L;
+  protected volatile long maxFlushedSeqId = -1L;
 
   /**
    * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
@@ -518,7 +520,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   long memstoreFlushSize;
   final long timestampSlop;
   final long rowProcessorTimeout;
-  private volatile long lastFlushTime;
+
+  // Last flush time for each Store. Useful when we are flushing per column family
+  private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
+      new ConcurrentHashMap<Store, Long>();
+
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
   private long flushCheckInterval;
@@ -542,6 +548,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
   private HTableDescriptor htableDescriptor = null;
   private RegionSplitPolicy splitPolicy;
+  private FlushPolicy flushPolicy;
 
   private final MetricsRegion metricsRegion;
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
@@ -619,7 +626,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
           + MAX_FLUSH_PER_CHANGES);
     }
-
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                     DEFAULT_ROWLOCK_WAIT_DURATION);
 
@@ -785,8 +791,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
 
-    this.lastFlushTime = EnvironmentEdgeManager.currentTime();
-    // Use maximum of wal sequenceid or that which was found in stores
+    // Initialize flush policy
+    this.flushPolicy = FlushPolicyFactory.create(this, conf);
+
+    long lastFlushTime = EnvironmentEdgeManager.currentTime();
+    for (Store store: stores.values()) {
+      this.lastStoreFlushTimeMap.put(store, lastFlushTime);
+    }
+
+    // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId;
 
@@ -1324,10 +1337,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         status.setStatus("Running coprocessor post-close hooks");
         this.coprocessorHost.postClose(abort);
       }
-      if ( this.metricsRegion != null) {
+      if (this.metricsRegion != null) {
         this.metricsRegion.close();
       }
-      if ( this.metricsRegionWrapper != null) {
+      if (this.metricsRegionWrapper != null) {
         Closeables.closeQuietly(this.metricsRegionWrapper);
       }
       status.markComplete("Closed");
@@ -1473,9 +1486,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return this.fs;
   }
 
-  /** @return the last time the region was flushed */
-  public long getLastFlushTime() {
-    return this.lastFlushTime;
+  /**
+   * @return the earliest last-flush time among all stores, or Long.MAX_VALUE if there are none.
+   */
+  @VisibleForTesting
+  public long getEarliestFlushTimeForAllStores() {
+    // Collections.min throws NoSuchElementException for an empty collection, so guard it.
+    return lastStoreFlushTimeMap.isEmpty() ? Long.MAX_VALUE
+        : Collections.min(lastStoreFlushTimeMap.values());
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1641,6 +1659,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   }
 
   /**
+   * Flush all stores; this simply calls {@code flushcache(true)}.
+   * <p>
+   * See {@link #flushcache(boolean)}.
+   *
+   * @return whether the flush succeeded and whether the region needs compacting
+   * @throws IOException propagated from {@link #flushcache(boolean)}
+   */
+  public FlushResult flushcache() throws IOException {
+    return flushcache(true);
+  }
+
+  /**
    * Flush the cache.
    *
    * When this method is called the cache will be flushed unless:
@@ -1653,14 +1683,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    *
    * <p>This method may block for some time, so it should not be called from a
    * time-sensitive thread.
-   *
-   * @return true if the region needs compacting
+   * @param forceFlushAllStores whether we want to flush all stores
+   * @return whether the flush succeeded and whether the region needs compacting
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of wal is required
    * because a Snapshot was not properly persisted.
    */
-  public FlushResult flushcache() throws IOException {
+  public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       String msg = "Skipping flush on " + this + " because closing";
@@ -1702,8 +1732,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
         }
       }
+
       try {
-        FlushResult fs = internalFlushcache(status);
+        Collection<Store> specificStoresToFlush =
+            forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
+        FlushResult fs = internalFlushcache(specificStoresToFlush, status);
 
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
@@ -1726,12 +1759,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   }
 
   /**
+   * Decide whether a store is old enough that it should be flushed.
+   * <p>
+   * Every FlushPolicy that does not always flush all stores should consult this method when
+   * selecting stores. Otherwise {@link #shouldFlush()} will keep returning true, which will
+   * generate a lot of flush requests.
+   */
+  boolean shouldFlushStore(Store store) {
+    byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
+    byte[] familyName = store.getFamily().getName();
+    // One less than the earliest sequence number the WAL still tracks for this family.
+    long flushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName) - 1;
+    if (flushedSeqId > 0 && flushedSeqId + flushPerChanges < sequenceId.get()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+            + " will be flushed because its max flushed seqId(" + flushedSeqId
+            + ") is far away from current(" + sequenceId.get() + "), max allowed is "
+            + flushPerChanges);
+      }
+      return true;
+    }
+    if (flushCheckInterval <= 0) {
+      // A non-positive interval switches the time based check off.
+      return false;
+    }
+    long now = EnvironmentEdgeManager.currentTime();
+    boolean oldestEditTooOld = store.timeOfOldestEdit() < now - flushCheckInterval;
+    if (oldestEditTooOld && LOG.isDebugEnabled()) {
+      LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+          + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
+          + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
+    }
+    return oldestEditTooOld;
+  }
+
+  /**
    * Should the memstore be flushed now
    */
   boolean shouldFlush() {
     // This is a rough measure.
-    if (this.lastFlushSeqId > 0
-          && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
+    if (this.maxFlushedSeqId > 0
+          && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
       return true;
     }
     if (flushCheckInterval <= 0) { //disabled
@@ -1739,7 +1807,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
     long now = EnvironmentEdgeManager.currentTime();
     //if we flushed in the recent past, we don't need to do again now
-    if ((now - getLastFlushTime() < flushCheckInterval)) {
+    if ((now - getEarliestFlushTimeForAllStores() < flushCheckInterval)) {
       return false;
     }
     //since we didn't flush in the recent past, flush now if certain conditions
@@ -1754,35 +1822,56 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   }
 
   /**
-   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
-   * memstore, all of which have also been written to the wal. We need to write those updates in the
-   * memstore out to disk, while being able to process reads/writes as much as possible during the
-   * flush operation.
-   * <p>This method may block for some time.  Every time you call it, we up the regions
-   * sequence id even if we don't flush; i.e. the returned region id will be at least one larger
-   * than the last edit applied to this region. The returned id does not refer to an actual edit.
-   * The returned id can be used for say installing a bulk loaded file just ahead of the last hfile
-   * that was the result of this flush, etc.
-   * @return object describing the flush's state
+   * Flushing all stores.
    *
-   * @throws IOException general io exceptions
-   * @throws DroppedSnapshotException Thrown when replay of wal is required
-   * because a Snapshot was not properly persisted.
+   * @see #internalFlushcache(Collection, MonitoredTask)
    */
-  protected FlushResult internalFlushcache(MonitoredTask status)
+  private FlushResult internalFlushcache(MonitoredTask status)
       throws IOException {
-    return internalFlushcache(this.wal, -1, status);
+    return internalFlushcache(stores.values(), status);
+  }
+
+  /**
+   * Flush the given stores using this region's WAL, passing HConstants.NO_SEQNUM as myseqid.
+   *
+   * @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
+   */
+  private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
+      MonitoredTask status) throws IOException {
+    return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
+        status);
+  }
 
   /**
-   * @param wal Null if we're NOT to go via wal.
-   * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
+   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
+   * of updates in the memstore, all of which have also been written to the wal.
+   * We need to write those updates in the memstore out to disk, while being
+   * able to process reads/writes as much as possible during the flush
+   * operation.
+   * <p>
+   * This method may block for some time. Every time you call it, we up the
+   * regions sequence id even if we don't flush; i.e. the returned region id
+   * will be at least one larger than the last edit applied to this region. The
+   * returned id does not refer to an actual edit. The returned id can be used
+   * for say installing a bulk loaded file just ahead of the last hfile that was
+   * the result of this flush, etc.
+   *
+   * @param wal
+   *          Null if we're NOT to go via wal.
+   * @param myseqid
+   *          The seqid to use if <code>wal</code> is null writing out flush
+   *          file.
+   * @param storesToFlush
+   *          The list of stores to flush.
    * @return object describing the flush's state
    * @throws IOException
-   * @see #internalFlushcache(MonitoredTask)
+   *           general io exceptions
+   * @throws DroppedSnapshotException
+   *           Thrown when replay of wal is required because a Snapshot was not
+   *           properly persisted.
    */
-  protected FlushResult internalFlushcache(
-      final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
+  protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+      final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -1824,63 +1913,86 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
     }
 
-    LOG.info("Started memstore flush for " + this +
-      ", current region memstore size " +
-      StringUtils.byteDesc(this.memstoreSize.get()) +
-      ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
-
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Started memstore flush for " + this + ", current region memstore size "
+          + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
+          + stores.size() + " column families' memstores are being flushed."
+          + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
+      // only log when we are not flushing all stores.
+      if (this.stores.size() > storesToFlush.size()) {
+        for (Store store: storesToFlush) {
+          LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
+              + " which was occupying "
+              + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
+        }
+      }
+    }
     // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
     // allow updates again so its value will represent the size of the updates received
     // during flush
     MultiVersionConsistencyControl.WriteEntry w = null;
-
     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
     // and memstore (makes it difficult to do atomic rows then)
     status.setStatus("Obtaining lock to block concurrent updates");
     // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
-    long totalFlushableSize = 0;
     status.setStatus("Preparing to flush by snapshotting stores in " +
       getRegionInfo().getEncodedName());
+    long totalFlushableSizeOfFlushableStores = 0;
+
+    Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
+    for (Store store: storesToFlush) {
+      flushedFamilyNames.add(store.getFamily().getName());
+    }
+
     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
         Bytes.BYTES_COMPARATOR);
-    long flushSeqId = -1L;
+    // The sequence id of this flush operation which is used to log FlushMarker and pass to
+    // createFlushContext to use as the store file's sequence id.
+    long flushOpSeqId = HConstants.NO_SEQNUM;
+    // The max flushed sequence id after this flush operation. Used as completeSequenceId which is
+    // passed to HMaster.
+    long flushedSeqId = HConstants.NO_SEQNUM;
+    byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
 
     long trxId = 0;
     try {
       try {
         w = mvcc.beginMemstoreInsert();
         if (wal != null) {
-          if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
+          if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
             // This should never happen.
             String msg = "Flush will not be started for ["
                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
             status.setStatus(msg);
             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
           }
-          // Get a sequence id that we can use to denote the flush. It will be one beyond the last
-          // edit that made it into the hfile (the below does not add an edit, it just asks the
-          // WAL system to return next sequence edit).
-          flushSeqId = getNextSequenceId(wal);
+          flushOpSeqId = getNextSequenceId(wal);
+          long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
+          // no oldestUnflushedSeqId means we flushed all stores.
+          // or the unflushed stores are all empty.
+          flushedSeqId =
+              oldestUnflushedSeqId == HConstants.NO_SEQNUM ? flushOpSeqId : oldestUnflushedSeqId - 1;
         } else {
           // use the provided sequence Id as WAL is not being used for this flush.
-          flushSeqId = myseqid;
+          flushedSeqId = flushOpSeqId = myseqid;
         }
 
-        for (Store s : stores.values()) {
-          totalFlushableSize += s.getFlushableSize();
-          storeFlushCtxs.add(s.createFlushContext(flushSeqId));
+        for (Store s : storesToFlush) {
+          totalFlushableSizeOfFlushableStores += s.getFlushableSize();
+          storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
         }
 
         // write the snapshot start to WAL
         if (wal != null) {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
-            getRegionInfo(), flushSeqId, committedFiles);
+            getRegionInfo(), flushOpSeqId, committedFiles);
+          // no sync. Sync is below where we do not hold the updates lock
           trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-            desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
+            desc, sequenceId, false);
         }
 
         // Prepare flush (take a snapshot)
@@ -1892,7 +2004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
             try {
               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
-                getRegionInfo(), flushSeqId, committedFiles);
+                getRegionInfo(), flushOpSeqId, committedFiles);
               WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                 desc, sequenceId, false);
             } catch (Throwable t) {
@@ -1909,7 +2021,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         this.updatesLock.writeLock().unlock();
       }
       String s = "Finished memstore snapshotting " + this +
-        ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
+        ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
       status.setStatus(s);
       if (LOG.isTraceEnabled()) LOG.trace(s);
       // sync unflushed WAL changes
@@ -1928,7 +2040,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // uncommitted transactions from being written into HFiles.
       // We have to block before we start the flush, otherwise keys that
       // were removed via a rollbackMemstore could be written to Hfiles.
-      w.setWriteNumber(flushSeqId);
+      w.setWriteNumber(flushOpSeqId);
       mvcc.waitForPreviousTransactionsComplete(w);
       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
       w = null;
@@ -1959,8 +2071,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
-      Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
-      // same order
+      Iterator<Store> it = storesToFlush.iterator();
+      // storesToFlush and storeFlushCtxs have same order
       for (StoreFlushContext flush : storeFlushCtxs) {
         boolean needsCompaction = flush.commit(status);
         if (needsCompaction) {
@@ -1971,12 +2083,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
+      this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
 
       if (wal != null) {
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
-          getRegionInfo(), flushSeqId, committedFiles);
+          getRegionInfo(), flushOpSeqId, committedFiles);
         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
           desc, sequenceId, true);
       }
@@ -1990,7 +2102,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       if (wal != null) {
         try {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
-            getRegionInfo(), flushSeqId, committedFiles);
+            getRegionInfo(), flushOpSeqId, committedFiles);
           WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
             desc, sequenceId, false);
         } catch (Throwable ex) {
@@ -2013,10 +2125,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
 
     // Record latest flush time
-    this.lastFlushTime = EnvironmentEdgeManager.currentTime();
+    for (Store store: storesToFlush) {
+      this.lastStoreFlushTimeMap.put(store, startTime);
+    }
 
-    // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
-    this.lastFlushSeqId = flushSeqId;
+    // Update the max flushed sequence id for region.
+    this.maxFlushedSeqId = flushedSeqId;
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -2026,18 +2140,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
     long time = EnvironmentEdgeManager.currentTime() - startTime;
     long memstoresize = this.memstoreSize.get();
-    String msg = "Finished memstore flush of ~" +
-      StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
-      ", currentsize=" +
-      StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
-      " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
-      ", compaction requested=" + compactionRequested +
-      ((wal == null)? "; wal=null": "");
+    String msg = "Finished memstore flush of ~"
+        + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
+        + totalFlushableSizeOfFlushableStores + ", currentsize="
+        + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+        + " for region " + this + " in " + time + "ms, sequenceid="
+        + flushOpSeqId +  ", compaction requested=" + compactionRequested
+        + ((wal == null) ? "; wal=null" : "");
     LOG.info(msg);
     status.setStatus(msg);
 
     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
-        FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
+        FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
   }
 
   /**
@@ -2168,7 +2282,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     if(delete.getFamilyCellMap().isEmpty()){
       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
         // Don't eat the timestamp
-        delete.deleteFamily(family, delete.getTimeStamp());
+        delete.addFamily(family, delete.getTimeStamp());
       }
     } else {
       for(byte [] family : delete.getFamilyCellMap().keySet()) {
@@ -2819,6 +2933,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
 
+
       // ------------------------------------------------------------------
       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
       // ------------------------------------------------------------------
@@ -2850,7 +2965,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       success = true;
       return addedSize;
     } finally {
-
       // if the wal sync was unsuccessful, remove keys from memstore
       if (doRollBackMemstore) {
         rollbackMemstore(memstoreCells);
@@ -3209,8 +3323,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * We throw RegionTooBusyException if above memstore limit
    * and expect client to retry using some kind of backoff
   */
-  private void checkResources()
-    throws RegionTooBusyException {
+  private void checkResources() throws RegionTooBusyException {
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
 
@@ -3406,7 +3519,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       writestate.flushRequested = true;
     }
     // Make request outside of synchronize block; HBASE-818.
-    this.rsServices.getFlushRequester().requestFlush(this);
+    this.rsServices.getFlushRequester().requestFlush(this, false);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Flush requested on " + this);
     }
@@ -3527,7 +3640,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
     if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
-      internalFlushcache(null, seqid, status);
+      internalFlushcache(null, seqid, stores.values(), status);
     }
     // Now delete the content of recovered edits.  We're done w/ them.
     for (Path file: files) {
@@ -3681,7 +3794,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             editsCount++;
           }
           if (flush) {
-            internalFlushcache(null, currentEditSeqId, status);
+            internalFlushcache(null, currentEditSeqId, stores.values(), status);
           }
 
           if (coprocessorHost != null) {
@@ -4029,7 +4142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
       // a sequence id that we can be sure is beyond the last hfile written).
       if (assignSeqId) {
-        FlushResult fs = this.flushcache();
+        FlushResult fs = this.flushcache(true);
         if (fs.isFlushSucceeded()) {
           seqId = fs.flushSequenceId;
         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@@ -5072,8 +5185,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
     FileSystem fs = a.getRegionFileSystem().getFileSystem();
     // Make sure each region's cache is empty
-    a.flushcache();
-    b.flushcache();
+    a.flushcache(true);
+    b.flushcache(true);
 
     // Compact each region so we only have one store file per family
     a.compactStores(true);
@@ -5187,7 +5300,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
     // do after lock
     if (this.metricsRegion != null) {
-      long totalSize = 0l;
+      long totalSize = 0L;
       for (Cell cell : results) {
         totalSize += CellUtil.estimatedSerializedSizeOf(cell);
       }
@@ -5369,7 +5482,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
           }
-
           // 9. Release region lock
           if (locked) {
             this.updatesLock.readLock().unlock();
@@ -5497,7 +5609,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     WALEdit walEdits = null;
     List<Cell> allKVs = new ArrayList<Cell>(append.size());
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-
     long size = 0;
     long txid = 0;
 
@@ -5699,7 +5810,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
           }
-
           size = this.addAndGetGlobalMemstoreSize(size);
           flush = isFlushSize(size);
         } finally {
@@ -5996,8 +6106,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (12 * Bytes.SIZEOF_LONG) +
+      44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      (11 * Bytes.SIZEOF_LONG) +
       4 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
@@ -6568,6 +6678,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return this.maxSeqIdInStores;
   }
 
+  @VisibleForTesting
+  public long getOldestSeqIdOfStore(byte[] familyName) {
+    return wal.getEarliestMemstoreSeqNum(getRegionInfo()
+        .getEncodedNameAsBytes(), familyName);
+  }
+
   /**
    * @return if a given region is in compaction now.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9bd7abc..c20f728 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1392,7 +1392,7 @@ public class HRegionServer extends HasThread implements
       .setWriteRequestsCount(r.writeRequestsCount.get())
       .setTotalCompactingKVs(totalCompactingKVs)
       .setCurrentCompactedKVs(currentCompactedKVs)
-      .setCompleteSequenceId(r.lastFlushSeqId)
+      .setCompleteSequenceId(r.maxFlushedSeqId)
       .setDataLocality(dataLocality);
 
     return regionLoadBldr.build();
@@ -1488,7 +1488,7 @@ public class HRegionServer extends HasThread implements
             //Throttle the flushes by putting a delay. If we don't throttle, and there
             //is a balanced write-load on the regions in a table, we might end up
             //overwhelming the filesystem with too many flushes at once.
-            requester.requestDelayedFlush(r, randomDelay);
+            requester.requestDelayedFlush(r, randomDelay, false);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 6f5dfa4..aa60bfb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -166,7 +166,8 @@ class LogRoller extends HasThread {
     if (r != null) {
       requester = this.services.getFlushRequester();
       if (requester != null) {
-        requester.requestFlush(r);
+        // force flushing all stores to clean old logs
+        requester.requestFlush(r, true);
         scheduled = true;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 1d59701..eece27a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -39,10 +39,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Counter;
@@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.htrace.Trace;
 import org.htrace.TraceScope;
 
@@ -114,11 +114,11 @@ class MemStoreFlusher implements FlushRequester {
       90000);
     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
     this.flushHandlers = new FlushHandler[handlerCount];
-    LOG.info("globalMemStoreLimit=" +
-      StringUtils.humanReadableInt(this.globalMemStoreLimit) +
-      ", globalMemStoreLimitLowMark=" +
-      StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
-      ", maxHeap=" + StringUtils.humanReadableInt(max));
+    LOG.info("globalMemStoreLimit="
+        + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
+        + ", globalMemStoreLimitLowMark="
+        + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
+        + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
   }
 
   public Counter getUpdatesBlockedMsHighWater() {
@@ -160,13 +160,12 @@ class MemStoreFlusher implements FlushRequester {
         // lots of little flushes and cause lots of compactions, etc, which just makes
         // life worse!
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Under global heap pressure: " +
-            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
-            "store files, but is " +
-            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
-            " vs best flushable region's " +
-            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
-            ". Choosing the bigger.");
+          LOG.debug("Under global heap pressure: " + "Region "
+              + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is "
+              + TraditionalBinaryPrefix.long2String(bestAnyRegion.memstoreSize.get(), "", 1)
+              + " vs best flushable region's "
+              + TraditionalBinaryPrefix.long2String(bestFlushableRegion.memstoreSize.get(), "", 1)
+              + ". Choosing the bigger.");
         }
         regionToFlush = bestAnyRegion;
       } else {
@@ -180,7 +179,7 @@ class MemStoreFlusher implements FlushRequester {
       Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
 
       LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
-      flushedOne = flushRegion(regionToFlush, true);
+      flushedOne = flushRegion(regionToFlush, true, true);
       if (!flushedOne) {
         LOG.info("Excluding unflushable region " + regionToFlush +
           " - trying to find a different region to flush.");
@@ -206,7 +205,7 @@ class MemStoreFlusher implements FlushRequester {
           if (fqe == null || fqe instanceof WakeupFlushThread) {
             if (isAboveLowWaterMark()) {
               LOG.debug("Flush thread woke up because memory above low water="
-                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+                  + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
               if (!flushOneForGlobalPressure()) {
                 // Wasn't able to flush any region, but we're above low water mark
                 // This is unlikely to happen, but might happen when closing the
@@ -293,23 +292,23 @@ class MemStoreFlusher implements FlushRequester {
       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
   }
 
-  public void requestFlush(HRegion r) {
+  public void requestFlush(HRegion r, boolean forceFlushAllStores) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
         // queue.  It'll come out near immediately.
-        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
       }
     }
   }
 
-  public void requestDelayedFlush(HRegion r, long delay) {
+  public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has some delay
-        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
         fqe.requeue(delay);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
@@ -362,7 +361,7 @@ class MemStoreFlusher implements FlushRequester {
     }
   }
 
-  /*
+  /**
    * A flushRegion that checks store file count.  If too many, puts the flush
    * on delay queue to retry later.
    * @param fqe
@@ -406,22 +405,23 @@ class MemStoreFlusher implements FlushRequester {
         return true;
       }
     }
-    return flushRegion(region, false);
+    return flushRegion(region, false, fqe.isForceFlushAllStores());
   }
 
-  /*
+  /**
    * Flush a region.
    * @param region Region to flush.
    * @param emergencyFlush Set if we are being force flushed. If true the region
    * needs to be removed from the flush queue. If false, when we were called
    * from the main flusher run loop and we got the entry to flush by calling
    * poll on the flush queue (which removed it).
-   *
+   * @param forceFlushAllStores whether we want to flush all stores.
    * @return true if the region was successfully flushed, false otherwise. If
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+  private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
+      boolean forceFlushAllStores) {
     long startTime = 0;
     synchronized (this.regionsInQueue) {
       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
@@ -444,7 +444,7 @@ class MemStoreFlusher implements FlushRequester {
     lock.readLock().lock();
     try {
       notifyFlushRequest(region, emergencyFlush);
-      HRegion.FlushResult flushResult = region.flushcache();
+      HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
       boolean shouldCompact = flushResult.isCompactionNeeded();
       // We just want to check the size
       boolean shouldSplit = region.checkSplit() != null;
@@ -528,11 +528,12 @@ class MemStoreFlusher implements FlushRequester {
           while (isAboveHighWaterMark() && !server.isStopped()) {
             if (!blocked) {
               startTime = EnvironmentEdgeManager.currentTime();
-              LOG.info("Blocking updates on " + server.toString() +
-                ": the global memstore size " +
-                StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
-                " is >= than blocking " +
-                StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
+              LOG.info("Blocking updates on "
+                  + server.toString()
+                  + ": the global memstore size "
+                  + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
+                      .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
+                  + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
             }
             blocked = true;
             wakeupFlushThread();
@@ -656,10 +657,13 @@ class MemStoreFlusher implements FlushRequester {
     private long whenToExpire;
     private int requeueCount = 0;
 
-    FlushRegionEntry(final HRegion r) {
+    private boolean forceFlushAllStores;
+
+    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) {
       this.region = r;
       this.createTime = EnvironmentEdgeManager.currentTime();
       this.whenToExpire = this.createTime;
+      this.forceFlushAllStores = forceFlushAllStores;
     }
 
     /**
@@ -679,6 +683,13 @@ class MemStoreFlusher implements FlushRequester {
     }
 
     /**
+     * @return whether we need to flush all stores.
+     */
+    public boolean isForceFlushAllStores() {
+      return forceFlushAllStores;
+    }
+
+    /**
      * @param when When to expire, when to come up out of the queue.
      * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
      * to whatever you pass.

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7fad665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 32d59d4..492b26d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -150,8 +149,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Counter;
@@ -159,6 +156,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.net.DNS;
 import org.apache.zookeeper.KeeperException;
@@ -694,7 +693,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private OperationStatus [] doReplayBatchOp(final HRegion region,
       final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
-
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
@@ -1076,7 +1074,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       LOG.info("Flushing " + region.getRegionNameAsString());
       boolean shouldFlush = true;
       if (request.hasIfOlderThanTs()) {
-        shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
+        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
       }
       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
       if (shouldFlush) {
@@ -1093,7 +1091,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         builder.setFlushed(result);
       }
-      builder.setLastFlushTime(region.getLastFlushTime());
+      builder.setLastFlushTime( region.getEarliestFlushTimeForAllStores());
       return builder.build();
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical