Posted to commits@hbase.apache.org by st...@apache.org on 2015/06/08 19:39:22 UTC

[2/2] hbase git commit: HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS

HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2baf3bfc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2baf3bfc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2baf3bfc

Branch: refs/heads/branch-1
Commit: 2baf3bfc9fbe50327dc09389efdffe8c6e71af5f
Parents: d34e9c5
Author: stack <st...@apache.org>
Authored: Mon Jun 8 10:39:08 2015 -0700
Committer: stack <st...@apache.org>
Committed: Mon Jun 8 10:39:08 2015 -0700

----------------------------------------------------------------------
 .../ZKSplitLogManagerCoordination.java          |   2 +-
 .../hadoop/hbase/master/RegionStates.java       |   7 +-
 .../hadoop/hbase/master/ServerManager.java      |  14 +
 .../regionserver/FlushLargeStoresPolicy.java    |   6 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 137 +++----
 .../hbase/regionserver/HRegionServer.java       |  22 +-
 .../hadoop/hbase/regionserver/HStore.java       |  13 +-
 .../hadoop/hbase/regionserver/Region.java       |  14 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 354 +++---------------
 .../regionserver/wal/SequenceIdAccounting.java  | 363 +++++++++++++++++++
 .../hadoop/hbase/wal/DisabledWALProvider.java   |  11 +-
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  49 ++-
 .../apache/hadoop/hbase/wal/WALSplitter.java    |  43 ++-
 .../apache/hadoop/hbase/ipc/TestCallRunner.java |   1 -
 .../master/TestGetLastFlushedSequenceId.java    |   1 +
 .../hadoop/hbase/regionserver/TestHRegion.java  |  29 ++
 .../regionserver/TestSplitWalDataLoss.java      | 149 ++++++++
 .../hbase/regionserver/wal/TestFSHLog.java      |  52 +--
 .../wal/TestSequenceIdAccounting.java           | 132 +++++++
 19 files changed, 912 insertions(+), 487 deletions(-)
----------------------------------------------------------------------
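
The heart of this change, as I read the diff below, is that the "last flushed sequence id" used to filter edits during WAL splitting must be tracked per column family, so an edit is only skipped when the family it belongs to has really flushed past it. The following sketch is purely illustrative and not code from this commit; the names ReplayFilterSketch, lastFlushedIdPerFamily and shouldReplay are invented.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical illustration of the replay-filter idea: an edit may only be
    // skipped if its column family has already flushed past its sequence id.
    public class ReplayFilterSketch {
      static final Map<String, Long> lastFlushedIdPerFamily = new HashMap<>();

      static boolean shouldReplay(String family, long editSeqId) {
        Long flushed = lastFlushedIdPerFamily.get(family);
        if (flushed == null) return true;  // nothing known flushed; replay to be safe
        return editSeqId > flushed;        // replay only edits newer than what is on disk
      }

      public static void main(String[] args) {
        lastFlushedIdPerFamily.put("cf1", 100L);  // cf1 flushed up to 100
        lastFlushedIdPerFamily.put("cf2", 40L);   // cf2 flushed only up to 40
        System.out.println(shouldReplay("cf1", 90));  // false: already persisted
        System.out.println(shouldReplay("cf2", 90));  // true: must be replayed
      }
    }

Per the subject line, the data loss came from filtering with sequence ids that had run ahead of edits still sitting only in the memstore of some family; the per-family accounting introduced below is meant to keep a case like cf2 above from being skipped.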


http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 556a143..0ae3a00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -645,7 +645,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
             lastSequenceId = lastRecordedFlushedSequenceId;
           }
           ZKUtil.createSetData(this.watcher, nodePath,
-            ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
+          ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
           if (LOG.isDebugEnabled()) {
             LOG.debug("Marked " + regionEncodeName + " recovering from " + serverName +
               ": " + nodePath);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index 58a8260..c658475 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -453,7 +453,7 @@ public class RegionStates {
       ServerName oldServerName = regionAssignments.put(hri, serverName);
       if (!serverName.equals(oldServerName)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName + " " + hri);
+          LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
         } else {
           LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
         }
@@ -644,7 +644,7 @@ public class RegionStates {
           // Region is open on this region server, but in transition.
           // This region must be moving away from this server, or splitting/merging.
           // SSH will handle it, either skip assigning, or re-assign.
-          LOG.info("Transitioning " + state + " will be handled by SSH for " + sn);
+          LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn);
         } else if (sn.equals(state.getServerName())) {
           // Region is in transition on this region server, and this
           // region is not open on this server. So the region must be
@@ -654,7 +654,8 @@ public class RegionStates {
           // transition. The region could be in failed_close state too if we have
           // tried several times to open it while this region server is not reachable)
           if (state.isPendingOpenOrOpening() || state.isFailedClose() || state.isOffline()) {
-            LOG.info("Found region in " + state + " to be reassigned by SSH for " + sn);
+            LOG.info("Found region in " + state +
+              " to be reassigned by ServerCrashProcedure for " + sn);
             rits.add(hri);
           } else if(state.isSplittingNew()) {
             regionsToCleanIfNoMetaEntry.add(state.getRegion());

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index bdc7358..25144b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -122,9 +122,15 @@ public class ServerManager {
   // Set if we are to shutdown the cluster.
   private volatile boolean clusterShutdown = false;
 
+  /**
+   * The last flushed sequence id for a region.
+   */
   private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
     new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
 
+  /**
+   * The last flushed sequence id for a store in a region.
+   */
   private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
     storeFlushedSequenceIdsByRegion =
     new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);
@@ -293,6 +299,10 @@ public class ServerManager {
       Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
       long l = entry.getValue().getCompleteSequenceId();
       // Don't let smaller sequence ids override greater sequence ids.
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
+          ", completeSequenceId=" + l);
+      }
       if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
         flushedSequenceIdByRegion.put(encodedRegionName, l);
       } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
@@ -306,6 +316,10 @@ public class ServerManager {
         byte[] family = storeSeqId.getFamilyName().toByteArray();
         existingValue = storeFlushedSequenceId.get(family);
         l = storeSeqId.getSequenceId();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
+            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
+        }
         // Don't let smaller sequence ids override greater sequence ids.
         if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
           storeFlushedSequenceId.put(family, l);
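
The two maps documented above only ever move forward: a region server report carrying a smaller sequence id must not overwrite a newer one. A minimal stand-alone sketch of that monotonic-update rule, using String keys and an invented updateIfGreater helper instead of the byte[]-keyed ConcurrentSkipListMap in the real class, and ignoring the concurrency subtleties:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch: record a reported flushed sequence id, but never let a stale
    // (smaller) report overwrite a newer one.
    public class MonotonicSeqIdSketch {
      static final long NO_SEQNUM = -1L;
      static final ConcurrentMap<String, Long> flushedSeqIdByRegion = new ConcurrentHashMap<>();

      static void updateIfGreater(String encodedRegionName, long reported) {
        Long existing = flushedSeqIdByRegion.get(encodedRegionName);
        if (existing == null || (reported != NO_SEQNUM && reported > existing)) {
          flushedSeqIdByRegion.put(encodedRegionName, reported);
        }
        // else: ignore the smaller, stale report
      }

      public static void main(String[] args) {
        updateIfGreater("region-a", 100L);
        updateIfGreater("region-a", 90L);  // ignored: smaller than 100
        System.out.println(flushedSeqIdByRegion.get("region-a"));  // 100
      }
    }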

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
index 7e0e54c..328e890 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -76,9 +76,9 @@ public class FlushLargeStoresPolicy extends FlushPolicy {
   private boolean shouldFlush(Store store) {
     if (store.getMemStoreSize() > this.flushSizeLowerBound) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
-            + " will be flushed because of memstoreSize(" + store.getMemStoreSize()
-            + ") is larger than lower bound(" + this.flushSizeLowerBound + ")");
+        LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +
+          region.getRegionInfo().getEncodedName() + " because memstoreSize=" +
+          store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound);
       }
       return true;
     }
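
The policy above decides, per store, whether its memstore alone is big enough to flush. A toy version of that check, with everything except the flushSizeLowerBound name invented:

    // Sketch of the per-store flush decision: flush a column family on its
    // own once its memstore exceeds the configured lower bound.
    public class LargeStorePolicySketch {
      private final long flushSizeLowerBound;

      LargeStorePolicySketch(long flushSizeLowerBound) {
        this.flushSizeLowerBound = flushSizeLowerBound;
      }

      boolean shouldFlush(String columnFamily, long memstoreSize) {
        if (memstoreSize > flushSizeLowerBound) {
          System.out.println("Flush Column Family " + columnFamily + " because memstoreSize=" +
              memstoreSize + " > lower bound=" + flushSizeLowerBound);
          return true;
        }
        return false;
      }

      public static void main(String[] args) {
        LargeStorePolicySketch p = new LargeStorePolicySketch(16L * 1024 * 1024);
        System.out.println(p.shouldFlush("cf1", 20L * 1024 * 1024));  // true
        System.out.println(p.shouldFlush("cf2", 1024L));              // false
      }
    }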

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 94a193f..271a6eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -22,7 +22,6 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Constructor;
 import java.text.ParseException;
 import java.util.AbstractList;
@@ -217,13 +216,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   final AtomicBoolean closing = new AtomicBoolean(false);
 
   /**
-   * The max sequence id of flushed data on this region.  Used doing some rough calculations on
-   * whether time to flush or not.
+   * The max sequence id of flushed data on this region. There is no edit in memory that is
+   * less than this sequence id.
    */
   private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
 
   /**
-   * Record the sequence id of last flush operation.
+   * Record the sequence id of last flush operation. Can be in advance of
+   * {@link #maxFlushedSeqId} when flushing a single column family. In this case,
+   * {@link #maxFlushedSeqId} will be older than the oldest edit in memory.
    */
   private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
   /**
@@ -608,6 +609,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * is new), then read them from the supplied path.
    * @param htd the table descriptor
    * @param rsServices reference to {@link RegionServerServices} or null
+   * @deprecated Use other constructors.
    */
   @Deprecated
   public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
@@ -1610,16 +1612,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
     regionLoadBldr.clearStoreCompleteSequenceId();
     for (byte[] familyName : this.stores.keySet()) {
-      long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
-      // no oldestUnflushedSeqId means no data has written to the store after last flush, so we use
-      // lastFlushOpSeqId as complete sequence id for the store.
-      regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId
-          .newBuilder()
-          .setFamilyName(ByteString.copyFrom(familyName))
-          .setSequenceId(
-            oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
+      long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
+      // Subtract 1 to go earlier than the current oldest, unflushed edit in memstore; this will
+      // give us a sequence id that is for sure flushed. We want edit replay to start after this
+      // sequence id in this region. If NO_SEQNUM, use the region's maximum flush id.
+      long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1;
+      regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.
+        newBuilder().setFamilyName(ByteString.copyFrom(familyName)).setSequenceId(csid).build());
     }
-    return regionLoadBldr.setCompleteSequenceId(this.maxFlushedSeqId);
+    return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1912,27 +1913,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * returns true which will make a lot of flush requests.
    */
   boolean shouldFlushStore(Store store) {
-    long maxFlushedSeqId =
-        this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
-            .getFamily().getName()) - 1;
-    if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
+    long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
+      store.getFamily().getName()) - 1;
+    if (earliest > 0 && earliest + flushPerChanges < sequenceId.get()) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
-            + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
-            + ") is far away from current(" + sequenceId.get() + "), max allowed is "
-            + flushPerChanges);
+        LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
+          getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
+          " is > " + this.flushPerChanges + " from current=" + sequenceId.get());
       }
       return true;
     }
-    if (flushCheckInterval <= 0) {
+    if (this.flushCheckInterval <= 0) {
       return false;
     }
     long now = EnvironmentEdgeManager.currentTime();
-    if (store.timeOfOldestEdit() < now - flushCheckInterval) {
+    if (store.timeOfOldestEdit() < now - this.flushCheckInterval) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
-            + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
-            + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
+        LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " +
+          getRegionInfo().getEncodedName() + " because time of oldest edit=" +
+            store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now);
       }
       return true;
     }
@@ -2086,18 +2085,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     if (LOG.isInfoEnabled()) {
-      LOG.info("Started memstore flush for " + this + ", current region memstore size "
-          + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
-          + stores.size() + " column families' memstores are being flushed."
-          + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
-      // only log when we are not flushing all stores.
-      if (this.stores.size() > storesToFlush.size()) {
+      // Log a fat line detailing what is being flushed.
+      StringBuilder perCfExtras = null;
+      if (!isAllFamilies(storesToFlush)) {
+        perCfExtras = new StringBuilder();
         for (Store store: storesToFlush) {
-          LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
-              + " which was occupying "
-              + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
+          perCfExtras.append("; ");
+          perCfExtras.append(store.getColumnFamilyName());
+          perCfExtras.append("=");
+          perCfExtras.append(StringUtils.byteDesc(store.getMemStoreSize()));
         }
       }
+      LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
+        " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
+        ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
+        ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid));
     }
     // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
@@ -2123,10 +2125,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
         Bytes.BYTES_COMPARATOR);
     // The sequence id of this flush operation which is used to log FlushMarker and pass to
-    // createFlushContext to use as the store file's sequence id.
+    // createFlushContext to use as the store file's sequence id. It can be in advance of edits
+    // still in the memstore, edits that are in other column families yet to be flushed.
     long flushOpSeqId = HConstants.NO_SEQNUM;
-    // The max flushed sequence id after this flush operation. Used as completeSequenceId which is
-    // passed to HMaster.
+    // The max flushed sequence id after this flush operation completes. All edits in memstore
+    // will be in advance of this sequence id.
     long flushedSeqId = HConstants.NO_SEQNUM;
     byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
 
@@ -2135,21 +2138,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       try {
         w = mvcc.beginMemstoreInsert();
         if (wal != null) {
-          if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
-            // This should never happen.
-            String msg = "Flush will not be started for ["
-                + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+          Long earliestUnflushedSequenceIdForTheRegion =
+            wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
+          if (earliestUnflushedSequenceIdForTheRegion == null) {
+            // This should never happen. This is how startCacheFlush signals flush cannot proceed.
+            String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
             status.setStatus(msg);
             return new PrepareFlushResult(
               new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
               myseqid);
           }
           flushOpSeqId = getNextSequenceId(wal);
-          long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
-          // no oldestUnflushedSeqId means we flushed all stores.
-          // or the unflushed stores are all empty.
-          flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId
-              : oldestUnflushedSeqId - 1;
+          // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit
+          flushedSeqId =
+            earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
+              flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
         } else {
           // use the provided sequence Id as WAL is not being used for this flush.
           flushedSeqId = flushOpSeqId = myseqid;
@@ -2230,6 +2233,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
+   * @param families
+   * @return True if passed Set is all families in the region.
+   */
+  private boolean isAllFamilies(final Collection<Store> families) {
+    return families == null || this.stores.size() == families.size();
+  }
+
+  /**
    * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various
    * reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
    * @param wal
@@ -2344,10 +2355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.lastStoreFlushTimeMap.put(store, startTime);
     }
 
-    // Update the oldest unflushed sequence id for region.
     this.maxFlushedSeqId = flushedSeqId;
-
-    // Record flush operation sequence id.
     this.lastFlushOpSeqId = flushOpSeqId;
 
     // C. Finally notify anyone waiting on memstore to clear:
@@ -3704,7 +3712,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Make request outside of synchronize block; HBASE-818.
     this.rsServices.getFlushRequester().requestFlush(this, false);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Flush requested on " + this);
+      LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
     }
   }
 
@@ -4438,7 +4446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             + seqId + " is greater than current seqId:" + currentSeqId);
 
         // Prepare flush (take a snapshot) and then abort (drop the snapshot)
-        if (store == null ) {
+        if (store == null) {
           for (Store s : stores.values()) {
             totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
           }
@@ -5432,11 +5440,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
           } else if (scannerContext.checkSizeLimit(limitScope)) {
             ScannerContext.NextState state =
-                moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+                moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
             return scannerContext.setScannerState(state).hasMoreValues();
           } else if (scannerContext.checkTimeLimit(limitScope)) {
             ScannerContext.NextState state =
-                moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+                moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
             return scannerContext.setScannerState(state).hasMoreValues();
           }
         } while (moreCellsInRow);
@@ -5815,7 +5823,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         LOG.error(msg);
         LOG.error("unable to refresh store files", e);
         abortRegionServer(msg);
-        return new NotServingRegionException(getRegionInfo().getRegionNameAsString() +" is closing");
+        return new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " closing");
       }
     }
 
@@ -5968,12 +5976,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return new HRegion
    * @throws IOException
    */
-  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
-                                      final Configuration conf,
-                                      final HTableDescriptor hTableDescriptor,
-                                      final WAL wal,
-                                      final boolean initialize, final boolean ignoreWAL)
-      throws IOException {
+  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+      final Path tableDir, final Configuration conf, final HTableDescriptor hTableDescriptor,
+      final WAL wal, final boolean initialize, final boolean ignoreWAL)
+  throws IOException {
     LOG.info("creating HRegion " + info.getTable().getNameAsString()
         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
         " Table name == " + info.getTable().getNameAsString());
@@ -6338,6 +6344,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param tabledir qualified path for table
    * @param name ENCODED region name
    * @return Path of HRegion directory
+   * @deprecated For tests only; to be removed.
    */
   @Deprecated
   public static Path getRegionDir(final Path tabledir, final String name) {
@@ -6350,6 +6357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param rootdir qualified path of HBase root directory
    * @param info HRegionInfo for the region
    * @return qualified path of region directory
+   * @deprecated For tests only; to be removed.
    */
   @Deprecated
   @VisibleForTesting
@@ -7170,7 +7178,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                     newTags.add(itr.next());
                   }
                 }
-                if (i < ( edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1)))
+                if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1)))
                   idx++;
               }
 
@@ -7752,6 +7760,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // sync the WAL edit (SYNC and FSYNC treated the same for now)
         this.wal.sync(txid);
         break;
+      default:
+        throw new RuntimeException("Unknown durability " + durability);
       }
     }
   }
@@ -7840,8 +7850,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public long getOldestSeqIdOfStore(byte[] familyName) {
-    return wal.getEarliestMemstoreSeqNum(getRegionInfo()
-        .getEncodedNameAsBytes(), familyName);
+    return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName);
   }
 
   @Override
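
Several hunks above use the same arithmetic: the sequence id that is safe to report as flushed is one less than the earliest sequence id still unflushed in the memstore, falling back to the id of the last flush operation when nothing is unflushed. A self-contained sketch of that computation; the helper name completeSequenceId and the NO_SEQNUM value here are stand-ins, not this class's API:

    // Sketch of the "complete sequence id" computation: report the edit just
    // before the earliest unflushed one, or the last flush op's id when the
    // store (or region) has nothing unflushed.
    public class CompleteSeqIdSketch {
      static final long NO_SEQNUM = -1L;

      static long completeSequenceId(long earliestUnflushedSeqId, long lastFlushOpSeqId) {
        // Everything strictly below the earliest unflushed edit is on disk.
        return earliestUnflushedSeqId == NO_SEQNUM ? lastFlushOpSeqId : earliestUnflushedSeqId - 1;
      }

      public static void main(String[] args) {
        System.out.println(completeSequenceId(57L, 80L));        // 56: safe up to 56
        System.out.println(completeSequenceId(NO_SEQNUM, 80L));  // 80: nothing unflushed
      }
    }

Reporting this per store, instead of one region-wide value, is what lets WAL splitting keep edits that belong to a family that has not flushed yet.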

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 843c0a7..a70456a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2835,24 +2835,14 @@ public class HRegionServer extends HasThread implements
   @Override
   public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
     Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
-
     if (destination != null) {
-      try {
-        WAL wal = getWAL(r.getRegionInfo());
-        long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
-        if (closeSeqNum == HConstants.NO_SEQNUM) {
-          // No edits in WAL for this region; get the sequence number when the region was opened.
-          closeSeqNum = r.getOpenSeqNum();
-          if (closeSeqNum == HConstants.NO_SEQNUM) {
-            closeSeqNum = 0;
-          }
-        }
-        addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
-      } catch (IOException exception) {
-        LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
-            "; not adding to moved regions.");
-        LOG.debug("Exception details for failure to get wal", exception);
+      long closeSeqNum = r.getMaxFlushedSeqId();
+      if (closeSeqNum == HConstants.NO_SEQNUM) {
+        // No edits in WAL for this region; get the sequence number when the region was opened.
+        closeSeqNum = r.getOpenSeqNum();
+        if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
       }
+      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
     }
     this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
     return toReturn != null;
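
removeFromOnlineRegions above now derives the close sequence number from the region itself instead of asking the WAL: the region's max flushed id, else the open sequence number, else zero. A tiny sketch of that fallback chain (the standalone method name is invented):

    // Sketch of the closeSeqNum fallback used when recording a moved region.
    public class CloseSeqNumSketch {
      static final long NO_SEQNUM = -1L;

      static long closeSeqNum(long maxFlushedSeqId, long openSeqNum) {
        long closeSeqNum = maxFlushedSeqId;
        if (closeSeqNum == NO_SEQNUM) {
          // Region never flushed: fall back to the id it was opened at.
          closeSeqNum = openSeqNum;
          if (closeSeqNum == NO_SEQNUM) closeSeqNum = 0;
        }
        return closeSeqNum;
      }

      public static void main(String[] args) {
        System.out.println(closeSeqNum(120L, 5L));             // 120
        System.out.println(closeSeqNum(NO_SEQNUM, 5L));        // 5
        System.out.println(closeSeqNum(NO_SEQNUM, NO_SEQNUM)); // 0
      }
    }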

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 98dab42..0c6b2f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -111,16 +111,6 @@ import com.google.common.collect.Sets;
  * services is compaction services where files are aggregated once they pass
  * a configurable threshold.
  *
- * <p>The only thing having to do with logs that Store needs to deal with is
- * the reconstructionLog.  This is a segment of an HRegion's log that might
- * NOT be present upon startup.  If the param is NULL, there's nothing to do.
- * If the param is non-NULL, we need to process the log to reconstruct
- * a TreeMap that might not have been written to disk before the process
- * died.
- *
- * <p>It's assumed that after this constructor returns, the reconstructionLog
- * file will be deleted (by whoever has instantiated the Store).
- *
  * <p>Locking and transactions are handled at a higher level.  This API should
  * not be called directly but by an HRegion manager.
  */
@@ -898,8 +888,7 @@ public class HStore implements Store {
   }
 
   /**
-   * Write out current snapshot.  Presumes {@link #snapshot()} has been called
-   * previously.
+   * Write out current snapshot.  Presumes {@link #snapshot()} has been called previously.
    * @param logCacheFlushId flush sequence number
    * @param snapshot
    * @param status

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 8910042..da642ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -123,10 +124,19 @@ public interface Region extends ConfigurationObserver {
   /** @return the latest sequence number that was read from storage when this region was opened */
   long getOpenSeqNum();
 
-  /** @return the max sequence id of flushed data on this region */
+  /** @return the max sequence id of flushed data on this region; no edit in memory will have
+   * a sequence id that is less than what is returned here.
+   */
   long getMaxFlushedSeqId();
 
-  /** @return the oldest sequence id found in the store for the given family */
+  /** @return the oldest flushed sequence id for the given family; can be beyond
+   * {@link #getMaxFlushedSeqId()} in the case where we've flushed a subset of a region's column
+   * families
+   * @deprecated Since version 1.2.0. Exposes too much about our internals; shutting it down.
+   * Do not use.
+   */
+  @VisibleForTesting
+  @Deprecated
   public long getOldestSeqIdOfStore(byte[] familyName);
 
   /**
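
The FSHLog diff that follows consolidates the three ad-hoc maps it used to keep (oldest unflushed id per store, ids of stores currently flushing, highest appended id per region) into one new SequenceIdAccounting class. The essence, as I read it, is per-region, per-family bookkeeping of the oldest sequence id that is still only in the memstore, which answers both "can this WAL file be archived?" and "what can we tell the master is flushed?". A rough stand-alone sketch of that bookkeeping; class and method names here are illustrative and simplified (String keys, coarse synchronization), not the new class's real API:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: track, per region and family, the oldest sequence id that is
    // still only in the memstore (not yet flushed to an hfile).
    public class SequenceIdAccountingSketch {
      static final long NO_SEQNUM = -1L;
      private final Map<String, Map<String, Long>> oldestUnflushed = new HashMap<>();

      // Called on append: remember the first (oldest) unflushed id per family.
      synchronized void update(String region, String family, long seqId) {
        oldestUnflushed.computeIfAbsent(region, r -> new HashMap<>()).putIfAbsent(family, seqId);
      }

      // Lowest unflushed id across all families of a region, or NO_SEQNUM.
      synchronized long getLowestSequenceId(String region) {
        Map<String, Long> families = oldestUnflushed.get(region);
        if (families == null || families.isEmpty()) return NO_SEQNUM;
        return families.values().stream().min(Long::compare).get();
      }

      // A WAL whose highest id per region is below every region's lowest
      // unflushed id holds nothing that is still memstore-only: safe to GC.
      synchronized boolean areAllLower(Map<String, Long> walHighestByRegion) {
        for (Map.Entry<String, Long> e : walHighestByRegion.entrySet()) {
          long lowestUnflushed = getLowestSequenceId(e.getKey());
          if (lowestUnflushed != NO_SEQNUM && lowestUnflushed <= e.getValue()) return false;
        }
        return true;
      }

      public static void main(String[] args) {
        SequenceIdAccountingSketch acct = new SequenceIdAccountingSketch();
        acct.update("region-a", "cf1", 10L);  // oldest unflushed edit for cf1 is 10
        System.out.println(acct.areAllLower(Map.of("region-a", 9L)));   // true: archivable
        System.out.println(acct.areAllLower(Map.of("region-a", 15L)));  // false: still needed
      }
    }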

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index aa722a0..b118ecd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -29,16 +29,13 @@ import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -66,7 +63,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -91,7 +87,6 @@ import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.ExceptionHandler;
@@ -156,7 +151,7 @@ public class FSHLog implements WAL {
   private static final Log LOG = LogFactory.getLog(FSHLog.class);
 
   private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
-  
+
   /**
    * The nexus at which all incoming handlers meet.  Does appends and sync with an ordering.
    * Appends and syncs are each put on the ring which means handlers need to
@@ -168,7 +163,7 @@ public class FSHLog implements WAL {
   private final Disruptor<RingBufferTruck> disruptor;
 
   /**
-   * An executorservice that runs the disrutpor AppendEventHandler append executor.
+   * An executorservice that runs the disruptor AppendEventHandler append executor.
    */
   private final ExecutorService appendExecutor;
 
@@ -210,6 +205,7 @@ public class FSHLog implements WAL {
    * WAL directory, where all WAL files would be placed.
    */
   private final Path fullPathLogDir;
+
   /**
    * dir path where old logs are kept.
    */
@@ -241,6 +237,7 @@ public class FSHLog implements WAL {
    * conf object
    */
   protected final Configuration conf;
+
   /** Listeners that are called on WAL events. */
   private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
 
@@ -258,6 +255,7 @@ public class FSHLog implements WAL {
   public WALCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
   }
+
   /**
    * FSDataOutputStream associated with the current SequenceFile.writer
    */
@@ -289,6 +287,13 @@ public class FSHLog implements WAL {
   private volatile boolean lowReplicationRollEnabled = true;
 
   /**
+   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding
+   * sequence id as yet not flushed as well as the most recent edit sequence id appended to the
+   * WAL. Has facility for answering questions such as "Is it safe to GC a WAL?".
+   */
+  private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
+
+  /**
    * Current log file.
    */
   volatile Writer writer;
@@ -334,52 +339,6 @@ public class FSHLog implements WAL {
 
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
-  // Region sequence id accounting across flushes and for knowing when we can GC a WAL.  These
-  // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
-  // done above in failedSequence, highest sequence, etc.
-  /**
-   * This lock ties all operations on lowestFlushingStoreSequenceIds and
-   * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into
-   * oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions
-   * sequence id, or to find regions with old sequence ids to force flush; we are interested in
-   * old stuff not the new additions (TODO: IS THIS SAFE?  CHECK!).
-   */
-  private final Object regionSequenceIdLock = new Object();
-
-  /**
-   * Map of encoded region names and family names to their OLDEST -- i.e. their first,
-   * the longest-lived -- sequence id in memstore. Note that this sequence id is the region
-   * sequence id.  This is not related to the id we use above for {@link #highestSyncedSequence}
-   * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor
-   * ring buffer.
-   */
-  private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
-    = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
-      Bytes.BYTES_COMPARATOR);
-
-  /**
-   * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in
-   * memstore currently being flushed out to hfiles. Entries are moved here from
-   * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held
-   * (so movement between the Maps is atomic). This is not related to the id we use above for
-   * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from
-   * the disruptor ring buffer, an internal detail.
-   */
-  private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
-    new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
-
- /**
-  * Map of region encoded names to the latest region sequence id.  Updated on each append of
-  * WALEdits to the WAL. We create one map for each WAL file at the time it is rolled.
-  * <p>When deciding whether to archive a WAL file, we compare the sequence IDs in this map to
-  * {@link #lowestFlushingRegionSequenceIds} and {@link #oldestUnflushedRegionSequenceIds}.
-  * See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info.
-  * <p>
-  * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
-  * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
-  * the same array.
-  */
-  private Map<byte[], Long> highestRegionSequenceIds = new HashMap<byte[], Long>();
 
   /**
    * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
@@ -396,7 +355,7 @@ public class FSHLog implements WAL {
   };
 
   /**
-   * Map of wal log file to the latest sequence ids of all regions it has entries of.
+   * Map of WAL log file to the latest sequence ids of all regions it has entries of.
    * The map is sorted by the log file creation timestamp (contained in the log file name).
    */
   private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
@@ -542,7 +501,7 @@ public class FSHLog implements WAL {
       (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
 
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
-    this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication",
+    this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
         FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
     this.lowReplicationRollLimit =
       conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
@@ -745,128 +704,37 @@ public class FSHLog implements WAL {
     return DefaultWALProvider.createWriter(conf, fs, path, false);
   }
 
-  private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
-    long result = HConstants.NO_SEQNUM;
-    for (Long seqNum: seqIdMap.values()) {
-      if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
-        result = seqNum.longValue();
-      }
-    }
-    return result;
-  }
-
-  private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
-      Map<byte[], T> mapToCopy) {
-    Map<byte[], Long> copied = Maps.newHashMap();
-    for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
-      long lowestSeqId = getLowestSeqId(entry.getValue());
-      if (lowestSeqId != HConstants.NO_SEQNUM) {
-        copied.put(entry.getKey(), lowestSeqId);
-      }
-    }
-    return copied;
-  }
-
   /**
-   * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits
-   * have been flushed to hfiles.
-   * <p>
-   * For each log file, it compares its region to sequenceId map
-   * (@link {@link FSHLog#highestRegionSequenceIds} with corresponding region entries in
-   * {@link FSHLog#lowestFlushingRegionSequenceIds} and
-   * {@link FSHLog#oldestUnflushedRegionSequenceIds}. If all the regions in the map are flushed
-   * past of their value, then the wal is eligible for archiving.
+   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
    * @throws IOException
    */
   private void cleanOldLogs() throws IOException {
-    Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
-    Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
-    List<Path> logsToArchive = new ArrayList<Path>();
-    // make a local copy so as to avoid locking when we iterate over these maps.
-    synchronized (regionSequenceIdLock) {
-      lowestFlushingRegionSequenceIdsLocal =
-          copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
-      oldestUnflushedRegionSequenceIdsLocal =
-          copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
-    }
-    for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
-      // iterate over the log file.
+    List<Path> logsToArchive = null;
+    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
+    // are older than what is currently in memory, the WAL can be GC'd.
+    for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) {
       Path log = e.getKey();
       Map<byte[], Long> sequenceNums = e.getValue();
-      // iterate over the map for this log file, and tell whether it should be archive or not.
-      if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
-          oldestUnflushedRegionSequenceIdsLocal)) {
+      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
+        if (logsToArchive == null) logsToArchive = new ArrayList<Path>();
         logsToArchive.add(log);
-        LOG.debug("WAL file ready for archiving " + log);
+        if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log);
       }
     }
-    for (Path p : logsToArchive) {
-      this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
-      archiveLogFile(p);
-      this.byWalRegionSequenceIds.remove(p);
-    }
-  }
-
-  /**
-   * Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived.
-   * It compares the region entries present in the passed sequenceNums map with the local copy of
-   * {@link #oldestUnflushedRegionSequenceIds} and {@link #lowestFlushingRegionSequenceIds}. If,
-   * for all regions, the value is lesser than the minimum of values present in the
-   * oldestFlushing/UnflushedSeqNums, then the wal file is eligible for archiving.
-   * @param sequenceNums for a WAL, at the time when it was rolled.
-   * @param oldestFlushingMap
-   * @param oldestUnflushedMap
-   * @return true if wal is eligible for archiving, false otherwise.
-   */
-   static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
-      Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
-    for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
-      // find region entries in the flushing/unflushed map. If there is no entry, it meansj
-      // a region doesn't have any unflushed entry.
-      long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
-          oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
-      long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
-          oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
-          // do a minimum to be sure to contain oldest sequence Id
-      long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
-      if (minSeqNum <= regionSeqIdEntry.getValue()) return false;// can't archive
-    }
-    return true;
-  }
-
-  /**
-   * Iterates over the given map of regions, and compares their sequence numbers with corresponding
-   * entries in {@link #oldestUnflushedRegionSequenceIds}. If the sequence number is greater or
-   * equal, the region is eligible to flush, otherwise, there is no benefit to flush (from the
-   * perspective of passed regionsSequenceNums map), because the region has already flushed the
-   * entries present in the WAL file for which this method is called for (typically, the oldest
-   * wal file).
-   * @param regionsSequenceNums
-   * @return regions which should be flushed (whose sequence numbers are larger than their
-   * corresponding un-flushed entries.
-   */
-  private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
-    List<byte[]> regionsToFlush = null;
-    // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
-    synchronized (regionSequenceIdLock) {
-      for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
-        long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
-        if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
-          if (regionsToFlush == null)
-            regionsToFlush = new ArrayList<byte[]>();
-          regionsToFlush.add(e.getKey());
-        }
+    if (logsToArchive != null) {
+      for (Path p : logsToArchive) {
+        this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
+        archiveLogFile(p);
+        this.byWalRegionSequenceIds.remove(p);
       }
     }
-    return regionsToFlush == null ? null : regionsToFlush
-        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
   }
 
   /**
-   * If the number of un-archived WAL files is greater than maximum allowed, it checks
-   * the first (oldest) WAL file, and returns the regions which should be flushed so that it could
+   * If the number of un-archived WAL files is greater than maximum allowed, check the first
+   * (oldest) WAL file, and return those regions which should be flushed so that it can
    * be archived.
-   * @return regions to flush in order to archive oldest wal file.
+   * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
    * @throws IOException
    */
   byte[][] findRegionsToForceFlush() throws IOException {
@@ -875,7 +743,7 @@ public class FSHLog implements WAL {
     if (logCount > this.maxLogs && logCount > 0) {
       Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
         this.byWalRegionSequenceIds.firstEntry();
-      regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
+      regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue());
     }
     if (regions != null) {
       StringBuilder sb = new StringBuilder();
@@ -883,9 +751,8 @@ public class FSHLog implements WAL {
         if (i > 0) sb.append(", ");
         sb.append(Bytes.toStringBinary(regions[i]));
       }
-      LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" +
-         this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
-         sb.toString());
+      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
+        "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
     }
     return regions;
   }
@@ -963,8 +830,7 @@ public class FSHLog implements WAL {
       this.numEntries.set(0);
       final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
       if (oldPath != null) {
-        this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds);
-        this.highestRegionSequenceIds = new HashMap<byte[], Long>();
+        this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
         long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
         this.totalLogSize.addAndGet(oldFileLen);
         LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
@@ -1108,7 +974,7 @@ public class FSHLog implements WAL {
       LOG.debug("Moved " + files.length + " WAL file(s) to " +
         FSUtils.getPath(this.fullPathArchiveDir));
     }
-    LOG.info("Closed WAL: " + toString() );
+    LOG.info("Closed WAL: " + toString());
   }
 
   @Override
@@ -1631,108 +1497,24 @@ public class FSHLog implements WAL {
   }
 
   @Override
-  public boolean startCacheFlush(final byte[] encodedRegionName,
-      Set<byte[]> flushedFamilyNames) {
-    Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+  public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families) {
     if (!closeBarrier.beginOp()) {
-      LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
-        " - because the server is closing.");
-      return false;
-    }
-    synchronized (regionSequenceIdLock) {
-      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
-          oldestUnflushedStoreSequenceIds.get(encodedRegionName);
-      if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
-        for (byte[] familyName: flushedFamilyNames) {
-          Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
-          if (seqId != null) {
-            oldStoreSeqNum.put(familyName, seqId);
-          }
-        }
-        if (!oldStoreSeqNum.isEmpty()) {
-          Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
-              encodedRegionName, oldStoreSeqNum);
-          assert oldValue == null: "Flushing map not cleaned up for "
-              + Bytes.toString(encodedRegionName);
-        }
-        if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
-          // Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever
-          // even if the region is already moved to other server.
-          // Do not worry about data racing, we held write lock of region when calling
-          // startCacheFlush, so no one can add value to the map we removed.
-          oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
-        }
-      }
-    }
-    if (oldStoreSeqNum.isEmpty()) {
-      // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
-      // the region is already flushing (which would make this call invalid), or there
-      // were no appends after last flush, so why are we starting flush? Maybe we should
-      // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
-      // For now preserve old logic.
-      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
-        + Bytes.toString(encodedRegionName) + "]");
+      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
+      return null;
     }
-    return true;
+    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
   }
 
   @Override
   public void completeCacheFlush(final byte [] encodedRegionName) {
-    synchronized (regionSequenceIdLock) {
-      this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
-    }
+    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
     closeBarrier.endOp();
   }
 
-  private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
-      byte[] encodedRegionName) {
-    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
-        oldestUnflushedStoreSequenceIds.get(encodedRegionName);
-    if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
-      return oldestUnflushedStoreSequenceIdsOfRegion;
-    }
-    oldestUnflushedStoreSequenceIdsOfRegion =
-        new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
-    ConcurrentMap<byte[], Long> alreadyPut =
-        oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName,
-          oldestUnflushedStoreSequenceIdsOfRegion);
-    return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
-  }
-
   @Override
   public void abortCacheFlush(byte[] encodedRegionName) {
-    Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
-    Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
-    synchronized (regionSequenceIdLock) {
-      storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
-        encodedRegionName);
-      if (storeSeqNumsBeforeFlushStarts != null) {
-        ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
-            getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
-        for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
-            .entrySet()) {
-          currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
-            oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
-              familyNameAndSeqId.getValue()));
-        }
-      }
-    }
+    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
     closeBarrier.endOp();
-    if (storeSeqNumsBeforeFlushStarts != null) {
-      for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
-        Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
-        if (currentSeqNum != null
-            && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
-          String errorStr =
-              "Region " + Bytes.toString(encodedRegionName) + " family "
-                  + Bytes.toString(familyNameAndSeqId.getKey())
-                  + " acquired edits out of order current memstore seq=" + currentSeqNum
-                  + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
-          LOG.error(errorStr);
-          Runtime.getRuntime().halt(1);
-        }
-      }
-    }
   }
 
   @VisibleForTesting
@@ -1762,23 +1544,21 @@ public class FSHLog implements WAL {
 
   @Override
   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
-    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
-        this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
-    return oldestUnflushedStoreSequenceIdsOfRegion != null ?
-        getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
+    // Used by tests. Deprecated as too subtle for general usage.
+    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
   }
 
   @Override
-  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
-      byte[] familyName) {
-    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
-        this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
-    if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
-      Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName);
-      return result != null ? result.longValue() : HConstants.NO_SEQNUM;
-    } else {
-      return HConstants.NO_SEQNUM;
-    }
+  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
+    // This method is used by tests and when figuring if we should flush or not because our
+    // sequenceids are too old. It is also used when reporting to the master our oldest
+    // sequenceid, which the master uses when figuring what edits can be skipped during log
+    // recovery. getLowestSequenceId in this.sequenceIdAccounting looks first in
+    // flushingSequenceIds, the sequence ids currently being flushed, and returns whatever it
+    // finds there. This is the right thing to do when reporting oldest sequenceids to the
+    // master; we won't skip edits if we crash during the flush. For figuring what to flush,
+    // we might get requeued if our sequence id is old even though we are currently flushing.
+    // This may mean we do too much flushing.
+    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
   }
 
   /**
@@ -1820,10 +1600,10 @@ public class FSHLog implements WAL {
     /**
      * For Thread A to call when it is ready to wait on the 'safe point' to be attained.
      * Thread A will be held in here until Thread B calls {@link #safePointAttained()}
-     * @throws InterruptedException
-     * @throws ExecutionException
      * @param syncFuture We need this as barometer on outstanding syncs.  If it comes home with
      * an exception, then something is up w/ our syncing.
+     * @throws InterruptedException
+     * @throws ExecutionException
      * @return The passed <code>syncFuture</code>
      * @throws FailedSyncBeforeLogCloseException 
      */
@@ -2014,15 +1794,6 @@ public class FSHLog implements WAL {
       }
     }
 
-    private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
-        Set<byte[]> familyNameSet, Long lRegionSequenceId) {
-      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
-          getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
-      for (byte[] familyName : familyNameSet) {
-        oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
-      }
-    }
-
     /**
      * Append to the WAL.  Does all CP and WAL listener calls.
      * @param entry
@@ -2040,14 +1811,14 @@ public class FSHLog implements WAL {
         // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
         // so region sequenceids will also be in order.
         regionSequenceId = entry.stampRegionSequenceId();
-        
+
         // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a 
         // region sequence id only, a region edit/sequence id that is not associated with an actual 
         // edit. It has to go through all the rigmarole to be sure we have the right ordering.
         if (entry.getEdit().isEmpty()) {
           return;
         }
-        
+
         // Coprocessor hook.
         if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
             entry.getEdit())) {
@@ -2067,13 +1838,8 @@ public class FSHLog implements WAL {
         writer.append(entry);
         assert highestUnsyncedSequence < entry.getSequence();
         highestUnsyncedSequence = entry.getSequence();
-        Long lRegionSequenceId = Long.valueOf(regionSequenceId);
-        highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
-        if (entry.isInMemstore()) {
-          updateOldestUnflushedSequenceIds(encodedRegionName,
-              entry.getFamilyNames(), lRegionSequenceId);
-        }
-
+        sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
+          entry.isInMemstore());
         coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
         // Update metrics.
         postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
@@ -2203,4 +1969,4 @@ public class FSHLog implements WAL {
     }
     return new DatanodeInfo[0];
   }
-}
+}
\ No newline at end of file
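The comment added to getEarliestMemstoreSeqNum above explains that the accounting answers with currently-flushing sequence ids ahead of unflushed ones. The sketch below shows one way a flush decision might consume that value; it is illustration only, and the helper name, WAL handle and oldestSequenceIdStillInWALs threshold are hypothetical stand-ins, not part of this patch (assumes the usual WAL and HConstants imports):

  // Hypothetical helper, for illustration only; not part of the patch.
  boolean shouldFlushFamily(WAL wal, byte[] encodedRegionName, byte[] familyName,
      long oldestSequenceIdStillInWALs) {
    long earliest = wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
    if (earliest == HConstants.NO_SEQNUM) {
      return false; // Nothing outstanding (unflushed or mid-flush) for this family.
    }
    // This family pins a WAL we would like to let go of, so ask for a flush. Because the
    // accounting also answers with ids that are mid-flush, we may occasionally requeue a
    // region that is already flushing -- extra work, but no edits are skipped.
    return earliest <= oldestSequenceIdStillInWALs;
  }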

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
new file mode 100644
index 0000000..6e10f3c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Accounting of sequence ids per region and then by column family. To keep our accounting
+ * current, call startCacheFlush and then completeCacheFlush or abortCacheFlush so this instance
+ * can keep abreast of the state of sequence id persistence. Also call update per append.
+ */
+class SequenceIdAccounting {
+  private static final Log LOG = LogFactory.getLog(SequenceIdAccounting.class);
+  /**
+   * This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and
+   * {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the
+   * lowest outstanding sequence ids EXCEPT when flushing. When we flush, the current
+   * lowest set for the region/column family are moved (atomically because of this lock) to
+   * {@link #flushingSequenceIds}.
+   * 
+   * <p>The two Maps are tied by this locking object EXCEPT when we go to update the lowest
+   * entry; see {@link #update(byte[], Set, long, boolean)}. In there is a putIfAbsent against
+   * {@link #lowestUnflushedSequenceIds}: we add the passed sequence id as the lowest if we
+   * find no entry for the current column family. There will be no entry only if we just came
+   * up OR we have moved aside the current set of lowest sequence ids because that set is being
+   * flushed (by putting them into {@link #flushingSequenceIds}). This is how we pick up the
+   * next 'lowest' sequence id per region per column family, to be used figuring what is in
+   * the next flush.
+   */
+  private final Object tieLock = new Object();
+
+  /**
+   * Map of encoded region names and family names to their OLDEST -- i.e. their first,
+   * the longest-lived, their 'earliest', the 'lowest' -- sequence id.
+   *
+   * <p>When we flush, the current lowest sequence ids get cleared and added to
+   * {@link #flushingSequenceIds}. The next append that comes in is then added
+   * here to {@link #lowestUnflushedSequenceIds} as the next lowest sequenceid.
+   *
+   * <p>If a flush fails, the server is currently aborted, so there is no need to restore
+   * previous sequence ids.
+   * <p>Needs to be concurrent Maps because we use putIfAbsent when updating the oldest.
+   */
+  private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> lowestUnflushedSequenceIds
+    = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
+      Bytes.BYTES_COMPARATOR);
+
+  /**
+   * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id
+   * currently being flushed out to hfiles. Entries are moved here from
+   * {@link #lowestUnflushedSequenceIds} while the lock {@link #tieLock} is held
+   * (so movement between the Maps is atomic).
+   */
+  private final Map<byte[], Map<byte[], Long>> flushingSequenceIds =
+    new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * Map of region encoded names to the latest/highest region sequence id.  Updated on each
+   * call to append.
+   * <p>
+   * This map uses byte[] as the key, and uses reference equality. It works in our use case as
+   * we use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always
+   * returns the same array.
+   */
+  private Map<byte[], Long> highestSequenceIds = new HashMap<byte[], Long>();
+
+  /**
+   * Returns the lowest unflushed sequence id for the region.
+   * @param encodedRegionName
+   * @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will
+   * return {@link HConstants#NO_SEQNUM} when none.
+   */
+  long getLowestSequenceId(final byte [] encodedRegionName) {
+    synchronized (this.tieLock)  {
+      Map<byte[], Long> m = this.flushingSequenceIds.get(encodedRegionName);
+      long flushingLowest = m != null? getLowestSequenceId(m): Long.MAX_VALUE;
+      m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
+      long unflushedLowest = m != null? getLowestSequenceId(m): HConstants.NO_SEQNUM;
+      return Math.min(flushingLowest, unflushedLowest);
+    }
+  }
+
+  /**
+   * @param encodedRegionName
+   * @param familyName 
+   * @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code> and
+   * <code>familyName</code>. Returned sequenceid may be for an edit currently being flushed.
+   */
+  long getLowestSequenceId(final byte [] encodedRegionName, final byte [] familyName) {
+    synchronized (this.tieLock) {
+      Map<byte[], Long> m = this.flushingSequenceIds.get(encodedRegionName);
+      if (m != null) {
+        Long lowest = m.get(familyName);
+        if (lowest != null) return lowest;
+      }
+      m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
+      if (m != null) {
+        Long lowest = m.get(familyName);
+        if (lowest != null) return lowest;
+      }
+    }
+    return HConstants.NO_SEQNUM;
+  }
+
+  /**
+   * Reset the accounting of highest sequenceid by regionname.
+   * @return Return the previous accounting Map of regions to the last sequence id written into
+   * each.
+   */
+  Map<byte[], Long> resetHighest() {
+    Map<byte[], Long> old = this.highestSequenceIds;
+    this.highestSequenceIds = new HashMap<byte[], Long>();
+    return old;
+  }
+
+  /**
+   * We've been passed a new sequenceid for the region. Set it as the highest seen for this
+   * region and, if we are to record lowest (oldest) sequenceids, save it as the lowest seen
+   * if nothing older is currently recorded.
+   * @param encodedRegionName
+   * @param families
+   * @param sequenceid
+   * @param lowest Whether to keep running account of oldest sequence id.
+   */
+  void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
+      final boolean lowest) {
+    Long l = Long.valueOf(sequenceid);
+    this.highestSequenceIds.put(encodedRegionName, l);
+    if (lowest) {
+      ConcurrentMap<byte[], Long> m = getOrCreateLowestSequenceIds(encodedRegionName);
+      for (byte[] familyName : families) {
+        m.putIfAbsent(familyName, l);
+      }
+    }
+  }
+
+  ConcurrentMap<byte[], Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) {
+    // Intentionally, this access is done outside of this.tieLock. Done per append.
+    ConcurrentMap<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
+    if (m != null) return m;
+    m = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    // Another thread may have added it ahead of us.
+    ConcurrentMap<byte[], Long> alreadyPut =
+        this.lowestUnflushedSequenceIds.putIfAbsent(encodedRegionName, m);
+    return alreadyPut == null? m : alreadyPut;
+  }
+
+  /**
+   * @param sequenceids Map to search for lowest value.
+   * @return Lowest value found in <code>sequenceids</code>.
+   */
+  static long getLowestSequenceId(Map<byte[], Long> sequenceids) {
+    long lowest = HConstants.NO_SEQNUM;
+    for (Long sid: sequenceids.values()) {
+      if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) {
+        lowest = sid.longValue();
+      }
+    }
+    return lowest;
+  }
+
+  /**
+   * @param src
+   * @return New Map that has the same keys as <code>src</code> but, as the value for each key,
+   * the smallest sequence id found in the corresponding Map.
+   */
+  private <T extends Map<byte[], Long>> Map<byte[], Long> flattenToLowestSequenceId(
+      Map<byte[], T> src) {
+    if (src == null || src.isEmpty()) return null;
+    Map<byte[], Long> tgt = Maps.newHashMap();
+    for (Map.Entry<byte[], T> entry: src.entrySet()) {
+      long lowestSeqId = getLowestSequenceId(entry.getValue());
+      if (lowestSeqId != HConstants.NO_SEQNUM) {
+        tgt.put(entry.getKey(), lowestSeqId);
+      }
+    }
+    return tgt;
+  }
+
+  /**
+   * @param encodedRegionName Region to flush.
+   * @param families Families to flush. May be a subset of all families in the region.
+   * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if
+   * we are flushing a subset of all families but there are no edits in those families not
+   * being flushed; in other words, this is effectively the same as a flush of all of the region
+   * though we were passed a subset of its families. Otherwise, it returns the sequence id of
+   * the oldest/lowest edit still outstanding in the families not being flushed.
+   */
+  Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
+    Map<byte[], Long> oldSequenceIds = null;
+    Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
+    synchronized (tieLock) {
+      Map<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
+      if (m != null) {
+        // NOTE: Removal from this.lowestUnflushedSequenceIds must be done in controlled
+        // circumstance because another concurrent thread now may add sequenceids for this family
+        // (see above in getOrCreateLowestSequenceIds). Make sure you are ok with this. Usually
+        // is fine because updates are blocked when this method is called. Make sure!!!
+        for (byte[] familyName: families) {
+          Long seqId = m.remove(familyName);
+          if (seqId != null) {
+            if (oldSequenceIds == null) oldSequenceIds = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+            oldSequenceIds.put(familyName, seqId);
+          }
+        }
+        if (oldSequenceIds != null && !oldSequenceIds.isEmpty()) {
+          if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) {
+            LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName) +
+              ", sequenceid=" + oldSequenceIds);
+          }
+        }
+        if (m.isEmpty()) {
+          // Remove it otherwise it will be in lowestUnflushedSequenceIds forever
+          // even if the region has already moved to another server.
+          // Do not worry about data racing, we held write lock of region when calling
+          // startCacheFlush, so no one can add value to the map we removed.
+          this.lowestUnflushedSequenceIds.remove(encodedRegionName);
+        } else {
+          // Flushing a subset of the region families. Return the sequence id of the oldest entry.
+          lowestUnflushedInRegion = Collections.min(m.values());
+        }
+      }
+    }
+    // Do this check outside lock.
+    if (oldSequenceIds != null && oldSequenceIds.isEmpty()) {
+      // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
+      // the region is already flushing (which would make this call invalid), or there
+      // were no appends after last flush, so why are we starting flush? Maybe we should
+      // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
+      // For now preserve old logic.
+      LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName));
+    }
+    return lowestUnflushedInRegion;
+  }
+
+  void completeCacheFlush(final byte [] encodedRegionName) {
+    synchronized (tieLock) {
+      this.flushingSequenceIds.remove(encodedRegionName);
+    }
+  }
+
+  void abortCacheFlush(final byte[] encodedRegionName) {
+    // Method is called when we are crashing out because a flush write failed AND it is called
+    // if we fail the prepare stage. The below is for the fail-prepare case; we restore the old
+    // sequence ids.
+    Map<byte[], Long> flushing = null;
+    Map<byte[], Long> tmpMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    // Here we are moving sequenceids from flushing back to unflushed; doing opposite of what
+    // happened in startCacheFlush. During the prepare phase we hold the update lock on the
+    // region, so no edits should be coming in via append.
+    synchronized (tieLock) {
+      flushing = this.flushingSequenceIds.remove(encodedRegionName);
+      if (flushing != null) {
+        Map<byte[], Long> unflushed = getOrCreateLowestSequenceIds(encodedRegionName);
+        for (Map.Entry<byte[], Long> e: flushing.entrySet()) {
+          // Put the 'old' (pre-flush) oldest sequenceid back into unflushed; whatever value
+          // unflushed previously had for this family is now in tmpMap.
+          tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue()));
+        }
+      }
+    }
+
+    // Here we are doing some 'test' to see if edits are going in out of order. What is it for?
+    // Carried over from old code.
+    if (flushing != null) {
+      for (Map.Entry<byte[], Long> e : flushing.entrySet()) {
+        Long currentId = tmpMap.get(e.getKey());
+        if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
+          String errorStr = Bytes.toString(encodedRegionName) + " family " +
+            Bytes.toString(e.getKey()) + " acquired edits out of order current memstore seq=" +
+              currentId + ", previous oldest unflushed id=" + e.getValue();
+          LOG.error(errorStr);
+          Runtime.getRuntime().halt(1);
+        }
+      }
+    }
+  }
+
+  /**
+   * See if the passed <code>sequenceids</code> are lower -- i.e. earlier -- than any
+   * sequenceids still outstanding in this accounting instance.
+   * @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make
+   * sense for it to be null).
+   * @return true if every passed sequenceid is lower -- older -- than the lowest sequenceid
+   * outstanding in this instance for the corresponding region.
+   */
+   boolean areAllLower(Map<byte[], Long> sequenceids) {
+     Map<byte[], Long> flushing = null;
+     Map<byte[], Long> unflushed = null;
+     synchronized (this.tieLock) {
+       // Get a flattened -- only the oldest sequenceid -- copy of current flushing and unflushed
+       // data structures to use in tests below.
+       flushing = flattenToLowestSequenceId(this.flushingSequenceIds);
+       unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds);
+     }
+    for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
+      long oldestFlushing = Long.MAX_VALUE;
+      long oldestUnflushed = Long.MAX_VALUE;
+      if (flushing != null) {
+        if (flushing.containsKey(e.getKey())) oldestFlushing = flushing.get(e.getKey());
+      }
+      if (unflushed != null) {
+        if (unflushed.containsKey(e.getKey())) oldestUnflushed = unflushed.get(e.getKey());
+      }
+      long min = Math.min(oldestFlushing, oldestUnflushed);
+      if (min <= e.getValue()) return false;
+    }
+    return true;
+  }
+
+   /**
+    * Iterates over the given Map and compares sequence ids with the corresponding entries in
+    * {@link #lowestUnflushedSequenceIds}. If a region in {@link #lowestUnflushedSequenceIds}
+    * has a sequence id lower than or equal to that passed in <code>sequenceids</code>, then
+    * return it.
+    * @param sequenceids Sequenceids keyed by encoded region name.
+    * @return Regions found in this instance with sequence ids lower than or equal to those
+    * passed in.
+    */
+   byte[][] findLower(Map<byte[], Long> sequenceids) {
+     List<byte[]> toFlush = null;
+     // Keeping the old behavior of iterating the unflushed sequence ids Map under the lock.
+     synchronized (tieLock) {
+       for (Map.Entry<byte[], Long> e: sequenceids.entrySet()) {
+         Map<byte[], Long> m = this.lowestUnflushedSequenceIds.get(e.getKey());
+         if (m == null) continue;
+         // The lowest sequence id outstanding for this region.
+         long lowest = getLowestSequenceId(m);
+         if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) {
+           if (toFlush == null) toFlush = new ArrayList<byte[]>();
+           toFlush.add(e.getKey());
+         }
+       }
+     }
+     return toFlush == null? null: toFlush.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
+   }
+}
\ No newline at end of file
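A minimal sketch of the lifecycle SequenceIdAccounting expects: update per append, then startCacheFlush and completeCacheFlush (or abortCacheFlush) around a flush. Illustration only, written as if inside a test in the same package; the region and family names are made up and it assumes java.util.Collections, java.util.Set and Bytes are imported:

  SequenceIdAccounting accounting = new SequenceIdAccounting();
  byte[] region = Bytes.toBytes("encoded-region-name");       // Made-up region name.
  byte[] family = Bytes.toBytes("f");                          // Made-up family name.
  Set<byte[]> families = Collections.singleton(family);

  // One update call per append. 'true' means the edit went into the memstore, so its id is
  // also a candidate lowest-unflushed id for the family (putIfAbsent keeps the oldest).
  accounting.update(region, families, 1L, true);
  accounting.update(region, families, 2L, true);
  assert accounting.getLowestSequenceId(region, family) == 1L;

  // Flush start: the family's lowest id moves, under tieLock, from the unflushed Map to the
  // flushing Map. It is still reported as outstanding, so a crash mid-flush skips no edits.
  accounting.startCacheFlush(region, families);
  assert accounting.getLowestSequenceId(region, family) == 1L;

  accounting.update(region, families, 3L, true);   // First append after the flush started.
  accounting.completeCacheFlush(region);           // abortCacheFlush(region) would instead put
                                                   // the old ids back in the unflushed Map.
  assert accounting.getLowestSequenceId(region, family) == 3L;

areAllLower and findLower answer the inverse question over the same two Maps: whether a batch of per-region sequence ids is entirely older than anything still outstanding here, and, where it is not, which regions hold the older edits (the local variable is even named toFlush).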

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 7254ad1..56d17a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -26,21 +26,19 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.util.FSUtils;
-
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.FSUtils;
 
 /**
  * No-op implementation of {@link WALProvider} used when the WAL is disabled.
@@ -187,8 +185,9 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
-      return !(closed.get());
+    public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
+      if (closed.get()) return null;
+      return HConstants.NO_SEQNUM;
     }
 
     @Override
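From the caller's side, the signature change above (boolean to Long) reads roughly as in this sketch; the disabledWAL handle, encodedRegionName and families values are hypothetical stand-ins, not part of the patch:

  // With the WAL disabled there are no sequence ids to account for, so any flush is in
  // effect a flush of everything: the provider answers NO_SEQNUM while open, and null once
  // closed (meaning the flush should not start).
  Long answer = disabledWAL.startCacheFlush(encodedRegionName, families);
  if (answer != null) {
    assert answer.longValue() == HConstants.NO_SEQNUM;
  }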

http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 5a2b08d..4844487 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 
@@ -39,6 +40,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
  * APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc).
@@ -140,31 +143,36 @@ public interface WAL {
   void sync(long txid) throws IOException;
 
   /**
-   * WAL keeps track of the sequence numbers that were not yet flushed from memstores
-   * in order to be able to do cleanup. This method tells WAL that some region is about
-   * to flush memstore.
-   *
-   * <p>We stash the oldest seqNum for the region, and let the the next edit inserted in this
-   * region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit,
-   * AtomicLong, boolean, List)} as new oldest seqnum.
-   * In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
-   * the seqNum of that first edit after start becomes the valid oldest seqNum for this region.
+   * WAL keeps track of the sequence numbers that are as yet not flushed in memstores
+   * in order to be able to do accounting to figure which WALs can be let go. This method tells WAL
+   * that some region is about to flush. The flush can be the whole region or for a column family
+   * of the region only.
    *
-   * @return true if the flush can proceed, false in case wal is closing (ususally, when server is
-   * closing) and flush couldn't be started.
+   * <p>Currently, it is expected that the update lock is held for the region; i.e. no
+   * concurrent appends while we set up cache flush.
+   * @param families Families to flush. May be a subset of all families in the region.
+   * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if
+   * we are flushing a subset of all families but there are no edits in those families not
+   * being flushed; in other words, this is effectively the same as a flush of all of the region
+   * though we were passed a subset of its families. Otherwise, it returns the sequence id of
+   * the oldest/lowest edit still outstanding in the families not being flushed.
+   * @see #completeCacheFlush(byte[])
+   * @see #abortCacheFlush(byte[])
    */
-  boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
+  Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
 
   /**
    * Complete the cache flush.
    * @param encodedRegionName Encoded region name.
+   * @see #startCacheFlush(byte[], Set)
+   * @see #abortCacheFlush(byte[])
    */
   void completeCacheFlush(final byte[] encodedRegionName);
 
   /**
    * Abort a cache flush. Call if the flush fails. Note that the only recovery
    * for an aborted flush currently is a restart of the regionserver so the
-   * snapshot content dropped by the failure gets restored to the memstore.v
+   * snapshot content dropped by the failure gets restored to the memstore.
    * @param encodedRegionName Encoded region name.
    */
   void abortCacheFlush(byte[] encodedRegionName);
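One way a caller might interpret the new return value when flushing only some of a region's families is sketched below; wal, encodedRegionName and familiesToFlush are hypothetical stand-ins, and the null branch mirrors what DisabledWALProvider returns once closed:

  Long oldestElsewhere = wal.startCacheFlush(encodedRegionName, familiesToFlush);
  if (oldestElsewhere == null) {
    // The WAL is going away; the flush should not proceed.
  } else if (oldestElsewhere.longValue() == HConstants.NO_SEQNUM) {
    // No edits outstanding in the families we are NOT flushing; this flush can be treated
    // as covering the whole region when recording the flushed sequence id.
  } else {
    // A family left out of this flush still holds older unflushed edits: edits from
    // oldestElsewhere onward in those families are still only in the memstore and the WAL.
  }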
@@ -174,19 +182,22 @@ public interface WAL {
    */
   WALCoprocessorHost getCoprocessorHost();
 
-
-  /** Gets the earliest sequence number in the memstore for this particular region.
-   * This can serve as best-effort "recent" WAL number for this region.
+  /**
+   * Gets the earliest unflushed sequence id in the memstore for the region.
    * @param encodedRegionName The region to get the number for.
-   * @return The number if present, HConstants.NO_SEQNUM if absent.
+   * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
+   * @deprecated Since version 1.2.0. Removing because not used and exposes subtle internal
+   * workings. Use {@link #getEarliestMemstoreSeqNum(byte[], byte[])}
    */
+  @VisibleForTesting
+  @Deprecated
   long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
 
   /**
-   * Gets the earliest sequence number in the memstore for this particular region and store.
+   * Gets the earliest unflushed sequence id in the memstore for the store.
    * @param encodedRegionName The region to get the number for.
    * @param familyName The family to get the number for.
-   * @return The number if present, HConstants.NO_SEQNUM if absent.
+   * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
    */
   long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName);