You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2014/06/07 03:19:55 UTC

[2/2] git commit: hbase-8763: Combine MVCC and SeqId

hbase-8763: Combine MVCC and SeqId


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c682d57e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c682d57e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c682d57e

Branch: refs/heads/master
Commit: c682d57e92d9f18a02e1fe8dc50c5caa116e5d4a
Parents: d6cc2fb
Author: Jeffrey Zhong <jz...@JZhongs-MacBook-Pro.local>
Authored: Fri Jun 6 18:25:46 2014 -0700
Committer: Jeffrey Zhong <jz...@JZhongs-MacBook-Pro.local>
Committed: Fri Jun 6 18:25:46 2014 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/DefaultMemStore.java     |  17 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 413 +++++++++++--------
 .../hadoop/hbase/regionserver/HStore.java       |   5 +-
 .../hadoop/hbase/regionserver/MemStore.java     |   6 +-
 .../MultiVersionConsistencyControl.java         | 187 ++++++---
 .../hbase/regionserver/SequenceNumber.java      |  31 ++
 .../apache/hadoop/hbase/regionserver/Store.java |   5 +-
 .../hadoop/hbase/regionserver/StoreFlusher.java |   6 -
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |  67 +--
 .../hbase/regionserver/wal/FSWALEntry.java      |  31 +-
 .../hadoop/hbase/regionserver/wal/HLog.java     |  11 +-
 .../hadoop/hbase/regionserver/wal/HLogKey.java  |  45 +-
 .../hbase/regionserver/wal/HLogSplitter.java    |   4 +-
 .../hadoop/hbase/regionserver/wal/HLogUtil.java |   2 +-
 .../hadoop/hbase/regionserver/wal/WALEdit.java  |   2 +-
 .../hadoop/hbase/client/TestMultiParallel.java  |   4 +-
 .../hbase/regionserver/TestDefaultMemStore.java |  23 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |  17 +-
 .../TestMultiVersionConsistencyControl.java     |   4 +-
 .../hadoop/hbase/regionserver/TestStore.java    |  12 +-
 20 files changed, 554 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 51f22d8..ad084a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
@@ -210,12 +211,13 @@ public class DefaultMemStore implements MemStore {
   /**
    * Write an update
    * @param cell
-   * @return approximate size of the passed key and value.
+   * @return approximate size of the passed KV & newly added KV which maybe different than the
+   *         passed-in KV
    */
   @Override
-  public long add(Cell cell) {
+  public Pair<Long, Cell> add(Cell cell) {
     KeyValue toAdd = maybeCloneWithAllocator(KeyValueUtil.ensureKeyValue(cell));
-    return internalAdd(toAdd);
+    return new Pair<Long, Cell>(internalAdd(toAdd), toAdd);
   }
 
   @Override
@@ -1051,18 +1053,21 @@ public class DefaultMemStore implements MemStore {
     byte [] empty = new byte[0];
     for (int i = 0; i < count; i++) {
       // Give each its own ts
-      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+      Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+      size += ret.getFirst();
     }
     LOG.info("memstore1 estimated size=" + size);
     for (int i = 0; i < count; i++) {
-      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+      Pair<Long, Cell> ret = memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+      size += ret.getFirst();
     }
     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
     // Make a variably sized memstore.
     DefaultMemStore memstore2 = new DefaultMemStore();
     for (int i = 0; i < count; i++) {
-      size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
+      Pair<Long, Cell> ret = memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
         new byte[i]));
+      size += ret.getFirst();
     }
     LOG.info("memstore2 estimated size=" + size);
     final int seconds = 30;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0c4f7ee..dedaf41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -824,10 +824,11 @@ public class HRegion implements HeapSize { // , Writable{
         }
       }
     }
-    mvcc.initialize(maxMemstoreTS + 1);
     // Recover any edits if available.
     maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
         this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+    maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
+    mvcc.initialize(maxSeqId);
     return maxSeqId;
   }
 
@@ -1684,7 +1685,7 @@ public class HRegion implements HeapSize { // , Writable{
           // wal can be null replaying edits.
           return wal != null?
             new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
-              getNextSequenceId(wal, startTime), "Nothing to flush"):
+              getNextSequenceId(wal), "Nothing to flush"):
             new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
         }
       } finally {
@@ -1714,58 +1715,64 @@ public class HRegion implements HeapSize { // , Writable{
       getRegionInfo().getEncodedName());
     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
     long flushSeqId = -1L;
+
     try {
-      // Record the mvcc for all transactions in progress.
-      w = mvcc.beginMemstoreInsert();
-      mvcc.advanceMemstore(w);
-      if (wal != null) {
-        if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
-          // This should never happen.
-          String msg = "Flush will not be started for ["
-              + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
-          status.setStatus(msg);
-          return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+      try {
+        w = mvcc.beginMemstoreInsert();
+        if (wal != null) {
+          if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
+            // This should never happen.
+            String msg = "Flush will not be started for ["
+                + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+            status.setStatus(msg);
+            return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+          }
+          // Get a sequence id that we can use to denote the flush. It will be one beyond the last
+          // edit that made it into the hfile (the below does not add an edit, it just asks the
+          // WAL system to return next sequence edit).
+          flushSeqId = getNextSequenceId(wal);
+        } else {
+          // use the provided sequence Id as WAL is not being used for this flush.
+          flushSeqId = myseqid;
         }
-        // Get a sequence id that we can use to denote the flush.  It will be one beyond the last
-        // edit that made it into the hfile (the below does not add an edit, it just asks the
-        // WAL system to return next sequence edit).
-        flushSeqId = getNextSequenceId(wal, startTime);
-      } else {
-        // use the provided sequence Id as WAL is not being used for this flush.
-        flushSeqId = myseqid;
-      }
 
-      for (Store s : stores.values()) {
-        totalFlushableSize += s.getFlushableSize();
-        storeFlushCtxs.add(s.createFlushContext(flushSeqId));
-      }
+        for (Store s : stores.values()) {
+          totalFlushableSize += s.getFlushableSize();
+          storeFlushCtxs.add(s.createFlushContext(flushSeqId));
+        }
 
-      // Prepare flush (take a snapshot)
-      for (StoreFlushContext flush : storeFlushCtxs) {
-        flush.prepare();
+        // Prepare flush (take a snapshot)
+        for (StoreFlushContext flush : storeFlushCtxs) {
+          flush.prepare();
+        }
+      } finally {
+        this.updatesLock.writeLock().unlock();
       }
+      String s = "Finished memstore snapshotting " + this +
+        ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
+      status.setStatus(s);
+      if (LOG.isTraceEnabled()) LOG.trace(s);
+      // sync unflushed WAL changes when deferred log sync is enabled
+      // see HBASE-8208 for details
+      if (wal != null && !shouldSyncLog()) wal.sync();
+
+      // wait for all in-progress transactions to commit to HLog before
+      // we can start the flush. This prevents
+      // uncommitted transactions from being written into HFiles.
+      // We have to block before we start the flush, otherwise keys that
+      // were removed via a rollbackMemstore could be written to Hfiles.
+      mvcc.waitForPreviousTransactionsComplete(w);
+      // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
+      w = null;
+      s = "Flushing stores of " + this;
+      status.setStatus(s);
+      if (LOG.isTraceEnabled()) LOG.trace(s);
     } finally {
-      this.updatesLock.writeLock().unlock();
+      if (w != null) {
+        // in case of failure just mark current w as complete
+        mvcc.advanceMemstore(w);
+      }
     }
-    String s = "Finished memstore snapshotting " + this +
-      ", syncing WAL and waiting on mvcc, flushSize=" + totalFlushableSize;
-    status.setStatus(s);
-    if (LOG.isTraceEnabled()) LOG.trace(s);
-
-    // sync unflushed WAL changes when deferred log sync is enabled
-    // see HBASE-8208 for details
-    if (wal != null && !shouldSyncLog()) wal.sync();
-
-    // wait for all in-progress transactions to commit to HLog before
-    // we can start the flush. This prevents
-    // uncommitted transactions from being written into HFiles.
-    // We have to block before we start the flush, otherwise keys that
-    // were removed via a rollbackMemstore could be written to Hfiles.
-    mvcc.waitForRead(w);
-
-    s = "Flushing stores of " + this;
-    status.setStatus(s);
-    if (LOG.isTraceEnabled()) LOG.trace(s);
 
     // Any failure from here on out will be catastrophic requiring server
     // restart so hlog content can be replayed and put back into the memstore.
@@ -1849,13 +1856,9 @@ public class HRegion implements HeapSize { // , Writable{
    * @return Next sequence number unassociated with any actual edit.
    * @throws IOException
    */
-  private long getNextSequenceId(final HLog wal, final long now) throws IOException {
-    HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable());
-    // Call append but with an empty WALEdit.  The returned sequence id will not be associated
-    // with any edit and we can be sure it went in after all outstanding appends.
-    wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
-      WALEdit.EMPTY_WALEDIT, this.sequenceId, false);
-    return key.getLogSeqNum();
+  private long getNextSequenceId(final HLog wal) throws IOException {
+    HLogKey key = this.appendNoSyncNoAppend(wal, null);
+    return key.getSequenceNumber();
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -2349,11 +2352,14 @@ public class HRegion implements HeapSize { // , Writable{
     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     // reference family maps directly so coprocessors can mutate them if desired
     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
+    List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
     boolean success = false;
     int noOfPuts = 0, noOfDeletes = 0;
+    HLogKey walKey = null;
+    long mvccNum = 0;
     try {
       // ------------------------------------
       // STEP 1. Try to acquire as many locks as we can, and ensure
@@ -2475,13 +2481,13 @@ public class HRegion implements HeapSize { // , Writable{
 
       lock(this.updatesLock.readLock(), numReadyToWrite);
       locked = true;
-
+      mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
       //
       // ------------------------------------
       // Acquire the latest mvcc number
       // ----------------------------------
-      w = mvcc.beginMemstoreInsert();
-
+      w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
+      
       // calling the pre CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
         MiniBatchOperationInProgress<Mutation> miniBatchOp =
@@ -2506,13 +2512,12 @@ public class HRegion implements HeapSize { // , Writable{
           continue;
         }
         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
       }
 
       // ------------------------------------
       // STEP 4. Build WAL edit
       // ----------------------------------
-      boolean hasWalAppends = false;
       Durability durability = Durability.USE_DEFAULT;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
@@ -2543,13 +2548,13 @@ public class HRegion implements HeapSize { // , Writable{
               throw new IOException("Multiple nonces per batch and not in replay");
             }
             // txid should always increase, so having the one from the last call is ok.
-            HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup,
-              currentNonce);
-            txid = this.log.appendNoSync(this.htableDescriptor,  this.getRegionInfo(),  key,
-              walEdit, getSequenceId(), true);
-            hasWalAppends = true;
+            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+              this.htableDescriptor.getTableName(), now, m.getClusterIds(), 
+              currentNonceGroup, currentNonce);
+            txid = this.log.appendNoSync(this.htableDescriptor,  this.getRegionInfo(),  walKey,
+              walEdit, getSequenceId(), true, null);
             walEdit = new WALEdit(isInReplay);
+            walKey = null;
           }
           currentNonceGroup = nonceGroup;
           currentNonce = nonce;
@@ -2570,12 +2575,15 @@ public class HRegion implements HeapSize { // , Writable{
       // -------------------------
       Mutation mutation = batchOp.getMutation(firstIndex);
       if (walEdit.size() > 0) {
-        HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-            this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
-            currentNonceGroup, currentNonce);
-        txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, walEdit,
-          getSequenceId(), true);
-        hasWalAppends = true;
+        walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+            this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, 
+            mutation.getClusterIds(), currentNonceGroup, currentNonce);
+        txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
+          getSequenceId(), true, memstoreCells);
+      }
+      if(walKey == null){
+        // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
+        walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
       }
 
       // -------------------------------
@@ -2590,9 +2598,10 @@ public class HRegion implements HeapSize { // , Writable{
       // -------------------------
       // STEP 7. Sync wal.
       // -------------------------
-      if (hasWalAppends) {
+      if (txid != 0) {
         syncOrDefer(txid, durability);
       }
+      
       doRollBackMemstore = false;
       // calling the post CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
@@ -2606,7 +2615,7 @@ public class HRegion implements HeapSize { // , Writable{
       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
       // ------------------------------------------------------------------
       if (w != null) {
-        mvcc.completeMemstoreInsert(w);
+        mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
         w = null;
       }
 
@@ -2636,9 +2645,11 @@ public class HRegion implements HeapSize { // , Writable{
 
       // if the wal sync was unsuccessful, remove keys from memstore
       if (doRollBackMemstore) {
-        rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
+        rollbackMemstore(memstoreCells);
+      }
+      if (w != null) {
+        mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
       }
-      if (w != null) mvcc.completeMemstoreInsert(w);
 
       if (locked) {
         this.updatesLock.readLock().unlock();
@@ -2727,7 +2738,7 @@ public class HRegion implements HeapSize { // , Writable{
       // Lock row - note that doBatchMutate will relock this row if called
       RowLock rowLock = getRowLock(get.getRow());
       // wait for all previous transactions to complete (with lock held)
-      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+      mvcc.waitForPreviousTransactionsComplete();
       try {
         if (this.getCoprocessorHost() != null) {
           Boolean processed = null;
@@ -2903,34 +2914,25 @@ public class HRegion implements HeapSize { // , Writable{
    * @param familyMap Map of kvs per family
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *        If null, then this method internally creates a mvcc transaction.
+   * @param output newly added KVs into memstore
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
+    long mvccNum, List<KeyValue> memstoreCells) {
     long size = 0;
-    boolean freemvcc = false;
 
-    try {
-      if (localizedWriteEntry == null) {
-        localizedWriteEntry = mvcc.beginMemstoreInsert();
-        freemvcc = true;
-      }
-
-      for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
-        byte[] family = e.getKey();
-        List<Cell> cells = e.getValue();
+    for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
+      byte[] family = e.getKey();
+      List<Cell> cells = e.getValue();
 
-        Store store = getStore(family);
-        for (Cell cell: cells) {
-          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-          kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
-          size += store.add(kv);
-        }
-      }
-    } finally {
-      if (freemvcc) {
-        mvcc.completeMemstoreInsert(localizedWriteEntry);
+      Store store = getStore(family);
+      for (Cell cell: cells) {
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+        kv.setMvccVersion(mvccNum);
+        Pair<Long, Cell> ret = store.add(kv);
+        size += ret.getFirst();
+        memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
       }
     }
 
@@ -2942,35 +2944,16 @@ public class HRegion implements HeapSize { // , Writable{
    * called when a Put/Delete has updated memstore but subsequently fails to update
    * the wal. This method is then invoked to rollback the memstore.
    */
-  private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
-                                Map<byte[], List<Cell>>[] familyMaps,
-                                int start, int end) {
+  private void rollbackMemstore(List<KeyValue> memstoreCells) {
     int kvsRolledback = 0;
-    for (int i = start; i < end; i++) {
-      // skip over request that never succeeded in the first place.
-      if (batchOp.retCodeDetails[i].getOperationStatusCode()
-            != OperationStatusCode.SUCCESS) {
-        continue;
-      }
-
-      // Rollback all the kvs for this row.
-      Map<byte[], List<Cell>> familyMap  = familyMaps[i];
-      for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
-        byte[] family = e.getKey();
-        List<Cell> cells = e.getValue();
-
-        // Remove those keys from the memstore that matches our
-        // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
-        // that even the memstoreTS has to match for keys that will be rolled-back.
-        Store store = getStore(family);
-        for (Cell cell: cells) {
-          store.rollback(KeyValueUtil.ensureKeyValue(cell));
-          kvsRolledback++;
-        }
-      }
+    
+    for (KeyValue kv : memstoreCells) {
+      byte[] family = kv.getFamily();
+      Store store = getStore(family);
+      store.rollback(kv);
+      kvsRolledback++;
     }
-    LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
-        " keyvalues from start:" + start + " to end:" + end);
+    LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
   }
 
   /**
@@ -3378,7 +3361,7 @@ public class HRegion implements HeapSize { // , Writable{
    * @return True if we should flush.
    */
   protected boolean restoreEdit(final Store s, final KeyValue kv) {
-    long kvSize = s.add(kv);
+    long kvSize = s.add(kv).getFirst();
     if (this.rsAccounting != null) {
       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
     }
@@ -4883,7 +4866,10 @@ public class HRegion implements HeapSize { // , Writable{
     List<RowLock> acquiredRowLocks;
     long addedSize = 0;
     List<KeyValue> mutations = new ArrayList<KeyValue>();
+    List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
+    long mvccNum = 0;
+    HLogKey walKey = null;
     try {
       // 2. Acquire the row lock(s)
       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
@@ -4894,6 +4880,7 @@ public class HRegion implements HeapSize { // , Writable{
       // 3. Region lock
       lock(this.updatesLock.readLock(), acquiredRowLocks.size());
       locked = true;
+      mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
 
       long now = EnvironmentEdgeManager.currentTimeMillis();
       try {
@@ -4904,27 +4891,35 @@ public class HRegion implements HeapSize { // , Writable{
 
         if (!mutations.isEmpty()) {
           // 5. Get a mvcc write number
-          writeEntry = mvcc.beginMemstoreInsert();
+          writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
           // 6. Apply to memstore
           for (KeyValue kv : mutations) {
-            kv.setMvccVersion(writeEntry.getWriteNumber());
+            kv.setMvccVersion(mvccNum);
             Store store = getStore(kv);
             if (store == null) {
               checkFamily(CellUtil.cloneFamily(kv));
               // unreachable
             }
-            addedSize += store.add(kv);
+            Pair<Long, Cell> ret = store.add(kv);
+            addedSize += ret.getFirst();
+            memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
           }
 
           long txid = 0;
           // 7. Append no sync
           if (!walEdit.isEmpty()) {
-            HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), now, processor.getClusterIds(), nonceGroup,
-              nonce);
+            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+              this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, 
+              processor.getClusterIds(), nonceGroup, nonce);
             txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
-              key, walEdit, getSequenceId(), true);
+              walKey, walEdit, getSequenceId(), true, memstoreCells);
           }
+          if(walKey == null){
+            // since we use log sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit
+            // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
+            walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+          }
+
           // 8. Release region lock
           if (locked) {
             this.updatesLock.readLock().unlock();
@@ -4951,7 +4946,7 @@ public class HRegion implements HeapSize { // , Writable{
         }
         // 11. Roll mvcc forward
         if (writeEntry != null) {
-          mvcc.completeMemstoreInsert(writeEntry);
+          mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
         }
         if (locked) {
           this.updatesLock.readLock().unlock();
@@ -5055,8 +5050,12 @@ public class HRegion implements HeapSize { // , Writable{
     // Lock row
     startRegionOperation(Operation.APPEND);
     this.writeRequestsCount.increment();
+    long mvccNum = 0;
     WriteEntry w = null;
-    RowLock rowLock;
+    HLogKey walKey = null;
+    RowLock rowLock = null;
+    List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
+    boolean doRollBackMemstore = false;
     try {
       rowLock = getRowLock(row);
       try {
@@ -5064,7 +5063,7 @@ public class HRegion implements HeapSize { // , Writable{
         try {
           // wait for all prior MVCC transactions to finish - while we hold the row lock
           // (so that we are guaranteed to see the latest state)
-          mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+          mvcc.waitForPreviousTransactionsComplete();
           if (this.coprocessorHost != null) {
             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
             if(r!= null) {
@@ -5072,7 +5071,8 @@ public class HRegion implements HeapSize { // , Writable{
             }
           }
           // now start my own transaction
-          w = mvcc.beginMemstoreInsert();
+          mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+          w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
           long now = EnvironmentEdgeManager.currentTimeMillis();
           // Process each family
           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
@@ -5140,7 +5140,7 @@ public class HRegion implements HeapSize { // , Writable{
                 // so only need to update the timestamp to 'now'
                 newKV.updateLatestStamp(Bytes.toBytes(now));
              }
-              newKV.setMvccVersion(w.getWriteNumber());
+              newKV.setMvccVersion(mvccNum);
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@@ -5161,34 +5161,43 @@ public class HRegion implements HeapSize { // , Writable{
             tempMemstore.put(store, kvs);
           }
 
-          // Actually write to WAL now
-          if (writeToWAL) {
-            // Using default cluster id, as this can only happen in the originating
-            // cluster. A slave cluster receives the final value (not the delta)
-            // as a Put.
-            HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
-              this.htableDescriptor.getTableName(), nonceGroup, nonce);
-            txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), key, walEdits,
-              this.sequenceId, true);
-          } else {
-            recordMutationWithoutWal(append.getFamilyCellMap());
-          }
-
           //Actually write to Memstore now
           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
             Store store = entry.getKey();
             if (store.getFamily().getMaxVersions() == 1) {
               // upsert if VERSIONS for this CF == 1
               size += store.upsert(entry.getValue(), getSmallestReadPoint());
+              memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
             } else {
               // otherwise keep older versions around
               for (Cell cell: entry.getValue()) {
                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                size += store.add(kv);
+                Pair<Long, Cell> ret = store.add(kv);
+                size += ret.getFirst();
+                memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
+                doRollBackMemstore = true;
               }
             }
             allKVs.addAll(entry.getValue());
           }
+          
+          // Actually write to WAL now
+          if (writeToWAL) {
+            // Using default cluster id, as this can only happen in the originating
+            // cluster. A slave cluster receives the final value (not the delta)
+            // as a Put.
+            walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
+              this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
+            txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
+              this.sequenceId, true, memstoreCells);
+          } else {
+            recordMutationWithoutWal(append.getFamilyCellMap());
+          }
+          if(walKey == null){
+            // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
+            walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+          }
+          
           size = this.addAndGetGlobalMemstoreSize(size);
           flush = isFlushSize(size);
         } finally {
@@ -5196,14 +5205,23 @@ public class HRegion implements HeapSize { // , Writable{
         }
       } finally {
         rowLock.release();
+        rowLock = null;
       }
-      if (writeToWAL) {
-        // sync the transaction log outside the rowlock
+      // sync the transaction log outside the rowlock
+      if(txid != 0){
         syncOrDefer(txid, durability);
       }
+      doRollBackMemstore = false;
     } finally {
+      if (rowLock != null) {
+        rowLock.release();
+      }
+      // if the wal sync was unsuccessful, remove keys from memstore
+      if (doRollBackMemstore) {
+        rollbackMemstore(memstoreCells);
+      }
       if (w != null) {
-        mvcc.completeMemstoreInsert(w);
+        mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
       }
       closeRegionOperation(Operation.APPEND);
     }
@@ -5250,15 +5268,20 @@ public class HRegion implements HeapSize { // , Writable{
     // Lock row
     startRegionOperation(Operation.INCREMENT);
     this.writeRequestsCount.increment();
+    RowLock rowLock = null;
     WriteEntry w = null;
+    HLogKey walKey = null;
+    long mvccNum = 0;
+    List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
+    boolean doRollBackMemstore = false;
     try {
-      RowLock rowLock = getRowLock(row);
+      rowLock = getRowLock(row);
       try {
         lock(this.updatesLock.readLock());
         try {
           // wait for all prior MVCC transactions to finish - while we hold the row lock
           // (so that we are guaranteed to see the latest state)
-          mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
+          mvcc.waitForPreviousTransactionsComplete();
           if (this.coprocessorHost != null) {
             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
             if (r != null) {
@@ -5266,7 +5289,8 @@ public class HRegion implements HeapSize { // , Writable{
             }
           }
           // now start my own transaction
-          w = mvcc.beginMemstoreInsert();
+          mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+          w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
           long now = EnvironmentEdgeManager.currentTimeMillis();
           // Process each family
           for (Map.Entry<byte [], List<Cell>> family:
@@ -5330,7 +5354,7 @@ public class HRegion implements HeapSize { // , Writable{
                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
               }
-              newKV.setMvccVersion(w.getWriteNumber());
+              newKV.setMvccVersion(mvccNum);
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@@ -5357,20 +5381,6 @@ public class HRegion implements HeapSize { // , Writable{
             }
           }
 
-          // Actually write to WAL now
-          if (walEdits != null && !walEdits.isEmpty()) {
-            if (writeToWAL) {
-              // Using default cluster id, as this can only happen in the originating
-              // cluster. A slave cluster receives the final value (not the delta)
-              // as a Put.
-              HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-                this.htableDescriptor.getTableName(), nonceGroup, nonce);
-              txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
-                  key, walEdits, getSequenceId(), true);
-            } else {
-              recordMutationWithoutWal(increment.getFamilyCellMap());
-            }
-          }
           //Actually write to Memstore now
           if (!tempMemstore.isEmpty()) {
             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
@@ -5378,30 +5388,62 @@ public class HRegion implements HeapSize { // , Writable{
               if (store.getFamily().getMaxVersions() == 1) {
                 // upsert if VERSIONS for this CF == 1
                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
+                memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
               } else {
                 // otherwise keep older versions around
                 for (Cell cell : entry.getValue()) {
                   KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                  size += store.add(kv);
+                  Pair<Long, Cell> ret = store.add(kv);
+                  size += ret.getFirst();
+                  memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
+                  doRollBackMemstore = true;
                 }
               }
             }
             size = this.addAndGetGlobalMemstoreSize(size);
             flush = isFlushSize(size);
           }
+          
+          // Actually write to WAL now
+          if (walEdits != null && !walEdits.isEmpty()) {
+            if (writeToWAL) {
+              // Using default cluster id, as this can only happen in the originating
+              // cluster. A slave cluster receives the final value (not the delta)
+              // as a Put.
+              walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+                this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce);
+              txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+                walKey, walEdits, getSequenceId(), true, memstoreCells);
+            } else {
+              recordMutationWithoutWal(increment.getFamilyCellMap());
+            }
+          }
+          if(walKey == null){
+            // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
+            walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
+          }
         } finally {
           this.updatesLock.readLock().unlock();
         }
       } finally {
         rowLock.release();
+        rowLock = null;
       }
-      if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
-        // sync the transaction log outside the rowlock
+      // sync the transaction log outside the rowlock
+      if(txid != 0){
         syncOrDefer(txid, durability);
       }
+      doRollBackMemstore = false;
     } finally {
+      if (rowLock != null) {
+        rowLock.release();
+      }
+      // if the wal sync was unsuccessful, remove keys from memstore
+      if (doRollBackMemstore) {
+        rollbackMemstore(memstoreCells);
+      }
       if (w != null) {
-        mvcc.completeMemstoreInsert(w);
+        mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
       }
       closeRegionOperation(Operation.INCREMENT);
       if (this.metricsRegion != null) {
@@ -6130,4 +6172,23 @@ public class HRegion implements HeapSize { // , Writable{
       }
     }
   }
+  
+  /**
+   * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
+   * the WALEdit append later.
+   * @param wal
+   * @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to
+   *        be updated with right mvcc values(their log sequence nu
+   * @return
+   * @throws IOException
+   */
+  private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException {
+    HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
+      HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    // Call append but with an empty WALEdit.  The returned seqeunce id will not be associated
+    // with any edit and we can be sure it went in after all outstanding appends.
+    wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
+      WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
+    return key;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 6de94c8..2218244 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -564,10 +565,10 @@ public class HStore implements Store {
   }
 
   @Override
-  public long add(final KeyValue kv) {
+  public Pair<Long, Cell> add(final KeyValue kv) {
     lock.readLock().lock();
     try {
-      return this.memstore.add(kv);
+       return this.memstore.add(kv);
     } finally {
       lock.readLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index b370b49..ac2155a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s.
@@ -61,9 +62,10 @@ public interface MemStore extends HeapSize {
   /**
    * Write an update
    * @param cell
-   * @return approximate size of the passed key and value.
+   * @return approximate size of the passed KV and the newly added KV which maybe different from the
+   *         passed in KV.
    */
-  long add(final Cell cell);
+  Pair<Long, Cell> add(final Cell cell);
 
   /**
    * @return Oldest timestamp of all the Cells in the MemStore

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
index b46d55b..4343313 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -32,20 +34,18 @@ import org.apache.hadoop.hbase.util.ClassSize;
  */
 @InterfaceAudience.Private
 public class MultiVersionConsistencyControl {
-  private volatile long memstoreRead = 0;
-  private volatile long memstoreWrite = 0;
-
+  private static final long NO_WRITE_NUMBER = 0;
+  private volatile long memstoreRead = 0; 
   private final Object readWaiters = new Object();
 
   // This is the pending queue of writes.
   private final LinkedList<WriteEntry> writeQueue =
       new LinkedList<WriteEntry>();
-
+  
   /**
    * Default constructor. Initializes the memstoreRead/Write points to 0.
    */
   public MultiVersionConsistencyControl() {
-    this.memstoreRead = this.memstoreWrite = 0;
   }
 
   /**
@@ -54,37 +54,86 @@ public class MultiVersionConsistencyControl {
    */
   public void initialize(long startPoint) {
     synchronized (writeQueue) {
-      if (this.memstoreWrite != this.memstoreRead) {
-        throw new RuntimeException("Already used this mvcc. Too late to initialize");
-      }
-
-      this.memstoreRead = this.memstoreWrite = startPoint;
+      writeQueue.clear();
+      memstoreRead = startPoint;
     }
   }
 
   /**
-   * Generate and return a {@link WriteEntry} with a new write number.
-   * To complete the WriteEntry and wait for it to be visible,
-   * call {@link #completeMemstoreInsert(WriteEntry)}.
+   * 
+   * @param initVal The value we used initially and expected it'll be reset later
+   * @return
    */
-  public WriteEntry beginMemstoreInsert() {
+  WriteEntry beginMemstoreInsert() {
+    return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
+  }
+  
+  /**
+   * Get a mvcc write number before an actual one(its log sequence Id) being assigned
+   * @param sequenceId
+   * @return long a faked write number which is bigger enough not to be seen by others before a real
+   *         one is assigned
+   */
+  public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
+    // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
+    // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
+    // because each handler could increment sequence num twice and max concurrent in-flight
+    // transactions is the number of RPC handlers.
+    // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
+    // changes touch same row key
+    // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
+    // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
+    return sequenceId.incrementAndGet() + 1000000000;  
+  }
+  
+  /**
+   * This function starts a MVCC transaction with current region's log change sequence number. Since
+   * we set change sequence number when flushing current change to WAL(late binding), the flush
+   * order may differ from the order to start a MVCC transaction. For example, a change begins a
+   * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
+   * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
+   * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
+   * big number is safe because we only need it to prevent current change being seen and the number
+   * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
+   * for MVCC to align with flush sequence.
+   * @param curSeqNum
+   * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
+   */
+  public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
+    WriteEntry e = new WriteEntry(curSeqNum);
     synchronized (writeQueue) {
-      long nextWriteNumber = ++memstoreWrite;
-      WriteEntry e = new WriteEntry(nextWriteNumber);
       writeQueue.add(e);
       return e;
     }
   }
 
   /**
-   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
-   *
-   * At the end of this call, the global read point is at least as large as the write point
-   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
+   * Complete a {@link WriteEntry} that was created by
+   * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
+   * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
+   * visible to MVCC readers.
+   * @throws IOException
+   */
+  public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceNumber seqNum)
+      throws IOException {
+    if(e == null) return;
+    if (seqNum != null) {
+      e.setWriteNumber(seqNum.getSequenceNumber());
+    } else {
+      // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
+      // function beginMemstoreInsertWithSeqNum in case of failures
+      e.setWriteNumber(NO_WRITE_NUMBER);
+    }
+    waitForPreviousTransactionsComplete(e);
+  }
+  
+  /**
+   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
+   * end of this call, the global read point is at least as large as the write point of the passed
+   * in WriteEntry. Thus, the write is visible to MVCC readers.
    */
   public void completeMemstoreInsert(WriteEntry e) {
-    advanceMemstore(e);
-    waitForRead(e);
+    waitForPreviousTransactionsComplete(e);
   }
 
   /**
@@ -99,75 +148,94 @@ public class MultiVersionConsistencyControl {
    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
    */
   boolean advanceMemstore(WriteEntry e) {
+    long nextReadValue = -1;
     synchronized (writeQueue) {
       e.markCompleted();
 
-      long nextReadValue = -1;
-      boolean ranOnce=false;
       while (!writeQueue.isEmpty()) {
-        ranOnce=true;
         WriteEntry queueFirst = writeQueue.getFirst();
-
-        if (nextReadValue > 0) {
-          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
-            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
-                + nextReadValue + " next: " + queueFirst.getWriteNumber());
-          }
-        }
-
         if (queueFirst.isCompleted()) {
-          nextReadValue = queueFirst.getWriteNumber();
+          // Using Max because Edit complete in WAL sync order not arriving order
+          nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
           writeQueue.removeFirst();
         } else {
           break;
         }
       }
 
-      if (!ranOnce) {
-        throw new RuntimeException("never was a first");
+      if (nextReadValue > memstoreRead) {
+        memstoreRead = nextReadValue;
       }
 
-      if (nextReadValue > 0) {
-        synchronized (readWaiters) {
-          memstoreRead = nextReadValue;
-          readWaiters.notifyAll();
-        }
-      }
-      if (memstoreRead >= e.getWriteNumber()) {
-        return true;
+      // notify waiters on writeQueue before return
+      writeQueue.notifyAll();
+    }
+
+    if (nextReadValue > 0) {
+      synchronized (readWaiters) {
+        readWaiters.notifyAll();
       }
-      return false;
     }
+
+    if (memstoreRead >= e.getWriteNumber()) {
+      return true;
+    }
+    return false;
   }
 
   /**
-   * Wait for the global readPoint to advance upto
-   * the specified transaction number.
+   * Wait for all previous MVCC transactions complete
    */
-  public void waitForRead(WriteEntry e) {
+  public void waitForPreviousTransactionsComplete() {
+    WriteEntry w = beginMemstoreInsert();
+    waitForPreviousTransactionsComplete(w);
+  }
+  
+  public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
     boolean interrupted = false;
-    synchronized (readWaiters) {
-      while (memstoreRead < e.getWriteNumber()) {
-        try {
-          readWaiters.wait(0);
-        } catch (InterruptedException ie) {
-          // We were interrupted... finish the loop -- i.e. cleanup --and then
-          // on our way out, reset the interrupt flag.
-          interrupted = true;
+    WriteEntry w = waitedEntry;
+
+    try {
+      WriteEntry firstEntry = null;
+      do {
+        synchronized (writeQueue) {
+          // writeQueue won't be empty at this point, the following is just a safety check
+          if (writeQueue.isEmpty()) {
+            break;
+          }
+          firstEntry = writeQueue.getFirst();
+          if (firstEntry == w) {
+            // all previous in-flight transactions are done
+            break;
+          }
+          try {
+            writeQueue.wait(0);
+          } catch (InterruptedException ie) {
+            // We were interrupted... finish the loop -- i.e. cleanup --and then
+            // on our way out, reset the interrupt flag.
+            interrupted = true;
+            break;
+          }
         }
+      } while (firstEntry != null);
+    } finally {
+      if (w != null) {
+        advanceMemstore(w);
       }
     }
-    if (interrupted) Thread.currentThread().interrupt();
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
   }
 
   public long memstoreReadPoint() {
     return memstoreRead;
   }
 
-
   public static class WriteEntry {
     private long writeNumber;
     private boolean completed = false;
+
     WriteEntry(long writeNumber) {
       this.writeNumber = writeNumber;
     }
@@ -180,6 +248,9 @@ public class MultiVersionConsistencyControl {
     long getWriteNumber() {
       return this.writeNumber;
     }
+    void setWriteNumber(long val){
+      this.writeNumber = val;
+    }
   }
 
   public static final long FIXED_SIZE = ClassSize.align(

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java
new file mode 100644
index 0000000..90b1029
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Interface which abstracts implementations on log sequence number assignment
+ */
+@InterfaceAudience.Private
+public interface SequenceNumber {
+  public long getSequenceNumber() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 8923769..fd73f2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or
@@ -122,9 +123,9 @@ public interface Store extends HeapSize, StoreConfigInformation {
   /**
    * Adds a value to the memstore
    * @param kv
-   * @return memstore size delta
+   * @return memstore size delta & newly added KV which maybe different than the passed in KV
    */
-  long add(KeyValue kv);
+  Pair<Long, Cell> add(KeyValue kv);
 
   /**
    * When was the last edit done in the memstore

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index b876972..7403700 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -121,12 +121,6 @@ abstract class StoreFlusher {
           // set its memstoreTS to 0. This will help us save space when writing to
           // disk.
           KeyValue kv = KeyValueUtil.ensureKeyValue(c);
-          if (kv.getMvccVersion() <= smallestReadPoint) {
-            // let us not change the original KV. It could be in the memstore
-            // changing its memstoreTS could affect other threads/scanners.
-            kv = kv.shallowCopy();
-            kv.setMvccVersion(0);
-          }
           sink.append(kv);
         }
         kvs.clear();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 268e302..c0c7dbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1064,13 +1064,26 @@ class FSHLog implements HLog, Syncable {
     }
   }
 
+  /**
+   * @param now
+   * @param encodedRegionName Encoded name of the region as returned by
+   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   * @param tableName
+   * @param clusterIds that have consumed the change
+   * @return New log key.
+   */
+  protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
+      long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+    return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
+  }
+  
   @Override
   @VisibleForTesting
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
     final long now, HTableDescriptor htd, AtomicLong sequenceId)
   throws IOException {
     HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now);
-    append(htd, info, logKey, edits, sequenceId, true, true);
+    append(htd, info, logKey, edits, sequenceId, true, true, null);
   }
 
   @Override
@@ -1079,14 +1092,15 @@ class FSHLog implements HLog, Syncable {
       boolean inMemstore, long nonceGroup, long nonce) throws IOException {
     HLogKey logKey =
       new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce);
-    return append(htd, info, logKey, edits, sequenceId, false, inMemstore);
+    return append(htd, info, logKey, edits, sequenceId, false, inMemstore, null);
   }
 
   @Override
   public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key,
-      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore)
+      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, 
+      final List<KeyValue> memstoreKVs)
   throws IOException {
-    return append(htd, info, key, edits, sequenceId, false, inMemstore);
+    return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreKVs);
   }
 
   /**
@@ -1101,19 +1115,22 @@ class FSHLog implements HLog, Syncable {
    * @param sync shall we sync after we call the append?
    * @param inMemstore
    * @param sequenceId The region sequence id reference.
+   * @param memstoreKVs
    * @return txid of this transaction or if nothing to do, the last txid
    * @throws IOException
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification="Will never be null")
   private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key,
-      WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore)
+      WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore, 
+      List<KeyValue> memstoreKVs)
   throws IOException {
     if (!this.enabled) return this.highestUnsyncedSequence;
     if (this.closed) throw new IOException("Cannot append; log is closed");
     // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
     // single consuming thread.  Don't have to worry about it.
     TraceScope scope = Trace.startSpan("FSHLog.append");
+
     // This is crazy how much it takes to make an edit.  Do we need all this stuff!!!!????  We need
     // all this to make a key and then below to append the edit, we need to carry htd, info,
     // etc. all over the ring buffer.
@@ -1124,19 +1141,10 @@ class FSHLog implements HLog, Syncable {
       // Construction of FSWALEntry sets a latch.  The latch is thrown just after we stamp the
       // edit with its edit/sequence id.  The below entry.getRegionSequenceId will wait on the
       // latch to be thrown.  TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
-      entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri);
+      entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreKVs);
       truck.loadPayload(entry, scope.detach());
     } finally {
       this.disruptor.getRingBuffer().publish(sequence);
-      // Now wait until the region edit/sequence id is available.  The 'entry' has an internal
-      // latch that is thrown when the region edit/sequence id is set.  Calling
-      // entry.getRegionSequenceId will cause us block until the latch is thrown.  The return is
-      // the region edit/sequence id, not the ring buffer txid.
-      try {
-        entry.getRegionSequenceId();
-      } catch (InterruptedException e) {
-        throw convertInterruptedExceptionToIOException(e);
-      }
     }
     // doSync is set in tests.  Usually we arrive in here via appendNoSync w/ the sync called after
     // all edits on a handler have been added.
@@ -1894,6 +1902,14 @@ class FSHLog implements HLog, Syncable {
         // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
         // so region sequenceids will also be in order.
         regionSequenceId = entry.stampRegionSequenceId();
+        
+        // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a 
+        // region sequence id only, a region edit/sequence id that is not associated with an actual 
+        // edit. It has to go through all the rigmarole to be sure we have the right ordering.
+        if (entry.getEdit().isEmpty()) {
+          return;
+        }
+        
         // Coprocessor hook.
         if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
             entry.getEdit())) {
@@ -1909,19 +1925,16 @@ class FSHLog implements HLog, Syncable {
               entry.getEdit());
           }
         }
-        // If empty, there is nothing to append.  Maybe empty when we are looking for a region
-        // sequence id only, a region edit/sequence id that is not associated with an actual edit.
-        // It has to go through all the rigmarole to be sure we have the right ordering.
-        if (!entry.getEdit().isEmpty()) {
-          writer.append(entry);
-          assert highestUnsyncedSequence < entry.getSequence();
-          highestUnsyncedSequence = entry.getSequence();
-          Long lRegionSequenceId = Long.valueOf(regionSequenceId);
-          highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
-          if (entry.isInMemstore()) {
-            oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
-          }
+
+        writer.append(entry);
+        assert highestUnsyncedSequence < entry.getSequence();
+        highestUnsyncedSequence = entry.getSequence();
+        Long lRegionSequenceId = Long.valueOf(regionSequenceId);
+        highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
+        if (entry.isInMemstore()) {
+          oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
         }
+        
         coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
         // Update metrics.
         postAppend(entry, EnvironmentEdgeManager.currentTimeMillis() - start);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 9799269..1e9472a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
 
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 
 /**
  * A WAL Entry for {@link FSHLog} implementation.  Immutable.
@@ -41,19 +43,18 @@ class FSWALEntry extends HLog.Entry {
   private final transient boolean inMemstore;
   private final transient HTableDescriptor htd;
   private final transient HRegionInfo hri;
-  // Latch that is set on creation and then is undone on the other side of the ring buffer by the
-  // consumer thread just after it sets the region edit/sequence id in here.
-  private final transient CountDownLatch latch = new CountDownLatch(1);
+  private final transient List<KeyValue> memstoreKVs;
 
   FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
       final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
-      final HTableDescriptor htd, final HRegionInfo hri) {
+      final HTableDescriptor htd, final HRegionInfo hri, List<KeyValue> memstoreKVs) {
     super(key, edit);
     this.regionSequenceIdReference = referenceToRegionSequenceId;
     this.inMemstore = inMemstore;
     this.htd = htd;
     this.hri = hri;
     this.sequence = sequence;
+    this.memstoreKVs = memstoreKVs;
   }
 
   public String toString() {
@@ -90,15 +91,13 @@ class FSWALEntry extends HLog.Entry {
    */
   long stampRegionSequenceId() {
     long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
-    getKey().setLogSeqNum(regionSequenceId);
-    // On creation, a latch was set.  Count it down when sequence id is set.  This will free
-    // up anyone blocked on {@link #getRegionSequenceId()}
-    this.latch.countDown();
+    if(memstoreKVs != null && !memstoreKVs.isEmpty()) {
+      for(KeyValue kv : this.memstoreKVs){
+        kv.setMvccVersion(regionSequenceId);
+      }
+    }
+    HLogKey key = getKey();
+    key.setLogSeqNum(regionSequenceId);
     return regionSequenceId;
   }
-
-  long getRegionSequenceId() throws InterruptedException {
-    this.latch.await();
-    return getKey().getLogSeqNum();
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index ca6f444..99a9a5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -34,8 +34,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.io.Writable;
@@ -290,8 +292,8 @@ public interface HLog {
    * @param sequenceId
    * @throws IOException
    * @deprecated For tests only and even then, should use
-   * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)}
-   * and {@link #sync()} instead.
+   * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, 
+   * List)} and {@link #sync()} instead.
    */
   @VisibleForTesting
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
@@ -337,7 +339,7 @@ public interface HLog {
    * able to sync an explicit edit only (the current default implementation syncs up to the time
    * of the sync call syncing whatever is behind the sync).
    * @throws IOException
-   * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)}
+   * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, List)}
    * instead because you can get back the region edit/sequenceid; it is set into the passed in
    * <code>key</code>.
    */
@@ -361,12 +363,13 @@ public interface HLog {
    * @param inMemstore Always true except for case where we are writing a compaction completion
    * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
    * -- it is not an edit for memstore.
+   * @param memstoreKVs list of KVs added into memstore
    * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
    * in it.
    * @throws IOException
    */
   long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits,
-      AtomicLong sequenceId, boolean inMemstore)
+      AtomicLong sequenceId, boolean inMemstore, List<KeyValue> memstoreKVs)
   throws IOException;
 
   // TODO: Do we need all these versions of sync?

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index f591f4e..ad1c001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +32,10 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+
+import com.google.protobuf.HBaseZeroCopyByteString;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.regionserver.SequenceNumber;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.io.WritableComparable;
@@ -49,7 +55,6 @@ import org.apache.hadoop.io.WritableUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.HBaseZeroCopyByteString;
 
 /**
  * A Key for an entry in the change log.
@@ -64,7 +69,7 @@ import com.google.protobuf.HBaseZeroCopyByteString;
 // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
 //       purposes. They need to be merged into HLogEntry.
 @InterfaceAudience.Private
-public class HLogKey implements WritableComparable<HLogKey> {
+public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
   public static final Log LOG = LogFactory.getLog(HLogKey.class);
 
   // should be < 0 (@see #readFields(DataInput))
@@ -114,6 +119,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
   private byte [] encodedRegionName;
   private TableName tablename;
   private long logSeqNum;
+  private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
   // Time at which this edit was written.
   private long writeTime;
 
@@ -184,7 +190,8 @@ public class HLogKey implements WritableComparable<HLogKey> {
    */
   public HLogKey(final byte [] encodedRegionName, final TableName tablename,
       final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce);
+    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, 
+      nonceGroup, nonce);
   }
 
   /**
@@ -195,13 +202,14 @@ public class HLogKey implements WritableComparable<HLogKey> {
    * @param encodedRegionName Encoded name of the region as returned by
    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
    * @param tablename
+   * @param logSeqNum
    * @param nonceGroup
    * @param nonce
    */
-  public HLogKey(final byte [] encodedRegionName, final TableName tablename, long nonceGroup,
-      long nonce) {
-    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID,
-        EnvironmentEdgeManager.currentTimeMillis(), EMPTY_UUIDS, nonceGroup, nonce);
+  public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
+      long nonceGroup, long nonce) {
+    init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(), 
+      EMPTY_UUIDS, nonceGroup, nonce);
   }
 
   protected void init(final byte [] encodedRegionName, final TableName tablename,
@@ -238,11 +246,30 @@ public class HLogKey implements WritableComparable<HLogKey> {
   }
 
   /**
-   * Allow that the log sequence id to be set post-construction.
+   * Allow that the log sequence id to be set post-construction and release all waiters on assigned
+   * sequence number.
    * @param sequence
    */
   void setLogSeqNum(final long sequence) {
     this.logSeqNum = sequence;
+    this.seqNumAssignedLatch.countDown();
+  }
+  
+  /**
+   * Wait for sequence number is assigned & return the assigned value
+   * @return long the new assigned sequence number
+   * @throws InterruptedException
+   */
+  public long getSequenceNumber() throws IOException {
+    try {
+      this.seqNumAssignedLatch.await();
+    } catch (InterruptedException ie) {
+      LOG.warn("Thread interrupted waiting for next log sequence number");
+      InterruptedIOException iie = new InterruptedIOException();
+      iie.initCause(ie);
+      throw iie;
+    }
+    return this.logSeqNum;
   }
 
   /**
@@ -358,7 +385,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
     if (result == 0) {
       if (this.logSeqNum < o.logSeqNum) {
         result = -1;
-      } else if (this.logSeqNum  > o.logSeqNum ) {
+      } else if (this.logSeqNum  > o.logSeqNum) {
         result = 1;
       }
       if (result == 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 00e9f15..b9f82b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -1972,8 +1972,8 @@ public class HLogSplitter {
         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
       }
       key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
-              .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(),
-              clusterIds, walKey.getNonceGroup(), walKey.getNonce());
+              .getTableName().toByteArray()), walKey.getLogSequenceNumber(), 
+              walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
       logEntry.setFirst(key);
       logEntry.setSecond(val);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index fcb5610..6809aad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -262,7 +262,7 @@ public class HLogUtil {
       final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
     TableName tn = TableName.valueOf(c.getTableName().toByteArray());
     HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false);
+    log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false, null);
     log.sync();
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index b10c4a9..8ecb4b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -249,7 +249,7 @@ public class WALEdit implements Writable, HeapSize {
     sb.append(">]");
     return sb.toString();
   }
-
+  
   /**
    * Create a compacion WALEdit
    * @param c

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 98563d6..2e74281 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -68,7 +68,7 @@ public class TestMultiParallel {
   private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
   private static final byte [][] KEYS = makeKeys();
 
-  private static final int slaves = 2; // also used for testing HTable pool size
+  private static final int slaves = 3; // also used for testing HTable pool size
 
   @BeforeClass public static void beforeClass() throws Exception {
     ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@@ -238,7 +238,7 @@ public class TestMultiParallel {
    *
    * @throws Exception
    */
-  @Test (timeout=300000)
+  @Test (timeout=360000)
   public void testFlushCommitsWithAbort() throws Exception {
     LOG.info("test=testFlushCommitsWithAbort");
     doTestFlushCommits(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index f2db498..ebe95b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.TestCase;
@@ -61,6 +62,7 @@ public class TestDefaultMemStore extends TestCase {
   private static final int QUALIFIER_COUNT = ROW_COUNT;
   private static final byte [] FAMILY = Bytes.toBytes("column");
   private MultiVersionConsistencyControl mvcc;
+  private AtomicLong startSeqNum = new AtomicLong(0); 
 
   @Override
   public void setUp() throws Exception {
@@ -236,7 +238,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] v = Bytes.toBytes("value");
 
     MultiVersionConsistencyControl.WriteEntry w =
-        mvcc.beginMemstoreInsert();
+        mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv1 = new KeyValue(row, f, q1, v);
     kv1.setMvccVersion(w.getWriteNumber());
@@ -250,7 +252,7 @@ public class TestDefaultMemStore extends TestCase {
     s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
     assertScannerResults(s, new KeyValue[]{kv1});
 
-    w = mvcc.beginMemstoreInsert();
+    w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
     KeyValue kv2 = new KeyValue(row, f, q2, v);
     kv2.setMvccVersion(w.getWriteNumber());
     memstore.add(kv2);
@@ -280,7 +282,7 @@ public class TestDefaultMemStore extends TestCase {
 
     // INSERT 1: Write both columns val1
     MultiVersionConsistencyControl.WriteEntry w =
-        mvcc.beginMemstoreInsert();
+        mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
     kv11.setMvccVersion(w.getWriteNumber());
@@ -296,7 +298,7 @@ public class TestDefaultMemStore extends TestCase {
     assertScannerResults(s, new KeyValue[]{kv11, kv12});
 
     // START INSERT 2: Write both columns val2
-    w = mvcc.beginMemstoreInsert();
+    w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
     KeyValue kv21 = new KeyValue(row, f, q1, v2);
     kv21.setMvccVersion(w.getWriteNumber());
     memstore.add(kv21);
@@ -332,7 +334,7 @@ public class TestDefaultMemStore extends TestCase {
     final byte[] v1 = Bytes.toBytes("value1");
     // INSERT 1: Write both columns val1
     MultiVersionConsistencyControl.WriteEntry w =
-        mvcc.beginMemstoreInsert();
+        mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
     KeyValue kv11 = new KeyValue(row, f, q1, v1);
     kv11.setMvccVersion(w.getWriteNumber());
@@ -348,7 +350,7 @@ public class TestDefaultMemStore extends TestCase {
     assertScannerResults(s, new KeyValue[]{kv11, kv12});
 
     // START DELETE: Insert delete for one of the columns
-    w = mvcc.beginMemstoreInsert();
+    w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
     KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
         KeyValue.Type.DeleteColumn);
     kvDel.setMvccVersion(w.getWriteNumber());
@@ -377,6 +379,7 @@ public class TestDefaultMemStore extends TestCase {
 
     final MultiVersionConsistencyControl mvcc;
     final MemStore memstore;
+    final AtomicLong startSeqNum;
 
     AtomicReference<Throwable> caughtException;
 
@@ -384,12 +387,14 @@ public class TestDefaultMemStore extends TestCase {
     public ReadOwnWritesTester(int id,
                                MemStore memstore,
                                MultiVersionConsistencyControl mvcc,
-                               AtomicReference<Throwable> caughtException)
+                               AtomicReference<Throwable> caughtException,
+                               AtomicLong startSeqNum)
     {
       this.mvcc = mvcc;
       this.memstore = memstore;
       this.caughtException = caughtException;
       row = Bytes.toBytes(id);
+      this.startSeqNum = startSeqNum;
     }
 
     public void run() {
@@ -403,7 +408,7 @@ public class TestDefaultMemStore extends TestCase {
     private void internalRun() throws IOException {
       for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
         MultiVersionConsistencyControl.WriteEntry w =
-          mvcc.beginMemstoreInsert();
+            mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
 
         // Insert the sequence value (i)
         byte[] v = Bytes.toBytes(i);
@@ -433,7 +438,7 @@ public class TestDefaultMemStore extends TestCase {
     AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
 
     for (int i = 0; i < NUM_THREADS; i++) {
-      threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught);
+      threads[i] = new ReadOwnWritesTester(i, memstore, mvcc, caught, this.startSeqNum);
       threads[i].start();
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c682d57e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b2b4845..0f55b62 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -4152,15 +4152,16 @@ public class TestHRegion {
     durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
 
     // expect skip wal cases
-    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
-    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
-    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
-    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
-    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
-    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
+    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, true, false, false);
 
   }
 
+  @SuppressWarnings("unchecked")
   private void durabilityTest(String method, Durability tableDurability,
       Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
       final boolean expectSyncFromLogSyncer) throws Exception {
@@ -4183,7 +4184,7 @@ public class TestHRegion {
     //verify append called or not
     verify(log, expectAppend ? times(1) : never())
       .appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
-          (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean());
+          (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<KeyValue>)any());
 
     // verify sync called or not
     if (expectSync || expectSyncFromLogSyncer) {
@@ -4202,7 +4203,7 @@ public class TestHRegion {
         }
       });
     } else {
-      verify(log, never()).sync(anyLong());
+      //verify(log, never()).sync(anyLong());
       verify(log, never()).sync();
     }