You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/09 00:14:10 UTC

[30/32] hbase git commit: HBASE-15158 Change order in which we do write pipeline operations; do all under row locks

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec92a8a7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f03c205..ac846b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -29,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -41,6 +41,7 @@ import java.util.NavigableSet;
 import java.util.RandomAccess;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -69,7 +70,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellScanner;
@@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@@ -93,7 +92,6 @@ import org.apache.hadoop.hbase.ShareableMemory;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagRewriteCell;
-import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
@@ -112,7 +110,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
@@ -123,8 +121,6 @@ import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
@@ -148,6 +144,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -169,7 +166,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.EncryptionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -199,6 +195,7 @@ import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
 import com.google.protobuf.TextFormat;
 
+@SuppressWarnings("deprecation")
 @InterfaceAudience.Private
 public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
   private static final Log LOG = LogFactory.getLog(HRegion.class);
@@ -207,18 +204,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     "hbase.hregion.scan.loadColumnFamiliesOnDemand";
 
   /**
-   * Longest time we'll wait on a sequenceid.
-   * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use
-   * it without cleaning up previous usage properly; generally, a WAL roll is needed. The timeout
-   * is for a latch in WALKey. There is no global accounting of outstanding WALKeys; intentionally
-   * to avoid contention, but it makes it so if an abort or problem, we could be stuck waiting
-   * on the WALKey latch. Revisit.
-   */
-  private final int maxWaitForSeqId;
-  private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
-  private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;
-
-  /**
    * This is the global default value for durability. All tables/mutations not
    * defining a durability or using USE_DEFAULT will default to this value.
    */
@@ -282,7 +267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   final Counter checkAndMutateChecksPassed = new Counter();
   final Counter checkAndMutateChecksFailed = new Counter();
 
-  //Number of requests
+  // Number of requests
   final Counter readRequestsCount = new Counter();
   final Counter filteredReadRequestsCount = new Counter();
   final Counter writeRequestsCount = new Counter();
@@ -357,7 +342,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   private boolean disallowWritesInRecovering = false;
 
-  // when a region is in recovering state, it can only accept writes not reads
+  // When a region is in recovering state, it can only accept writes not reads
   private volatile boolean recovering = false;
 
   private volatile Optional<ConfigurationManager> configurationManager;
@@ -374,7 +359,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // We achieve this by synchronizing on the scannerReadPoints object.
     synchronized(scannerReadPoints) {
       minimumReadPoint = mvcc.getReadPoint();
-
       for (Long readPoint: this.scannerReadPoints.values()) {
         if (readPoint < minimumReadPoint) {
           minimumReadPoint = readPoint;
@@ -674,7 +658,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                     DEFAULT_ROWLOCK_WAIT_DURATION);
 
-    this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
     this.htableDescriptor = htd;
     this.rsServices = rsServices;
@@ -1183,7 +1166,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public void setRecovering(boolean newState) {
     boolean wasRecovering = this.recovering;
-    // before we flip the recovering switch (enabling reads) we should write the region open
+    // Before we flip the recovering switch (enabling reads) we should write the region open
     // event to WAL if needed
     if (wal != null && getRegionServerServices() != null && !writestate.readOnly
         && wasRecovering && !newState) {
@@ -2051,7 +2034,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Should the store be flushed because it is old enough.
    * <p>
-   * Every FlushPolicy should call this to determine whether a store is old enough to flush(except
+   * Every FlushPolicy should call this to determine whether a store is old enough to flush (except
    * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
    * returns true which will make a lot of flush requests.
    */
@@ -2152,19 +2135,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * for say installing a bulk loaded file just ahead of the last hfile that was
    * the result of this flush, etc.
    *
-   * @param wal
-   *          Null if we're NOT to go via wal.
-   * @param myseqid
-   *          The seqid to use if <code>wal</code> is null writing out flush
-   *          file.
-   * @param storesToFlush
-   *          The list of stores to flush.
+   * @param wal Null if we're NOT to go via wal.
+   * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
+   * @param storesToFlush The list of stores to flush.
    * @return object describing the flush's state
-   * @throws IOException
-   *           general io exceptions
-   * @throws DroppedSnapshotException
-   *           Thrown when replay of wal is required because a Snapshot was not
-   *           properly persisted.
+   * @throws IOException general io exceptions
+   * @throws DroppedSnapshotException Thrown when replay of WAL is required.
    */
   protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
       final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
@@ -2188,65 +2164,48 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       throw new IOException("Aborting flush because server is aborted...");
     }
     final long startTime = EnvironmentEdgeManager.currentTime();
-    // If nothing to flush, return, but we need to safely update the region sequence id
+    // If nothing to flush, return, but return with a valid unused sequenceId.
+    // Its needed by bulk upload IIRC. It flushes until no edits in memory so it can insert a
+    // bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs
+    // to no other that it can use to associate with the bulk load. Hence this little dance below
+    // to go get one.
     if (this.memstoreSize.get() <= 0) {
-      // Take an update lock because am about to change the sequence id and we want the sequence id
-      // to be at the border of the empty memstore.
-      MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+      // Take an update lock so no edits can come into memory just yet.
       this.updatesLock.writeLock().lock();
+      WriteEntry writeEntry = null;
       try {
         if (this.memstoreSize.get() <= 0) {
           // Presume that if there are still no edits in the memstore, then there are no edits for
           // this region out in the WAL subsystem so no need to do any trickery clearing out
-          // edits in the WAL system. Up the sequence number so the resulting flush id is for
-          // sure just beyond the last appended region edit (useful as a marker when bulk loading,
-          // etc.). NOTE: The writeEntry write number is NOT in the WAL.. there is no WAL writing
-          // here.
+          // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
+          // sure just beyond the last appended region edit and not associated with any edit
+          // (useful as marker when bulk loading, etc.).
+          FlushResult flushResult = null;
           if (wal != null) {
             writeEntry = mvcc.begin();
             long flushOpSeqId = writeEntry.getWriteNumber();
-            FlushResult flushResult = new FlushResultImpl(
-                FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
-                flushOpSeqId,
-                "Nothing to flush",
-                writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
-            // TODO: Lets see if we hang here, if there is a scenario where an outstanding reader
-            // with a read point is in advance of this write point.
+            flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+              flushOpSeqId, "Nothing to flush",
+            writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
             mvcc.completeAndWait(writeEntry);
+            // Set to null so we don't complete it again down in finally block.
             writeEntry = null;
             return new PrepareFlushResult(flushResult, myseqid);
           } else {
-            return new PrepareFlushResult(
-              new FlushResultImpl(
-                  FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
-                  "Nothing to flush",
-                  false),
-              myseqid);
+            return new PrepareFlushResult(new FlushResultImpl(
+              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush", false), myseqid);
           }
         }
       } finally {
-        this.updatesLock.writeLock().unlock();
         if (writeEntry != null) {
+          // If writeEntry is non-null, this operation failed; the mvcc transaction failed...
+          // but complete it anyways so it doesn't block the mvcc queue.
           mvcc.complete(writeEntry);
         }
+        this.updatesLock.writeLock().unlock();
       }
     }
-
-    if (LOG.isInfoEnabled()) {
-      // Log a fat line detailing what is being flushed.
-      StringBuilder perCfExtras = null;
-      if (!isAllFamilies(storesToFlush)) {
-        perCfExtras = new StringBuilder();
-        for (Store store: storesToFlush) {
-          perCfExtras.append("; ").append(store.getColumnFamilyName());
-          perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
-        }
-      }
-      LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
-        " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
-        ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
-        ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid));
-    }
+    logFatLineOnFlush(storesToFlush, myseqid);
     // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
     // allow updates again so its value will represent the size of the updates received
@@ -2257,8 +2216,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus("Obtaining lock to block concurrent updates");
     // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
-    status.setStatus("Preparing to flush by snapshotting stores in " +
-      getRegionInfo().getEncodedName());
+    status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
     long totalFlushableSizeOfFlushableStores = 0;
 
     Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
@@ -2280,109 +2238,117 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // will be in advance of this sequence id.
     long flushedSeqId = HConstants.NO_SEQNUM;
     byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
-
-    long trxId = 0;
-    MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
     try {
-      try {
-        if (wal != null) {
-          Long earliestUnflushedSequenceIdForTheRegion =
+      if (wal != null) {
+        Long earliestUnflushedSequenceIdForTheRegion =
             wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
-          if (earliestUnflushedSequenceIdForTheRegion == null) {
-            // This should never happen. This is how startCacheFlush signals flush cannot proceed.
-            String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
-            status.setStatus(msg);
-            return new PrepareFlushResult(
+        if (earliestUnflushedSequenceIdForTheRegion == null) {
+          // This should never happen. This is how startCacheFlush signals flush cannot proceed.
+          String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
+          status.setStatus(msg);
+          return new PrepareFlushResult(
               new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
               myseqid);
-          }
-          flushOpSeqId = getNextSequenceId(wal);
-          // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit
-          flushedSeqId =
-            earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
-              flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
-        } else {
-          // use the provided sequence Id as WAL is not being used for this flush.
-          flushedSeqId = flushOpSeqId = myseqid;
         }
+        flushOpSeqId = getNextSequenceId(wal);
+        // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit
+        flushedSeqId =
+            earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
+                flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
+      } else {
+        // use the provided sequence Id as WAL is not being used for this flush.
+        flushedSeqId = flushOpSeqId = myseqid;
+      }
 
-        for (Store s : storesToFlush) {
-          totalFlushableSizeOfFlushableStores += s.getFlushableSize();
-          storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
-          committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
-          storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
-        }
+      for (Store s : storesToFlush) {
+        totalFlushableSizeOfFlushableStores += s.getFlushableSize();
+        storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
+        committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
+        storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
+      }
 
-        // write the snapshot start to WAL
-        if (wal != null && !writestate.readOnly) {
-          FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
+      // write the snapshot start to WAL
+      if (wal != null && !writestate.readOnly) {
+        FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
-          // no sync. Sync is below where we do not hold the updates lock
-          trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-            desc, false, mvcc);
-        }
-
-        // Prepare flush (take a snapshot)
-        for (StoreFlushContext flush : storeFlushCtxs.values()) {
-          flush.prepare();
-        }
-      } catch (IOException ex) {
-        if (wal != null) {
-          if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
-            try {
-              FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
-                getRegionInfo(), flushOpSeqId, committedFiles);
-              WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-                desc, false, mvcc);
-            } catch (Throwable t) {
-              LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
-                  StringUtils.stringifyException(t));
-              // ignore this since we will be aborting the RS with DSE.
-            }
-          }
-          // we have called wal.startCacheFlush(), now we have to abort it
-          wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
-          throw ex; // let upper layers deal with it.
-        }
-      } finally {
-        this.updatesLock.writeLock().unlock();
-      }
-      String s = "Finished memstore snapshotting " + this +
-        ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
-      status.setStatus(s);
-      if (LOG.isTraceEnabled()) LOG.trace(s);
-      // sync unflushed WAL changes
-      // see HBASE-8208 for details
-      if (wal != null) {
-        try {
-          wal.sync(); // ensure that flush marker is sync'ed
-        } catch (IOException ioe) {
-          wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
-          throw ioe;
-        }
+        // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
+        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
       }
 
-      // wait for all in-progress transactions to commit to WAL before
-      // we can start the flush. This prevents
-      // uncommitted transactions from being written into HFiles.
-      // We have to block before we start the flush, otherwise keys that
-      // were removed via a rollbackMemstore could be written to Hfiles.
-      mvcc.completeAndWait(writeEntry);
-      // set writeEntry to null to prevent mvcc.complete from being called again inside finally
-      // block
-      writeEntry = null;
-    } finally {
-      if (writeEntry != null) {
-        // In case of failure just mark current writeEntry as complete.
-        mvcc.complete(writeEntry);
+      // Prepare flush (take a snapshot)
+      for (StoreFlushContext flush : storeFlushCtxs.values()) {
+        flush.prepare();
       }
+    } catch (IOException ex) {
+      doAbortFlushToWAL(wal, flushOpSeqId, committedFiles);
+      throw ex;
+    } finally {
+      this.updatesLock.writeLock().unlock();
     }
+    String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, " +
+        "flushsize=" + totalFlushableSizeOfFlushableStores;
+    status.setStatus(s);
+    doSyncOfUnflushedWALChanges(wal, getRegionInfo());
     return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
         flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
   }
 
   /**
-   * @param families
+   * Utility method broken out of internalPrepareFlushCache so that method is smaller.
+   */
+  private void logFatLineOnFlush(final Collection<Store> storesToFlush, final long sequenceId) {
+    if (!LOG.isInfoEnabled()) {
+      return;
+    }
+    // Log a fat line detailing what is being flushed.
+    StringBuilder perCfExtras = null;
+    if (!isAllFamilies(storesToFlush)) {
+      perCfExtras = new StringBuilder();
+      for (Store store: storesToFlush) {
+        perCfExtras.append("; ").append(store.getColumnFamilyName());
+        perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
+      }
+    }
+    LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
+        " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
+        ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
+        ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
+  }
+
+  private void doAbortFlushToWAL(final WAL wal, final long flushOpSeqId,
+      final Map<byte[], List<Path>> committedFiles) {
+    if (wal == null) return;
+    try {
+      FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
+          getRegionInfo(), flushOpSeqId, committedFiles);
+      WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false,
+          mvcc);
+    } catch (Throwable t) {
+      LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
+          StringUtils.stringifyException(t));
+      // ignore this since we will be aborting the RS with DSE.
+    }
+    // we have called wal.startCacheFlush(), now we have to abort it
+    wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
+  }
+
+  /**
+   * Sync unflushed WAL changes. See HBASE-8208 for details
+   */
+  private static void doSyncOfUnflushedWALChanges(final WAL wal, final HRegionInfo hri)
+  throws IOException {
+    if (wal == null) {
+      return;
+    }
+    try {
+      wal.sync(); // ensure that flush marker is sync'ed
+    } catch (IOException ioe) {
+      wal.abortCacheFlush(hri.getEncodedNameAsBytes());
+      throw ioe;
+    }
+  }
+
+  /**
    * @return True if passed Set is all families in the region.
    */
   private boolean isAllFamilies(final Collection<Store> families) {
@@ -2400,8 +2366,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
         getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
       try {
-        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-          desc, true, mvcc);
+        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
         return true;
       } catch (IOException e) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -2471,8 +2436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-          desc, true, mvcc);
+        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
       }
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
@@ -2485,8 +2449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
-          WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-            desc, false, mvcc);
+          WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
         } catch (Throwable ex) {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
               + "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -2557,15 +2520,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   @VisibleForTesting
   protected long getNextSequenceId(final WAL wal) throws IOException {
-    // TODO: For review. Putting an empty edit in to get a sequenceid out will not work if the
-    // WAL is banjaxed... if it has gotten an exception and the WAL has not yet been rolled or
-    // aborted. In this case, we'll just get stuck here. For now, until HBASE-12751, just have
-    // a timeout. May happen in tests after we tightened the semantic via HBASE-14317.
-    // Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches
-    // so if an abort or stop, there is no way to call them in.
-    WALKey key = this.appendEmptyEdit(wal);
-    mvcc.complete(key.getWriteEntry());
-    return key.getSequenceId(this.maxWaitForSeqId);
+    WriteEntry we = mvcc.begin();
+    mvcc.completeAndWait(we);
+    return we.getWriteNumber();
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -2754,13 +2711,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * accumulating status codes and tracking the index at which processing
    * is proceeding.
    */
-  private abstract static class BatchOperationInProgress<T> {
+  private abstract static class BatchOperation<T> {
     T[] operations;
     int nextIndexToProcess = 0;
     OperationStatus[] retCodeDetails;
     WALEdit[] walEditsFromCoprocessors;
 
-    public BatchOperationInProgress(T[] operations) {
+    public BatchOperation(T[] operations) {
       this.operations = operations;
       this.retCodeDetails = new OperationStatus[operations.length];
       this.walEditsFromCoprocessors = new WALEdit[operations.length];
@@ -2780,7 +2737,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  private static class MutationBatch extends BatchOperationInProgress<Mutation> {
+  private static class MutationBatch extends BatchOperation<Mutation> {
     private long nonceGroup;
     private long nonce;
     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
@@ -2820,7 +2777,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
+  private static class ReplayBatch extends BatchOperation<MutationReplay> {
     private long replaySeqId = 0;
     public ReplayBatch(MutationReplay[] operations, long seqId) {
       super(operations);
@@ -2906,7 +2863,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    *         OperationStatusCode and the exceptionMessage if any.
    * @throws IOException
    */
-  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
+  OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {
     boolean initialized = false;
     Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
     startRegionOperation(op);
@@ -2920,11 +2877,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!initialized) {
           this.writeRequestsCount.add(batchOp.operations.length);
           if (!batchOp.isInReplay()) {
-            doPreMutationHook(batchOp);
+            doPreBatchMutateHook(batchOp);
           }
           initialized = true;
         }
-        long addedSize = doMiniBatchMutation(batchOp);
+        long addedSize = doMiniBatchMutate(batchOp);
         long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
         if (isFlushSize(newSize)) {
           requestFlush();
@@ -2936,8 +2893,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return batchOp.retCodeDetails;
   }
 
-
-  private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
+  private void doPreBatchMutateHook(BatchOperation<?> batchOp)
       throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
@@ -2976,103 +2932,58 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  /**
+   * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
+   * In here we also handle replay of edits on region recover.
+   * @return Change in size brought about by applying <code>batchOp</code>
+   */
   @SuppressWarnings("unchecked")
-  private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
-    boolean isInReplay = batchOp.isInReplay();
-    // variable to note if all Put items are for the same CF -- metrics related
+  // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120
+  private long doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
+    boolean replay = batchOp.isInReplay();
+    // Variable to note if all Put items are for the same CF -- metrics related
     boolean putsCfSetConsistent = true;
-    //The set of columnFamilies first seen for Put.
-    Set<byte[]> putsCfSet = null;
-    // variable to note if all Delete items are for the same CF -- metrics related
+    // Variable to note if all Delete items are for the same CF -- metrics related
     boolean deletesCfSetConsistent = true;
-    //The set of columnFamilies first seen for Delete.
+    // The set of columnFamilies first seen for Put.
+    Set<byte[]> putsCfSet = null;
+    // The set of columnFamilies first seen for Delete.
     Set<byte[]> deletesCfSet = null;
-
-    long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
-    WALEdit walEdit = new WALEdit(isInReplay);
-    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
-    long txid = 0;
-    boolean doRollBackMemstore = false;
+    long currentNonceGroup = HConstants.NO_NONCE;
+    long currentNonce = HConstants.NO_NONCE;
+    WALEdit walEdit = new WALEdit(replay);
     boolean locked = false;
-
-    /** Keep track of the locks we hold so we can release them in finally clause */
-    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     // reference family maps directly so coprocessors can mutate them if desired
     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
     boolean success = false;
-    int noOfPuts = 0, noOfDeletes = 0;
-    WALKey walKey = null;
-    long mvccNum = 0;
+    int noOfPuts = 0;
+    int noOfDeletes = 0;
+    WriteEntry writeEntry = null;
+    /** Keep track of the locks we hold so we can release them in finally clause */
+    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     try {
-      // ------------------------------------
-      // STEP 1. Try to acquire as many locks as we can, and ensure
-      // we acquire at least one.
-      // ----------------------------------
+      // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
       int numReadyToWrite = 0;
       long now = EnvironmentEdgeManager.currentTime();
       while (lastIndexExclusive < batchOp.operations.length) {
-        Mutation mutation = batchOp.getMutation(lastIndexExclusive);
-        boolean isPutMutation = mutation instanceof Put;
-
-        Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
-        // store the family map reference to allow for mutations
-        familyMaps[lastIndexExclusive] = familyMap;
-
-        // skip anything that "ran" already
-        if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) {
-          lastIndexExclusive++;
-          continue;
-        }
-
-        try {
-          if (isPutMutation) {
-            // Check the families in the put. If bad, skip this one.
-            if (isInReplay) {
-              removeNonExistentColumnFamilyForReplay(familyMap);
-            } else {
-              checkFamilies(familyMap.keySet());
-            }
-            checkTimestamps(mutation.getFamilyCellMap(), now);
-          } else {
-            prepareDelete((Delete) mutation);
-          }
-          checkRow(mutation.getRow(), "doMiniBatchMutation");
-        } catch (NoSuchColumnFamilyException nscf) {
-          LOG.warn("No such column family in batch mutation", nscf);
-          batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
-              OperationStatusCode.BAD_FAMILY, nscf.getMessage());
-          lastIndexExclusive++;
-          continue;
-        } catch (FailedSanityCheckException fsce) {
-          LOG.warn("Batch Mutation did not pass sanity check", fsce);
-          batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
-              OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
-          lastIndexExclusive++;
-          continue;
-        } catch (WrongRegionException we) {
-          LOG.warn("Batch mutation had a row that does not belong to this region", we);
-          batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
-              OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
+        if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now)) {
           lastIndexExclusive++;
           continue;
         }
-
-        // If we haven't got any rows in our batch, we should block to
-        // get the next one.
+        Mutation mutation = batchOp.getMutation(lastIndexExclusive);
+        // If we haven't got any rows in our batch, we should block to get the next one.
         RowLock rowLock = null;
         try {
           rowLock = getRowLock(mutation.getRow(), true);
         } catch (IOException ioe) {
-          LOG.warn("Failed getting lock in batch put, row="
-            + Bytes.toStringBinary(mutation.getRow()), ioe);
+          LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
         }
         if (rowLock == null) {
           // We failed to grab another lock
-          break; // stop acquiring more rows for this batch
+          break; // Stop acquiring more rows for this batch
         } else {
           acquiredRowLocks.add(rowLock);
         }
@@ -3080,9 +2991,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         lastIndexExclusive++;
         numReadyToWrite++;
 
-        if (isPutMutation) {
+        if (mutation instanceof Put) {
           // If Column Families stay consistent through out all of the
-          // individual puts then metrics can be reported as a mutliput across
+          // individual puts then metrics can be reported as a multiput across
           // column families in the first put.
           if (putsCfSet == null) {
             putsCfSet = mutation.getFamilyCellMap().keySet();
@@ -3100,23 +3011,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
 
-      // we should record the timestamp only after we have acquired the rowLock,
+      // We've now grabbed as many mutations off the list as we can
+
+      // STEP 2. Update any LATEST_TIMESTAMP timestamps
+      // We should record the timestamp only after we have acquired the rowLock,
       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
       now = EnvironmentEdgeManager.currentTime();
       byte[] byteNow = Bytes.toBytes(now);
 
       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
-      if (numReadyToWrite <= 0) return 0L;
-
-      // We've now grabbed as many mutations off the list as we can
+      if (numReadyToWrite <= 0) {
+        return 0L;
+      }
 
-      // ------------------------------------
-      // STEP 2. Update any LATEST_TIMESTAMP timestamps
-      // ----------------------------------
-      for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
+      for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {
         // skip invalid
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) continue;
+            != OperationStatusCode.NOT_RUN) {
+          // lastIndexExclusive was incremented above.
+          continue;
+        }
 
         Mutation mutation = batchOp.getMutation(i);
         if (mutation instanceof Put) {
@@ -3133,16 +3047,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       locked = true;
 
       // calling the pre CP hook for batch mutation
-      if (!isInReplay && coprocessorHost != null) {
+      if (!replay && coprocessorHost != null) {
         MiniBatchOperationInProgress<Mutation> miniBatchOp =
           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
-        if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
+        if (coprocessorHost.preBatchMutate(miniBatchOp)) {
+          return 0L;
+        }
       }
 
-      // ------------------------------------
       // STEP 3. Build WAL edit
-      // ----------------------------------
       Durability durability = Durability.USE_DEFAULT;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
@@ -3160,26 +3074,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           continue;
         }
 
-        long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
+        long nonceGroup = batchOp.getNonceGroup(i);
+        long nonce = batchOp.getNonce(i);
         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
         // Given how nonces are originally written, these should be contiguous.
         // They don't have to be, it will still work, just write more WALEdits than needed.
         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
-          if (walEdit.size() > 0) {
-            assert isInReplay;
-            if (!isInReplay) {
-              throw new IOException("Multiple nonces per batch and not in replay");
-            }
-            // txid should always increase, so having the one from the last call is ok.
-            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), now, m.getClusterIds(),
-              currentNonceGroup, currentNonce, mvcc);
-            txid = this.wal.append(this.htableDescriptor,  this.getRegionInfo(),  walKey,
-              walEdit, true);
-            walEdit = new WALEdit(isInReplay);
-            walKey = null;
-          }
+          // Write what we have so far for nonces out to WAL
+          appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce);
+          walEdit = new WALEdit(replay);
           currentNonceGroup = nonceGroup;
           currentNonce = nonce;
         }
@@ -3194,107 +3097,83 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         addFamilyMapToWALEdit(familyMaps[i], walEdit);
       }
 
-      // -------------------------
-      // STEP 4. Append the final edit to WAL. Do not sync wal.
-      // -------------------------
+      // STEP 4. Append the final edit to WAL and sync.
       Mutation mutation = batchOp.getMutation(firstIndex);
-      if (isInReplay) {
+      WALKey walKey = null;
+      if (replay) {
         // use wal key from the original
         walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
           this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
           mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
-        long replaySeqId = batchOp.getReplaySequenceId();
-        walKey.setOrigLogSeqNum(replaySeqId);
-      }
-      if (walEdit.size() > 0) {
-        if (!isInReplay) {
-        // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-        walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+        walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
+      }
+      // Not sure what is going on here when replay is going on... does the below append get
+      // called for replayed edits? Am afraid to change it without test.
+      if (!walEdit.isEmpty()) {
+        if (!replay) {
+          // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+          walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
             this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
             mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
         }
-        txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+        // TODO: Use the doAppend methods below... complicated by the replay stuff above.
+        try {
+          long txid =
+            this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+          if (txid != 0) sync(txid, durability);
+          writeEntry = walKey.getWriteEntry();
+        } catch (IOException ioe) {
+          if (walKey != null) mvcc.complete(walKey.getWriteEntry());
+          throw ioe;
+        }
       }
-      // ------------------------------------
-      // Acquire the latest mvcc number
-      // ----------------------------------
       if (walKey == null) {
-        // If this is a skip wal operation just get the read point from mvcc
-        walKey = this.appendEmptyEdit(this.wal);
-      }
-      if (!isInReplay) {
-        writeEntry = walKey.getWriteEntry();
-        mvccNum = writeEntry.getWriteNumber();
-      } else {
-        mvccNum = batchOp.getReplaySequenceId();
+        // If no walKey, then skipping WAL or some such. Being an mvcc transaction so sequenceid.
+        writeEntry = mvcc.begin();
       }
 
-      // ------------------------------------
       // STEP 5. Write back to memstore
-      // Write to memstore. It is ok to write to memstore
-      // first without syncing the WAL because we do not roll
-      // forward the memstore MVCC. The MVCC will be moved up when
-      // the complete operation is done. These changes are not yet
-      // visible to scanners till we update the MVCC. The MVCC is
-      // moved only when the sync is complete.
-      // ----------------------------------
       long addedSize = 0;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        if (batchOp.retCodeDetails[i].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) {
+        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
           continue;
         }
-        doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], replay,
+            replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
+      }
+
+      // STEP 6. Complete mvcc.
+      if (replay) {
+        this.mvcc.advanceTo(batchOp.getReplaySequenceId());
+      } else if (writeEntry != null/*Can be null if in replay mode*/) {
+        mvcc.completeAndWait(writeEntry);
+        writeEntry = null;
       }
 
-      // -------------------------------
-      // STEP 6. Release row locks, etc.
-      // -------------------------------
+      // STEP 7. Release row locks, etc.
       if (locked) {
         this.updatesLock.readLock().unlock();
         locked = false;
       }
       releaseRowLocks(acquiredRowLocks);
 
-      // -------------------------
-      // STEP 7. Sync wal.
-      // -------------------------
-      if (txid != 0) {
-        syncOrDefer(txid, durability);
-      }
-
-      doRollBackMemstore = false;
       // calling the post CP hook for batch mutation
-      if (!isInReplay && coprocessorHost != null) {
+      if (!replay && coprocessorHost != null) {
         MiniBatchOperationInProgress<Mutation> miniBatchOp =
           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
 
-      // ------------------------------------------------------------------
-      // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
-      // ------------------------------------------------------------------
-      if (writeEntry != null) {
-        mvcc.completeAndWait(writeEntry);
-        writeEntry = null;
-      } else if (isInReplay) {
-        // ensure that the sequence id of the region is at least as big as orig log seq id
-        mvcc.advanceTo(mvccNum);
-      }
-
       for (int i = firstIndex; i < lastIndexExclusive; i ++) {
         if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
           batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
         }
       }
 
-      // ------------------------------------
-      // STEP 9. Run coprocessor post hooks. This should be done after the wal is
+      // STEP 8. Run coprocessor post hooks. This should be done after the wal is
       // synced so that the coprocessor contract is adhered to.
-      // ------------------------------------
-      if (!isInReplay && coprocessorHost != null) {
+      if (!replay && coprocessorHost != null) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
           // only for successful puts
           if (batchOp.retCodeDetails[i].getOperationStatusCode()
@@ -3313,18 +3192,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       success = true;
       return addedSize;
     } finally {
-      // if the wal sync was unsuccessful, remove keys from memstore
-      if (doRollBackMemstore) {
-        for (int j = 0; j < familyMaps.length; j++) {
-          for(List<Cell> cells:familyMaps[j].values()) {
-            rollbackMemstore(cells);
-          }
-        }
-        if (writeEntry != null) mvcc.complete(writeEntry);
-      } else if (writeEntry != null) {
-        mvcc.completeAndWait(writeEntry);
-      }
-
+      // Call complete rather than completeAndWait because we probably had error if walKey != null
+      if (writeEntry != null) mvcc.complete(writeEntry);
       if (locked) {
         this.updatesLock.readLock().unlock();
       }
@@ -3369,6 +3238,88 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  private void appendCurrentNonces(final Mutation mutation, final boolean replay,
+      final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce)
+  throws IOException {
+    if (walEdit.isEmpty()) return;
+    if (!replay) throw new IOException("Multiple nonces per batch and not in replay");
+    WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+        this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
+        currentNonceGroup, currentNonce, mvcc);
+    this.wal.append(this.htableDescriptor,  this.getRegionInfo(), walKey, walEdit, true);
+    // Complete the mvcc transaction started down in append else it will block others
+    this.mvcc.complete(walKey.getWriteEntry());
+  }
+
+  private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive,
+      final Map<byte[], List<Cell>>[] familyMaps, final long now)
+  throws IOException {
+    boolean skip = false;
+    // Skip anything that "ran" already
+    if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
+        != OperationStatusCode.NOT_RUN) {
+      return true;
+    }
+    Mutation mutation = batchOp.getMutation(lastIndexExclusive);
+    Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
+    // store the family map reference to allow for mutations
+    familyMaps[lastIndexExclusive] = familyMap;
+
+    try {
+      if (mutation instanceof Put) {
+        // Check the families in the put. If bad, skip this one.
+        if (batchOp.isInReplay()) {
+          removeNonExistentColumnFamilyForReplay(familyMap);
+        } else {
+          checkFamilies(familyMap.keySet());
+        }
+        checkTimestamps(mutation.getFamilyCellMap(), now);
+      } else {
+        prepareDelete((Delete)mutation);
+      }
+      checkRow(mutation.getRow(), "doMiniBatchMutation");
+    } catch (NoSuchColumnFamilyException nscf) {
+      LOG.warn("No such column family in batch mutation", nscf);
+      batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
+          OperationStatusCode.BAD_FAMILY, nscf.getMessage());
+      skip = true;
+    } catch (FailedSanityCheckException fsce) {
+      LOG.warn("Batch Mutation did not pass sanity check", fsce);
+      batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
+          OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
+      skip = true;
+    } catch (WrongRegionException we) {
+      LOG.warn("Batch mutation had a row that does not belong to this region", we);
+      batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
+          OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
+      skip = true;
+    }
+    return skip;
+  }
+
+  /**
+   * During replay, there could exist column families which are removed between region server
+   * failure and replay
+   */
+  private void removeNonExistentColumnFamilyForReplay(final Map<byte[], List<Cell>> familyMap) {
+    List<byte[]> nonExistentList = null;
+    for (byte[] family : familyMap.keySet()) {
+      if (!this.htableDescriptor.hasFamily(family)) {
+        if (nonExistentList == null) {
+          nonExistentList = new ArrayList<byte[]>();
+        }
+        nonExistentList.add(family);
+      }
+    }
+    if (nonExistentList != null) {
+      for (byte[] family : nonExistentList) {
+        // Perhaps schema was changed between crash and replay
+        LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
+        familyMap.remove(family);
+      }
+    }
+  }
+
   /**
    * Returns effective durability from the passed durability and
    * the table descriptor.
@@ -3377,93 +3328,82 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return d == Durability.USE_DEFAULT ? this.durability : d;
   }
 
-  //TODO, Think that gets/puts and deletes should be refactored a bit so that
-  //the getting of the lock happens before, so that you would just pass it into
-  //the methods. So in the case of checkAndMutate you could just do lockRow,
-  //get, put, unlockRow or something
-
   @Override
   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
-      CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
+      CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation,
       boolean writeToWAL)
   throws IOException{
+    checkMutationType(mutation, row);
+    return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, null,
+      mutation, writeToWAL);
+  }
+
+  @Override
+  public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
+      CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
+      boolean writeToWAL)
+  throws IOException {
+    return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, rm, null,
+      writeToWAL);
+  }
+
+  /**
+   * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
+   * switches in the few places where there is deviation.
+   */
+  private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
+      CompareOp compareOp, ByteArrayComparable comparator, RowMutations rowMutations,
+      Mutation mutation, boolean writeToWAL)
+  throws IOException {
+    // Could do the below checks but seems wacky with two callers only. Just comment out for now.
+    // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
+    // need these commented out checks.
+    // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null");
+    // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set");
     checkReadOnly();
-    //TODO, add check for value length or maybe even better move this to the
-    //client if this becomes a global setting
+    // TODO, add check for value length also move this check to the client
     checkResources();
-    boolean isPut = w instanceof Put;
-    if (!isPut && !(w instanceof Delete))
-      throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
-          "be Put or Delete");
-    if (!Bytes.equals(row, w.getRow())) {
-      throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
-          "getRow must match the passed row");
-    }
-
     startRegionOperation();
     try {
       Get get = new Get(row);
       checkFamily(family);
       get.addColumn(family, qualifier);
-
       // Lock row - note that doBatchMutate will relock this row if called
       RowLock rowLock = getRowLock(get.getRow());
-      // wait for all previous transactions to complete (with lock held)
-      mvcc.await();
       try {
-        if (this.getCoprocessorHost() != null) {
+        if (mutation != null && this.getCoprocessorHost() != null) {
+          // Call coprocessor.
           Boolean processed = null;
-          if (w instanceof Put) {
+          if (mutation instanceof Put) {
             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
-                qualifier, compareOp, comparator, (Put) w);
-          } else if (w instanceof Delete) {
+                qualifier, compareOp, comparator, (Put)mutation);
+          } else if (mutation instanceof Delete) {
             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
-                qualifier, compareOp, comparator, (Delete) w);
+                qualifier, compareOp, comparator, (Delete)mutation);
           }
           if (processed != null) {
             return processed;
           }
         }
+        // NOTE: We used to wait here until mvcc caught up:  mvcc.await();
+        // Supposition is that now all changes are done under row locks, then when we go to read,
+        // we'll get the latest on this row.
         List<Cell> result = get(get, false);
-
-        boolean valueIsNull = comparator.getValue() == null ||
-          comparator.getValue().length == 0;
+        boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
         boolean matches = false;
         long cellTs = 0;
         if (result.size() == 0 && valueIsNull) {
           matches = true;
-        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
-            valueIsNull) {
+        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
           matches = true;
           cellTs = result.get(0).getTimestamp();
         } else if (result.size() == 1 && !valueIsNull) {
           Cell kv = result.get(0);
           cellTs = kv.getTimestamp();
           int compareResult = CellComparator.compareValue(kv, comparator);
-          switch (compareOp) {
-          case LESS:
-            matches = compareResult < 0;
-            break;
-          case LESS_OR_EQUAL:
-            matches = compareResult <= 0;
-            break;
-          case EQUAL:
-            matches = compareResult == 0;
-            break;
-          case NOT_EQUAL:
-            matches = compareResult != 0;
-            break;
-          case GREATER_OR_EQUAL:
-            matches = compareResult >= 0;
-            break;
-          case GREATER:
-            matches = compareResult > 0;
-            break;
-          default:
-            throw new RuntimeException("Unknown Compare op " + compareOp.name());
-          }
+          matches = matches(compareOp, compareResult);
         }
-        //If matches put the new put or delete the new delete
+        // If matches put the new put or delete the new delete
         if (matches) {
           // We have acquired the row lock already. If the system clock is NOT monotonically
           // non-decreasing (see HBASE-14070) we should make sure that the mutation has a
@@ -3472,16 +3412,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           long now = EnvironmentEdgeManager.currentTime();
           long ts = Math.max(now, cellTs); // ensure write is not eclipsed
           byte[] byteTs = Bytes.toBytes(ts);
-
-          if (w instanceof Put) {
-            updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
+          if (mutation != null) {
+            if (mutation instanceof Put) {
+              updateCellTimestamps(mutation.getFamilyCellMap().values(), byteTs);
+            }
+            // And else 'delete' is not needed since it already does a second get, and sets the
+            // timestamp from get (see prepareDeleteTimestamps).
+          } else {
+            for (Mutation m: rowMutations.getMutations()) {
+              if (m instanceof Put) {
+                updateCellTimestamps(m.getFamilyCellMap().values(), byteTs);
+              }
+            }
+            // And else 'delete' is not needed since it already does a second get, and sets the
+            // timestamp from get (see prepareDeleteTimestamps).
+          }
+          // All edits for the given row (across all column families) must happen atomically.
+          if (mutation != null) {
+            doBatchMutate(mutation);
+          } else {
+            mutateRow(rowMutations);
           }
-          // else delete is not needed since it already does a second get, and sets the timestamp
-          // from get (see prepareDeleteTimestamps).
-
-          // All edits for the given row (across all column families) must
-          // happen atomically.
-          doBatchMutate(w);
           this.checkAndMutateChecksPassed.increment();
           return true;
         }
@@ -3495,113 +3446,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  //TODO, Think that gets/puts and deletes should be refactored a bit so that
-  //the getting of the lock happens before, so that you would just pass it into
-  //the methods. So in the case of checkAndMutate you could just do lockRow,
-  //get, put, unlockRow or something
+  private void checkMutationType(final Mutation mutation, final byte [] row)
+  throws DoNotRetryIOException {
+    boolean isPut = mutation instanceof Put;
+    if (!isPut && !(mutation instanceof Delete)) {
+      throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must be Put or Delete");
+    }
+    if (!Bytes.equals(row, mutation.getRow())) {
+      throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's getRow must match");
+    }
+  }
 
-  @Override
-  public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
-      CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
-      boolean writeToWAL) throws IOException {
-    checkReadOnly();
-    //TODO, add check for value length or maybe even better move this to the
-    //client if this becomes a global setting
-    checkResources();
+  private boolean matches(final CompareOp compareOp, final int compareResult) {
+    boolean matches = false;
+    switch (compareOp) {
+      case LESS:
+        matches = compareResult < 0;
+        break;
+      case LESS_OR_EQUAL:
+        matches = compareResult <= 0;
+        break;
+      case EQUAL:
+        matches = compareResult == 0;
+        break;
+      case NOT_EQUAL:
+        matches = compareResult != 0;
+        break;
+      case GREATER_OR_EQUAL:
+        matches = compareResult >= 0;
+        break;
+      case GREATER:
+        matches = compareResult > 0;
+        break;
+      default:
+        throw new RuntimeException("Unknown Compare op " + compareOp.name());
+    }
+    return matches;
+  }
 
-    startRegionOperation();
-    try {
-      Get get = new Get(row);
-      checkFamily(family);
-      get.addColumn(family, qualifier);
 
-      // Lock row - note that doBatchMutate will relock this row if called
-      RowLock rowLock = getRowLock(get.getRow());
-      // wait for all previous transactions to complete (with lock held)
-      mvcc.await();
-      try {
-        List<Cell> result = get(get, false);
-
-        boolean valueIsNull = comparator.getValue() == null ||
-            comparator.getValue().length == 0;
-        boolean matches = false;
-        long cellTs = 0;
-        if (result.size() == 0 && valueIsNull) {
-          matches = true;
-        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
-            valueIsNull) {
-          matches = true;
-          cellTs = result.get(0).getTimestamp();
-        } else if (result.size() == 1 && !valueIsNull) {
-          Cell kv = result.get(0);
-          cellTs = kv.getTimestamp();
-          int compareResult = CellComparator.compareValue(kv, comparator);
-          switch (compareOp) {
-          case LESS:
-            matches = compareResult < 0;
-            break;
-          case LESS_OR_EQUAL:
-            matches = compareResult <= 0;
-            break;
-          case EQUAL:
-            matches = compareResult == 0;
-            break;
-          case NOT_EQUAL:
-            matches = compareResult != 0;
-            break;
-          case GREATER_OR_EQUAL:
-            matches = compareResult >= 0;
-            break;
-          case GREATER:
-            matches = compareResult > 0;
-            break;
-          default:
-            throw new RuntimeException("Unknown Compare op " + compareOp.name());
-          }
-        }
-        //If matches put the new put or delete the new delete
-        if (matches) {
-          // We have acquired the row lock already. If the system clock is NOT monotonically
-          // non-decreasing (see HBASE-14070) we should make sure that the mutation has a
-          // larger timestamp than what was observed via Get. doBatchMutate already does this, but
-          // there is no way to pass the cellTs. See HBASE-14054.
-          long now = EnvironmentEdgeManager.currentTime();
-          long ts = Math.max(now, cellTs); // ensure write is not eclipsed
-          byte[] byteTs = Bytes.toBytes(ts);
-
-          for (Mutation w : rm.getMutations()) {
-            if (w instanceof Put) {
-              updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
-            }
-            // else delete is not needed since it already does a second get, and sets the timestamp
-            // from get (see prepareDeleteTimestamps).
-          }
-
-          // All edits for the given row (across all column families) must
-          // happen atomically.
-          mutateRow(rm);
-          this.checkAndMutateChecksPassed.increment();
-          return true;
-        }
-        this.checkAndMutateChecksFailed.increment();
-        return false;
-      } finally {
-        rowLock.release();
-      }
-    } finally {
-      closeRegionOperation();
-    }
-  }
-
-  private void doBatchMutate(Mutation mutation) throws IOException {
-    // Currently this is only called for puts and deletes, so no nonces.
-    OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
-    if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
-      throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
-    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
-      throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
-    }
-  }
+  private void doBatchMutate(Mutation mutation) throws IOException {
+    // Currently this is only called for puts and deletes, so no nonces.
+    OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
+    if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
+      throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
+    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
+      throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
+    }
+  }
 
   /**
    * Complete taking the snapshot on the region. Writes the region info and adds references to the
@@ -3663,40 +3555,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
     // Check if we have any work to do and early out otherwise
     // Update these checks as more logic is added here
-
     if (m.getTTL() == Long.MAX_VALUE) {
       return;
     }
 
     // From this point we know we have some work to do
-
     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
       int listSize = cells.size();
       for (int i = 0; i < listSize; i++) {
         Cell cell = cells.get(i);
-        List<Tag> newTags = new ArrayList<Tag>();
-        Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell);
-
-        // Carry forward existing tags
-
-        while (tagIterator.hasNext()) {
-
-          // Add any filters or tag specific rewrites here
-
-          newTags.add(tagIterator.next());
-        }
-
-        // Cell TTL handling
-
-        // Check again if we need to add a cell TTL because early out logic
-        // above may change when there are more tag based features in core.
-        if (m.getTTL() != Long.MAX_VALUE) {
-          // Add a cell TTL tag
-          newTags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
-        }
-
+        List<Tag> newTags = TagUtil.carryForwardTags(null, cell);
+        newTags = TagUtil.carryForwardTTLTag(newTags, m.getTTL());
         // Rewrite the cell with the updated set of tags
         cells.set(i, new TagRewriteCell(cell, TagUtil.fromList(newTags)));
       }
@@ -3772,49 +3643,64 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * should already have locked updatesLock.readLock(). This also does
    * <b>not</b> check the families for validity.
    *
-   * @param familyMap Map of kvs per family
-   * @param mvccNum The MVCC for this transaction.
-   * @param isInReplay true when adding replayed KVs into memstore
-   * @return the additional memory usage of the memstore caused by the
-   * new entries.
+   * @param familyMap Map of Cells by family
+   * @return the additional memory usage of the memstore caused by the new entries.
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, boolean isInReplay) throws IOException {
+  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, boolean replay,
+      long sequenceId)
+  throws IOException {
     long size = 0;
-
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
       byte[] family = e.getKey();
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
-      Store store = getStore(family);
-      int listSize = cells.size();
-      for (int i=0; i < listSize; i++) {
+      size += applyToMemstore(getStore(family), cells, false, replay, sequenceId);
+    }
+    return size;
+  }
+
+  /**
+   * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
+   *  set; when set we will run operations that make sense in the increment/append scenario but
+   *  that do not make sense otherwise.
+   * @return Memstore change in size on insert of these Cells.
+   * @see #applyToMemstore(Store, Cell, long)
+   */
+  private long applyToMemstore(final Store store, final List<Cell> cells,
+      final boolean delta, boolean replay, long sequenceId)
+  throws IOException {
+    // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
+    long size = 0;
+    boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
+    int count = cells.size();
+    if (upsert) {
+      size += store.upsert(cells, getSmallestReadPoint());
+    } else {
+      for (int i = 0; i < count; i++) {
         Cell cell = cells.get(i);
-        if (cell.getSequenceId() == 0 || isInReplay) {
-          CellUtil.setSequenceId(cell, mvccNum);
+        // TODO: This looks wrong.. checking for sequenceid of zero is expensive!!!!! St.Ack
+        // When is it zero anyways? When replay? Then just rely on that flag.
+        if (cell.getSequenceId() == 0 || replay) {
+          CellUtil.setSequenceId(cell, sequenceId);
         }
         size += store.add(cell);
       }
     }
-
-     return size;
-   }
+    return size;
+  }
 
   /**
-   * Remove all the keys listed in the map from the memstore. This method is
-   * called when a Put/Delete has updated memstore but subsequently fails to update
-   * the wal. This method is then invoked to rollback the memstore.
+   * @return Memstore change in size on insert of these Cells.
+   * @see #applyToMemstore(Store, List, boolean, boolean, long)
    */
-  private void rollbackMemstore(List<Cell> memstoreCells) {
-    int kvsRolledback = 0;
-
-    for (Cell cell : memstoreCells) {
-      byte[] family = CellUtil.cloneFamily(cell);
-      Store store = getStore(family);
-      store.rollback(cell);
-      kvsRolledback++;
+  private long applyToMemstore(final Store store, final Cell cell, long sequenceId)
+  throws IOException {
+    // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
+    if (store == null) {
+      checkFamily(CellUtil.cloneFamily(cell));
+      // Unreachable because checkFamily will throw exception
     }
-    LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
+    return store.add(cell);
   }
 
   @Override
@@ -3824,30 +3710,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  /**
-   * During replay, there could exist column families which are removed between region server
-   * failure and replay
-   */
-  private void removeNonExistentColumnFamilyForReplay(
-      final Map<byte[], List<Cell>> familyMap) {
-    List<byte[]> nonExistentList = null;
-    for (byte[] family : familyMap.keySet()) {
-      if (!this.htableDescriptor.hasFamily(family)) {
-        if (nonExistentList == null) {
-          nonExistentList = new ArrayList<byte[]>();
-        }
-        nonExistentList.add(family);
-      }
-    }
-    if (nonExistentList != null) {
-      for (byte[] family : nonExistentList) {
-        // Perhaps schema was changed between crash and replay
-        LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
-        familyMap.remove(family);
-      }
-    }
-  }
-
   @Override
   public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
       throws FailedSanityCheckException {
@@ -5490,12 +5352,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return true;
     } finally {
       if (wal != null && !storeFiles.isEmpty()) {
-        // write a bulk load event when not all hfiles are loaded
+        // @rite a bulk load event when not all hfiles are loaded
         try {
           WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
               this.getRegionInfo().getTable(),
               ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
-          WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
+          WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
           if (this.rsServices != null) {
@@ -5593,7 +5455,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // getSmallestReadPoint, before scannerReadPoints is updated.
       IsolationLevel isolationLevel = scan.getIsolationLevel();
       synchronized(scannerReadPoints) {
-        this.readPt = getReadpoint(isolationLevel);
+        this.readPt = getReadPoint(isolationLevel);
         scannerReadPoints.put(this, this.readPt);
       }
 
@@ -5758,7 +5620,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       // As the data is obtained from two independent heaps, we need to
       // ensure that result list is sorted, because Result relies on that.
-      Collections.sort(results, comparator);
+      sort(results, comparator);
       return moreValues;
     }
 
@@ -6876,7 +6738,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
       long nonceGroup, long nonce) throws IOException {
-
     for (byte[] row : processor.getRowsToLock()) {
       checkRow(row, "processRowsWithLocks");
     }
@@ -6884,23 +6745,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       checkReadOnly();
     }
     checkResources();
-
     startRegionOperation();
     WALEdit walEdit = new WALEdit();
 
-    // 1. Run pre-process hook
-    try {
-      processor.preProcess(this, walEdit);
-    } catch (IOException e) {
-      closeRegionOperation();
-      throw e;
-    }
+    // STEP 1. Run pre-process hook
+    preProcess(processor, walEdit);
     // Short circuit the read only case
     if (processor.readOnly()) {
       try {
         long now = EnvironmentEdgeManager.currentTime();
-        doProcessRowWithTimeout(
-            processor, now, this, null, null, timeout);
+        doProcessRowWithTimeout(processor, now, this, null, null, timeout);
         processor.postProcess(this, walEdit, true);
       } finally {
         closeRegionOperation();
@@ -6908,118 +6762,81 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
 
-    MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
     boolean locked;
-    boolean walSyncSuccessful = false;
     List<RowLock> acquiredRowLocks;
     long addedSize = 0;
     List<Mutation> mutations = new ArrayList<Mutation>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
-    long mvccNum = 0;
-    WALKey walKey = null;
+    // This is assigned by mvcc either explicity in the below or in the guts of the WAL append
+    // when it assigns the edit a sequencedid (A.K.A the mvcc write number).
+    WriteEntry writeEntry = null;
     try {
-      // 2. Acquire the row lock(s)
+      // STEP 2. Acquire the row lock(s)
       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
       for (byte[] row : rowsToLock) {
         // Attempt to lock all involved rows, throw if any lock times out
         // use a writer lock for mixed reads and writes
         acquiredRowLocks.add(getRowLock(row));
       }
-      // 3. Region lock
+      // STEP 3. Region lock
       lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
       locked = true;
-
+      boolean success = false;
       long now = EnvironmentEdgeManager.currentTime();
       try {
-        // 4. Let the processor scan the rows, generate mutations and add
-        //    waledits
-        doProcessRowWithTimeout(
-            processor, now, this, mutations, walEdit, timeout);
-
+        // STEP 4. Let the processor scan the rows, generate mutations and add waledits
+        doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
         if (!mutations.isEmpty()) {
-
-          // 5. Call the preBatchMutate hook
+          // STEP 5. Call the preBatchMutate hook
           processor.preBatchMutate(this, walEdit);
 
-          long txid = 0;
-          // 6. Append no sync
+          // STEP 6. Append and sync if walEdit has data to write out.
           if (!walEdit.isEmpty()) {
-            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-              processor.getClusterIds(), nonceGroup, nonce, mvcc);
-            txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
-                walKey, walEdit, false);
-          }
-          if(walKey == null){
-            // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
-            // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
-            walKey = this.appendEmptyEdit(this.wal);
+            writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()),
+                processor.getClusterIds(), now, nonceGroup, nonce);
+          } else {
+            // We are here if WAL is being skipped.
+            writeEntry = this.mvcc.begin();
           }
 
-          // 7. Start mvcc transaction
-          writeEntry = walKey.getWriteEntry();
-          mvccNum = walKey.getSequenceId();
-
-
-
-          // 8. Apply to memstore
+          // STEP 7. Apply to memstore
+          long sequenceId = writeEntry.getWriteNumber();
           for (Mutation m : mutations) {
-            // Handle any tag based cell features
+            // Handle any tag based cell features.
+            // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before
+            // so tags go into WAL?
             rewriteCellTags(m.getFamilyCellMap(), m);
-
             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
               Cell cell = cellScanner.current();
-              CellUtil.setSequenceId(cell, mvccNum);
-              Store store = getStore(cell);
-              if (store == null) {
-                checkFamily(CellUtil.cloneFamily(cell));
-                // unreachable
+              if (walEdit.isEmpty()) {
+                // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.
+                // If no WAL, need to stamp it here.
+                CellUtil.setSequenceId(cell, sequenceId);
               }
-              addedSize += store.add(cell);
+              Store store = getStore(cell);
+              addedSize += applyToMemstore(store, cell, sequenceId);
             }
           }
+          // STEP 8. Complete mvcc.
+          mvcc.completeAndWait(writeEntry);
+          writeEntry = null;
 
-          // 9. Release region lock
+          // STEP 9. Release region lock
           if (locked) {
             this.updatesLock.readLock().unlock();
             locked = false;
           }
 
-          // 10. Release row lock(s)
+          // STEP 10. Release row lock(s)
           releaseRowLocks(acquiredRowLocks);
 
-          // 11. Sync edit log
-          if (txid != 0) {
-            syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
-          }
-          walSyncSuccessful = true;
-          // 12. call postBatchMutate hook
+          // STEP 11. call postBatchMutate hook
           processor.postBatchMutate(this);
         }
+        success = true;
       } finally {
-        // TODO: Make this method look like all other methods that are doing append/sync and
-        // memstore rollback such as append and doMiniBatchMutation. Currently it is a little
-        // different. Make them all share same code!
-        if (!mutations.isEmpty() && !walSyncSuccessful) {
-          LOG.warn("Wal sync failed. Roll back " + mutations.size() +
-              " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
-              processor.getRowsToLock().iterator().next()) + "...");
-          for (Mutation m : mutations) {
-            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
-              Cell cell = cellScanner.current();
-              getStore(cell).rollback(cell);
-            }
-          }
-          if (writeEntry != null) {
-            mvcc.complete(writeEntry);
-            writeEntry = null;
-          }
-        }
-        // 13. Roll mvcc forward
-        if (writeEntry != null) {
-          mvcc.completeAndWait(writeEntry);
-        }
+        // Call complete rather than completeAndWait because we probably had error if walKey != null
+        if (writeEntry != null) mvcc.complete(writeEntry);
         if (locked) {
           this.updatesLock.readLock().unlock();
         }
@@ -7027,18 +6844,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         releaseRowLocks(acquiredRowLocks);
       }
 
-      // 14. Run post-process hook
-      processor.postProcess(this, walEdit, walSyncSuccessful);
-
+      // 12. Run post-process hook
+      processor.postProcess(this, walEdit, success);
     } finally {
       closeRegionOperation();
-      if (!mutations.isEmpty() &&
-          isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
+      if (!mutations.isEmpty() && isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
         requestFlush();
       }
     }
   }
 
+  private void preProcess(final RowProcessor<?,?> processor, final WALEdit walEdit)
+  throws IOException {
+    try {
+      processor.preProcess(this, walEdit);
+    } catch (IOException e) {
+      closeRegionOperation();
+      throw e;
+    }
+  }
+
   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
                                        final long now,
                                        final HRegion region,
@@ -7089,500 +6914,400 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  /**
-   * @return The passed-in {@code tags} but with the tags from {@code cell} added.
-   */
-  private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
-    if (cell.getTagsLength() <= 0) return tags;
-    List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags;
-    Iterator<Tag> i = CellUtil.tagsIterator(cell);
-    while (i.hasNext()) newTags.add(i.next());
-    return newTags;
+  public Result append(Append append) throws IOException {
+    return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  /**
-   * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc.
-   * @return Get result.
-   */
-  private List<Cell> doGet(final Store store, final byte [] row,
-      final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
-  throws IOException {
-    // Sort the cells so that they match the order that they
-    // appear in the Get results. Otherwise, we won't be able to
-    // find the existing values if the cells are not specified
-    // in order by the client since cells are in an array list.
-    Collections.sort(family.getValue(), store.getComparator());
-    // Get previous values for all columns in this family
-    Get get = new Get(row);
-    for (Cell cell : family.getValue()) {
-      get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
-    }
-    if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());
-    return get(get, false);
+  @Override
+  public Result append(Append mutation, long nonceGroup, long nonce) throws IOException {
+    return doDelta(Operation.APPEND, mutation, nonceGroup, nonce, mutation.isReturnResults());
   }
 
-  public Result append(Append append) throws IOException {
-    return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  public Result increment(Increment increment) throws IOException {
+    return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  // TODO: There's a lot of boiler plate code identical to increment.
-  // We should refactor append and increment as local get-mutate-put
-  // transactions, so all stores only go through one code path for puts.
-
   @Override
-  public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {
-    Operation op = Operation.APPEND;
-    byte[] row = mutate.getRow();
-    checkRow(row, op.toString());
-    checkFamilies(mutate.getFamilyCellMap().keySet());
-    boolean flush = false;
-    Durability durability = getEffectiveDurability(mutate.getDurability());
-    boolean writeToWAL = durability != Durability.SKIP_WAL;
-    WALEdit walEdits = null;
-    List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
-    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-    long size = 0;
-    long txid = 0;
+  public Result increment(Increment mutation, long nonceGroup, long nonce)
+  throws IOException {
+    return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce, mutation.isReturnResults());
+  }
+
+  /**
+   * Add "deltas" to Cells. Deltas are increments or appends. Switch on <code>op</code>.
+   *
+   * <p>If increment, add deltas to current values or if an append, then
+   * append the deltas to the current Cell values.
+   *
+   * <p>Append and Increment code paths are mostly the same. They differ in just a few pl

<TRUNCATED>