You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/09/29 00:44:53 UTC

hbase git commit: HBASE-21067 Backport HBASE-17519 (Rollback the removed cells)

Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 f91a91247 -> 171f8f066


HBASE-21067 Backport HBASE-17519 (Rollback the removed cells)

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/171f8f06
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/171f8f06
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/171f8f06

Branch: refs/heads/branch-1.3
Commit: 171f8f066ec072475ae4454e9b3f5d545cee73a3
Parents: f91a912
Author: Nihal Jain <ni...@gmail.com>
Authored: Sun Aug 26 21:40:37 2018 +0530
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Sep 28 17:08:03 2018 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/DefaultMemStore.java     |  13 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  97 +++--
 .../hadoop/hbase/regionserver/HStore.java       |   5 +-
 .../hadoop/hbase/regionserver/MemStore.java     |   3 +-
 .../apache/hadoop/hbase/regionserver/Store.java |   3 +-
 .../hbase/client/TestRollbackFromClient.java    | 357 +++++++++++++++++++
 .../hbase/regionserver/TestDefaultMemStore.java |   6 +-
 7 files changed, 438 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 31922ae..5f17d31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -494,7 +494,7 @@ public class DefaultMemStore implements MemStore {
     // 'now' and a 0 memstoreTS == immediately visible
     List<Cell> cells = new ArrayList<Cell>(1);
     cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
-    return upsert(cells, 1L);
+    return upsert(cells, 1L, null);
   }
 
   /**
@@ -513,13 +513,14 @@ public class DefaultMemStore implements MemStore {
    *
    * @param cells
    * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @param removedCells collect the removed cells. It can be null.
    * @return change in memstore size
    */
   @Override
-  public long upsert(Iterable<Cell> cells, long readpoint) {
+  public long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells) {
     long size = 0;
     for (Cell cell : cells) {
-      size += upsert(cell, readpoint);
+      size += upsert(cell, readpoint, removedCells);
     }
     return size;
   }
@@ -537,8 +538,9 @@ public class DefaultMemStore implements MemStore {
    *
    * @param cell
    * @return change in size of MemStore
+   * @param removedCells collect the removed cells
    */
-  private long upsert(Cell cell, long readpoint) {
+  private long upsert(Cell cell, long readpoint, List<Cell> removedCells) {
     // Add the Cell to the MemStore
     // Use the internalAdd method here since we (a) already have a lock
     // and (b) cannot safely use the MSLAB here without potentially
@@ -577,6 +579,9 @@ public class DefaultMemStore implements MemStore {
             long delta = heapSizeChange(cur, true);
             addedSize -= delta;
             activeSection.getHeapSize().addAndGet(-delta);
+            if (removedCells != null) {
+              removedCells.add(cur);
+            }
             it.remove();
             setOldestEditTimeToNow();
           } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 26a5958..193d874 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3927,11 +3927,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * the wal. This method is then invoked to rollback the memstore.
    */
   private void rollbackMemstore(List<Cell> memstoreCells) {
-    int kvsRolledback = 0;
+    rollbackMemstore(null, memstoreCells);
+  }
 
+  private void rollbackMemstore(final Store defaultStore, List<Cell> memstoreCells) {
+    int kvsRolledback = 0;
     for (Cell cell : memstoreCells) {
-      byte[] family = CellUtil.cloneFamily(cell);
-      Store store = getStore(family);
+      Store store = defaultStore;
+      if (store == null) {
+        byte[] family = CellUtil.cloneFamily(cell);
+        store = getStore(family);
+      }
       store.rollback(cell);
       kvsRolledback++;
     }
@@ -7535,7 +7541,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     byte[] row = mutate.getRow();
     checkRow(row, op.toString());
     checkFamilies(mutate.getFamilyCellMap().keySet());
-    boolean flush = false;
+    Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
     Durability durability = getEffectiveDurability(mutate.getDurability());
     boolean writeToWAL = durability != Durability.SKIP_WAL;
     WALEdit walEdits = null;
@@ -7660,30 +7666,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
 
           // Actually write to Memstore now
-          if (!tempMemstore.isEmpty()) {
-            for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
-              Store store = entry.getKey();
-              if (store.getFamily().getMaxVersions() == 1) {
-                // upsert if VERSIONS for this CF == 1
-                // Is this right? It immediately becomes visible? St.Ack 20150907
-                size += store.upsert(entry.getValue(), getSmallestReadPoint());
-              } else {
-                // otherwise keep older versions around
-                for (Cell cell: entry.getValue()) {
-                  // This stamping of sequenceid seems redundant; it is happening down in
-                  // FSHLog when we consume edits off the ring buffer.
-                  CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
-                  size += store.add(cell);
-                  doRollBackMemstore = true;
-                }
+          doRollBackMemstore = !tempMemstore.isEmpty();
+          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
+            Store store = entry.getKey();
+            if (store.getFamily().getMaxVersions() == 1) {
+              List<Cell> removedCells = removedCellsForMemStore.get(store);
+              if (removedCells == null) {
+                removedCells = new ArrayList<>();
+                removedCellsForMemStore.put(store, removedCells);
+              }
+              // upsert if VERSIONS for this CF == 1
+              // Is this right? It immediately becomes visible? St.Ack 20150907
+              size += store.upsert(entry.getValue(), getSmallestReadPoint(), removedCells);
+            } else {
+              // otherwise keep older versions around
+              for (Cell cell: entry.getValue()) {
+                // This stamping of sequenceid seems redundant; it is happening down in
+                // FSHLog when we consume edits off the ring buffer.
+                CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
+                size += store.add(cell);
               }
-              // We add to all KVs here whereas when doing increment, we do it
-              // earlier... why?
-              allKVs.addAll(entry.getValue());
             }
-
-            size = this.addAndGetGlobalMemstoreSize(size);
-            flush = isFlushSize(size);
+            // We add to all KVs here whereas when doing increment, we do it
+            // earlier... why?
+            allKVs.addAll(entry.getValue());
           }
         } finally {
           this.updatesLock.readLock().unlock();
@@ -7709,7 +7715,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // if the wal sync was unsuccessful, remove keys from memstore
       WriteEntry we = walKey != null? walKey.getWriteEntry(): null;
       if (doRollBackMemstore) {
-        rollbackMemstore(allKVs);
+        for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
+          rollbackMemstore(entry.getKey(), entry.getValue());
+        }
+        for (Map.Entry<Store, List<Cell>> entry : removedCellsForMemStore.entrySet()) {
+          Store currStore = entry.getKey();
+          for (Cell cell: entry.getValue()) {
+            CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
+            currStore.add(cell);
+          }
+        }
         if (we != null) mvcc.complete(we);
       } else if (we != null) {
         mvcc.completeAndWait(we);
@@ -7722,11 +7737,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.metricsRegion.updateAppend();
     }
 
-    if (flush) {
       // Request a cache flush. Do it outside update lock.
+    if (isFlushSize(this.addAndGetGlobalMemstoreSize(size))) {
       requestFlush();
     }
-
     return mutate.isReturnResults() ? Result.create(allKVs) : null;
   }
 
@@ -7833,7 +7847,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     boolean doRollBackMemstore = false;
     long accumulatedResultSize = 0;
     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
-    List<Cell> memstoreCells = new ArrayList<Cell>();
+    Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
+    Map<Store, List<Cell>> forMemStore = new HashMap<>();
     Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
     try {
       rowLock = getRowLockInternal(increment.getRow());
@@ -7853,7 +7868,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALEdit walEdits = null;
           // Process increments a Store/family at a time.
           // Accumulate edits for memstore to add later after we've added to WAL.
-          Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>();
           for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
             byte [] columnFamilyName = entry.getKey();
             List<Cell> increments = entry.getValue();
@@ -7895,20 +7909,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
 
           // Now write to memstore, a family at a time.
+          doRollBackMemstore = !forMemStore.isEmpty();
           for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
             Store store = entry.getKey();
             List<Cell> results = entry.getValue();
             if (store.getFamily().getMaxVersions() == 1) {
+              List<Cell> removedCells = removedCellsForMemStore.get(store);
+              if (removedCells == null) {
+                removedCells = new ArrayList<>();
+                removedCellsForMemStore.put(store, removedCells);
+              }
               // Upsert if VERSIONS for this CF == 1
-              accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
-              // TODO: St.Ack 20151222 Why no rollback in this case?
+              accumulatedResultSize += store.upsert(results, getSmallestReadPoint(), removedCells);
             } else {
               // Otherwise keep older versions around
               for (Cell cell: results) {
                 // Why we need this?
                 CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
                 accumulatedResultSize += store.add(cell);
-                doRollBackMemstore = true;
               }
             }
           }
@@ -7934,7 +7952,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       // if the wal sync was unsuccessful, remove keys from memstore
       if (doRollBackMemstore) {
-        rollbackMemstore(memstoreCells);
+        for (Map.Entry<Store, List<Cell>> entry : forMemStore.entrySet()) {
+          rollbackMemstore(entry.getKey(), entry.getValue());
+        }
+        for (Map.Entry<Store, List<Cell>> entry : removedCellsForMemStore.entrySet()) {
+          Store currStore = entry.getKey();
+          for (Cell cell : entry.getValue()) {
+            CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
+            currStore.add(cell);
+          }
+        }
         if (walKey != null) mvcc.complete(walKey.getWriteEntry());
       } else {
         if (walKey != null) mvcc.completeAndWait(walKey.getWriteEntry());

http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 03dad03..5e07b6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2406,10 +2406,11 @@ public class HStore implements Store {
   }
 
   @Override
-  public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
+  public long upsert(Iterable<Cell> cells, long readpoint,
+                List<Cell> removedCells) throws IOException {
     this.lock.readLock().lock();
     try {
-      return this.memstore.upsert(cells, readpoint);
+      return this.memstore.upsert(cells, readpoint, removedCells);
     } finally {
       this.lock.readLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 658ba48..9f4c3ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -127,9 +127,10 @@ public interface MemStore extends HeapSize {
    * only see each KeyValue update as atomic.
    * @param cells
    * @param readpoint readpoint below which we can safely remove duplicate Cells.
+   * @param removedCells collect the removed cells. It can be null.
    * @return change in memstore size
    */
-  long upsert(Iterable<Cell> cells, long readpoint);
+  long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells);
 
   /**
    * @return scanner over the memstore. This might include scanner over the snapshot when one is

http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index e7a4de5..8238a97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -137,10 +137,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    * across all of them.
    * @param cells
    * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @param removedCells collect the removed cells. It can be null.
    * @return memstore size delta
    * @throws IOException
    */
-  long upsert(Iterable<Cell> cells, long readpoint) throws IOException;
+  long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells) throws IOException;
 
   /**
    * Adds a value to the memstore

http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
new file mode 100644
index 0000000..5e45eea
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
@@ -0,0 +1,357 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(SmallTests.class)
+public class TestRollbackFromClient {
+  @Rule
+  public TestName name = new TestName();
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static final int SLAVES = 3;
+  private static final byte[] ROW = Bytes.toBytes("testRow");
+  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2");
+  private static final byte[] VALUE = Bytes.toBytes("testValue");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception { // one mini cluster for all tests
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, // install the fault-injecting WAL
+      FailedDefaultWALProvider.class.getName());
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster(); // stops the cluster started in setUpBeforeClass
+  }
+
+  @Test
+  public void testAppendRollback() throws IOException {
+    Updater updateForEmptyTable = new Updater() {
+      @Override
+      public int updateData(Table table, byte[] family) {
+        try {
+          Append append = new Append(ROW);
+          append.add(FAMILY, QUALIFIER, VALUE);
+          append.add(FAMILY, QUALIFIER_V2, VALUE);
+          FailedHLog.SHOULD_FAIL.set(true);
+          table.append(append);
+          Assert.fail("It should fail because the WAL sync is failed");
+        } catch (IOException e) { // expected: the WAL sync was forced to fail
+        } finally {
+          FailedHLog.SHOULD_FAIL.set(false);
+        }
+        return 0; // nothing may remain after the failed append
+      }
+    };
+    testRollback(updateForEmptyTable, 1, null); // max versions 1: upsert path in HRegion
+    testRollback(updateForEmptyTable, 2, null); // max versions 2: add path in HRegion
+
+    final Append preAppend = new Append(ROW);
+    preAppend.add(FAMILY, QUALIFIER, VALUE);
+    Cell initCell = preAppend.getCellList(FAMILY).get(0); // the cell that must survive
+    Updater updateForNonEmptyTable = new Updater() {
+      @Override
+      public int updateData(Table table, byte[] family) throws IOException {
+        table.append(preAppend); // runs while SHOULD_FAIL is still false
+        try {
+          Append append = new Append(ROW);
+          append.add(FAMILY, QUALIFIER, VALUE);
+          append.add(FAMILY, QUALIFIER_V2, VALUE);
+          FailedHLog.SHOULD_FAIL.set(true);
+          table.append(append);
+          Assert.fail("It should fail because the WAL sync is failed");
+        } catch (IOException e) { // expected: the WAL sync was forced to fail
+        } finally {
+          FailedHLog.SHOULD_FAIL.set(false);
+        }
+        return 1; // only the cell written by preAppend may remain
+      }
+    };
+    testRollback(updateForNonEmptyTable, 1, initCell);
+    testRollback(updateForNonEmptyTable, 2, initCell);
+  }
+
+  @Test
+  public void testIncrementRollback() throws IOException {
+    Updater updateForEmptyTable = new Updater() {
+      @Override
+      public int updateData(Table table, byte[] family) {
+        try {
+          Increment inc = new Increment(ROW);
+          inc.addColumn(FAMILY, QUALIFIER, 1);
+          inc.addColumn(FAMILY, QUALIFIER_V2, 2);
+          FailedHLog.SHOULD_FAIL.set(true);
+          table.increment(inc);
+          Assert.fail("It should fail because the WAL sync is failed");
+        } catch (IOException e) { // expected: the WAL sync was forced to fail
+        } finally {
+          FailedHLog.SHOULD_FAIL.set(false);
+        }
+        return 0; // nothing may remain after the failed increment
+      }
+    };
+    testRollback(updateForEmptyTable, 1, null); // max versions 1: upsert path in HRegion
+    testRollback(updateForEmptyTable, 2, null); // max versions 2: add path in HRegion
+
+    final Increment preIncrement = new Increment(ROW);
+    preIncrement.addColumn(FAMILY, QUALIFIER, 1);
+    Cell initCell = preIncrement.getCellList(FAMILY).get(0); // the cell that must survive
+    Updater updateForNonEmptyTable = new Updater() {
+      @Override
+      public int updateData(Table table, byte[] family) throws IOException {
+        table.increment(preIncrement); // runs while SHOULD_FAIL is still false
+        try {
+          Increment inc = new Increment(ROW);
+          inc.addColumn(FAMILY, QUALIFIER, 1);
+          inc.addColumn(FAMILY, QUALIFIER_V2, 2);
+          FailedHLog.SHOULD_FAIL.set(true);
+          table.increment(inc);
+          Assert.fail("It should fail because the WAL sync is failed");
+        } catch (IOException e) { // expected: the WAL sync was forced to fail
+        } finally {
+          FailedHLog.SHOULD_FAIL.set(false);
+        }
+        return 1; // only the cell written by preIncrement may remain
+      }
+    };
+    testRollback(updateForNonEmptyTable, 1, initCell);
+    testRollback(updateForNonEmptyTable, 2, initCell);
+  }
+
+  @Test
+  public void testPutRollback() throws IOException {
+    Updater updateForEmptyTable = new Updater() {
+      @Override
+      public int updateData(Table table, byte[] family) {
+        try {
+          Put put = new Put(ROW);
+          put.addColumn(FAMILY, QUALIFIER, VALUE);
+          FailedHLog.SHOULD_FAIL.set(true);
+          table.put(put);
+          Assert.fail("It should fail because the WAL sync is failed");
+        } catch (IOException e) { // expected: the WAL sync was forced to fail
+        } finally {
+          FailedHLog.SHOULD_FAIL.set(false);
+        }
+        return 0; // nothing may remain after the failed put
+      }
+    };
+    testRollback(updateForEmptyTable, 1, null); // column family with max versions 1
+    testRollback(updateForEmptyTable, 2, null); // column family with max versions 2
+
+    final Put prePut = new Put(ROW);
+    prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa"));
+    Cell preCell = prePut.getCellList(FAMILY).get(0); // the cell that must survive
+    Updater updateForNonEmptyTable = new Updater() {
+      @Override
+      public int updateData(Table table, byte[] family) throws IOException {
+        table.put(prePut); // runs while SHOULD_FAIL is still false
+        try {
+          Put put = new Put(ROW);
+          put.addColumn(FAMILY, QUALIFIER, VALUE);
+          FailedHLog.SHOULD_FAIL.set(true);
+          table.put(put);
+          Assert.fail("It should fail because the WAL sync is failed");
+        } catch (IOException e) { // expected: the WAL sync was forced to fail
+        } finally {
+          FailedHLog.SHOULD_FAIL.set(false);
+        }
+        return 1; // only the cell written by prePut may remain
+      }
+    };
+    testRollback(updateForNonEmptyTable, 1, preCell);
+    testRollback(updateForNonEmptyTable, 2, preCell);
+  }
+
+  private void testRollback(Updater updater, int versions, Cell initCell) throws IOException {
+    TableName tableName = TableName.valueOf(this.name.getMethodName());
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor col = new HColumnDescriptor(FAMILY);
+    col.setMaxVersions(versions);
+    desc.addFamily(col);
+    TEST_UTIL.getHBaseAdmin().createTable(desc); // fresh table named after the running test
+    int expected; // cell count the updater reports must remain after its failed mutation
+    List<Cell> cells; // what a full scan actually returns after the updater ran
+    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+        Table table = conn.getTable(tableName)) {
+      expected = updater.updateData(table, FAMILY);
+      cells = getAllCells(table);
+    }
+    TEST_UTIL.getHBaseAdmin().disableTable(tableName); // drop so the next call can re-create it
+    TEST_UTIL.getHBaseAdmin().deleteTable(tableName);
+    assertEquals(expected, cells.size());
+    if (initCell != null && !cells.isEmpty()) { // compare the survivor with the initial cell
+      Cell cell = cells.get(0);
+      assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell));
+      assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell));
+      assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell));
+      assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell));
+    }
+  }
+
+  interface Updater { // a mutation scenario run by testRollback against a fresh table
+    int updateData(Table table, byte[] family) throws IOException; // returns expected cell count
+  }
+
+  private static List<Cell> getAllCells(Table table) throws IOException {
+    List<Cell> cells = new ArrayList<>();
+    try (ResultScanner scanner = table.getScanner(new Scan())) {
+      for (Result r : scanner) {
+        cells.addAll(r.listCells());
+      }
+      return cells;
+    }
+  }
+
+  public static class FailedDefaultWALProvider extends DefaultWALProvider {
+    @Override
+    public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException {
+      WAL wal = super.getWAL(identifier, namespace);
+      return new FailedHLog(wal); // wrap the real WAL so sync(txid) can be forced to fail
+    }
+  }
+
+  public static class FailedHLog implements WAL { // delegating WAL with a sync fault switch
+    private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false); // fault switch
+    private final WAL delegation; // the real WAL; every call below is forwarded to it
+
+    FailedHLog(final WAL delegation) {
+      this.delegation = delegation;
+    }
+
+    @Override
+    public void registerWALActionsListener(WALActionsListener listener) {
+      delegation.registerWALActionsListener(listener);
+    }
+
+    @Override
+    public boolean unregisterWALActionsListener(WALActionsListener listener) {
+      return delegation.unregisterWALActionsListener(listener);
+    }
+
+    @Override
+    public byte[][] rollWriter() throws FailedLogCloseException, IOException {
+      return delegation.rollWriter();
+    }
+
+    @Override
+    public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+      return delegation.rollWriter(force);
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+      delegation.shutdown();
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegation.close();
+    }
+
+    @Override
+    public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
+        boolean inMemstore) throws IOException {
+      return delegation.append(htd, info, key, edits, inMemstore); // appends are never failed
+    }
+
+    @Override
+    public void sync() throws IOException {
+      delegation.sync(); // NOTE: the no-arg sync ignores SHOULD_FAIL
+    }
+
+    @Override
+    public void sync(long txid) throws IOException {
+      if (SHOULD_FAIL.get()) { // test hook: throw instead of syncing while the switch is on
+        throw new IOException("[TESTING] we need the failure!!!");
+      }
+      delegation.sync(txid);
+    }
+
+    @Override
+    public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
+      return delegation.startCacheFlush(encodedRegionName, families);
+    }
+
+    @Override
+    public void completeCacheFlush(byte[] encodedRegionName) {
+      delegation.completeCacheFlush(encodedRegionName);
+    }
+
+    @Override
+    public void abortCacheFlush(byte[] encodedRegionName) {
+      delegation.abortCacheFlush(encodedRegionName);
+    }
+
+    @Override
+    public WALCoprocessorHost getCoprocessorHost() {
+      return delegation.getCoprocessorHost();
+    }
+
+    @Override
+    public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
+      return delegation.getEarliestMemstoreSeqNum(encodedRegionName);
+    }
+
+    @Override
+    public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
+      return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index ba192b6..5c6f5f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -864,7 +864,7 @@ public class TestDefaultMemStore extends TestCase {
     kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1);
     l.add(kv1); l.add(kv2); l.add(kv3);
 
-    this.memstore.upsert(l, 2);// readpoint is 2
+    this.memstore.upsert(l, 2, null);// readpoint is 2
     long newSize = this.memstore.activeSection.getHeapSize().get();
     assert(newSize > oldSize);
     //The kv1 should be removed.
@@ -873,7 +873,7 @@ public class TestDefaultMemStore extends TestCase {
     KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
     kv4.setSequenceId(1);
     l.clear(); l.add(kv4);
-    this.memstore.upsert(l, 3);
+    this.memstore.upsert(l, 3, null);
     assertEquals(newSize, this.memstore.activeSection.getHeapSize().get());
     //The kv2 should be removed.
     assert(memstore.activeSection.getCellSkipListSet().size() == 2);
@@ -917,7 +917,7 @@ public class TestDefaultMemStore extends TestCase {
       KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
       kv1.setSequenceId(100);
       l.add(kv1);
-      memstore.upsert(l, 1000);
+      memstore.upsert(l, 1000, null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
     } finally {